From 24eac75c69b3d74388bbbc8ee2b6792e7590e4c6 Mon Sep 17 00:00:00 2001 From: polwex Date: Mon, 6 Oct 2025 04:03:14 +0700 Subject: did this madman really implement parallelism on urbit --- ocaml/lib/runtime.ml | 242 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 ocaml/lib/runtime.ml (limited to 'ocaml/lib/runtime.ml') diff --git a/ocaml/lib/runtime.ml b/ocaml/lib/runtime.ml new file mode 100644 index 0000000..6befd74 --- /dev/null +++ b/ocaml/lib/runtime.ml @@ -0,0 +1,242 @@ +(* Runtime - Eio-based Urbit Runtime + * + * This is THE CORE of the multi-core Urbit runtime! + * + * Architecture: + * - Eio.Switch for structured concurrency + * - Eio.Stream for lock-free event queue + * - Fiber per I/O driver (behn, ames, http, etc) + * - Concurrent event processing + * + * This is fundamentally different from C Vere: + * - C Vere: single-threaded, blocking I/O, sequential + * - OCaml Overe: multi-fiber, async I/O, concurrent + *) + +(* Runtime configuration *) +type config = { + pier_path: string; (* Path to pier directory *) + event_log_path: string; (* Event log directory *) + snapshot_path: string; (* Snapshot file path *) +} + +(* Runtime state *) +type t = { + config: config; + state: State.t; (* Ship state *) + event_log: Eventlog.t; (* Event log *) + effect_queue: Effects.queue; (* Effects to process *) + + (* Statistics *) + mutable events_processed: int64; + mutable effects_executed: int64; +} + +(* Create default config *) +let default_config ?(pier_path="./pier") () = { + pier_path; + event_log_path = pier_path ^ "/log"; + snapshot_path = pier_path ^ "/snapshot.jam"; +} + +(* Create runtime *) +let create ~sw ~fs config = + let state = State.create () in + let event_log = Eventlog.create ~sw ~fs config.event_log_path in + let effect_queue = Effects.create_queue () in + + { + config; + state; + event_log; + effect_queue; + events_processed = 0L; + effects_executed = 0L; + } + +(* Process a single event through Arvo + * + * Steps: + * 1. Run Nock to apply event to kernel + * 2. Update kernel state + * 3. Parse effects from result + * 4. Enqueue effects for drivers + *) +let process_event runtime event_noun = + (* In real implementation, this would: + * let result = Nock.nock_on (State.get_arvo runtime.state) poke_formula in + * let new_state, effects = parse_result result in + * State.set_arvo runtime.state new_state; + * + * For now: simplified - just update event counter + *) + let _effects = State.poke runtime.state event_noun in + runtime.events_processed <- Int64.succ runtime.events_processed; + + (* Parse effects and enqueue them *) + let effects = Effects.parse_effects event_noun in + List.iter (Effects.enqueue runtime.effect_queue) effects; + + (* Append to event log *) + let _event_num = Eventlog.append runtime.event_log ~sw:(fun () -> ()) event_noun in + () + +(* Event processing fiber - processes events from the queue + * + * This runs as a separate fiber, processing events as they arrive + *) +let event_processor runtime ~sw:_ stream = + Printf.printf "[Runtime] Event processor fiber started\n%!"; + + try + while true do + (* Read next event from stream *) + let event = Eio.Stream.take stream in + + (* Process the event *) + process_event runtime event; + + (* Periodic logging *) + if Int64.rem runtime.events_processed 100L = 0L then + Printf.printf "[Runtime] Processed %Ld events\n%!" runtime.events_processed + done + with + | End_of_file -> + Printf.printf "[Runtime] Event processor shutting down\n%!" + +(* Effect executor fiber - executes effects as they're produced + * + * This runs as a separate fiber, executing effects concurrently + * with event processing + *) +let effect_executor runtime ~sw:_ ~env = + Printf.printf "[Runtime] Effect executor fiber started\n%!"; + + let rec loop () = + match Effects.try_dequeue runtime.effect_queue with + | None -> + (* No effects, sleep briefly *) + Eio.Time.sleep (Eio.Stdenv.clock env) 0.001; + loop () + + | Some eff -> + (* Execute the effect *) + (match eff with + | Effects.Log msg -> + Printf.printf "[Effect] Log: %s\n%!" msg + + | Effects.SetTimer { id; time } -> + Printf.printf "[Effect] SetTimer: id=%Ld time=%f\n%!" id time + + | Effects.CancelTimer { id } -> + Printf.printf "[Effect] CancelTimer: id=%Ld\n%!" id + + | _ -> + Printf.printf "[Effect] Other effect\n%!" + ); + + runtime.effects_executed <- Int64.succ runtime.effects_executed; + loop () + in + + try + loop () + with + | End_of_file -> + Printf.printf "[Runtime] Effect executor shutting down\n%!" + +(* Main runtime loop with Eio + * + * This is THE KEY FUNCTION - the heart of the runtime! + * + * Creates: + * - Event stream (lock-free with Eio.Stream) + * - Event processor fiber + * - Effect executor fiber + * - I/O driver fibers (future) + *) +let run ~env config = + Printf.printf "šŸš€ Starting Overe Runtime (Eio-based)\n%!"; + Printf.printf " Pier: %s\n%!" config.pier_path; + Printf.printf " OCaml %s on %d cores\n%!" + Sys.ocaml_version (Domain.recommended_domain_count ()); + + Eio.Switch.run @@ fun sw -> + let fs = Eio.Stdenv.fs env in + + (* Create runtime *) + let runtime = create ~sw ~fs config in + + (* Create event stream (lock-free!) *) + let event_stream = Eio.Stream.create 1000 in + + Printf.printf "āœ“ Runtime created\n%!"; + + (* Load snapshot if it exists *) + (match State.load_snapshot runtime.state ~fs config.snapshot_path with + | Ok eve -> + Printf.printf "āœ“ Loaded snapshot at event %Ld\n%!" eve + | Error msg -> + Printf.printf "⚠ No snapshot: %s (starting fresh)\n%!" msg; + (* Boot with empty kernel *) + State.boot runtime.state (Noun.atom 0) + ); + + (* Replay events from log *) + Printf.printf "Replaying events from log...\n%!"; + Eventlog.replay runtime.event_log ~sw (fun event_num event -> + Printf.printf " Replayed event %Ld\n%!" event_num; + process_event runtime event + ); + + Printf.printf "āœ“ Runtime ready! State: %s\n%!" (State.summary runtime.state); + Printf.printf "\n"; + + (* Spawn concurrent fibers using Eio.Fiber.both *) + Eio.Fiber.both + (* Event processor fiber *) + (fun () -> event_processor runtime ~sw event_stream) + + (* Effect executor fiber *) + (fun () -> effect_executor runtime ~sw ~env); + + (* When we get here, runtime is shutting down *) + Printf.printf "\nšŸ›‘ Runtime shutting down...\n%!"; + Printf.printf " Events processed: %Ld\n%!" runtime.events_processed; + Printf.printf " Effects executed: %Ld\n%!" runtime.effects_executed; + + (* Save final snapshot *) + Printf.printf "Saving final snapshot...\n%!"; + State.save_snapshot runtime.state ~fs config.snapshot_path; + Printf.printf "āœ“ Snapshot saved\n%!" + +(* Simplified run for testing - processes events synchronously *) +let run_simple ~env config events = + Printf.printf "Running simple runtime test...\n%!"; + + Eio.Switch.run @@ fun sw -> + let fs = Eio.Stdenv.fs env in + let runtime = create ~sw ~fs config in + + (* Process each event *) + List.iter (process_event runtime) events; + + Printf.printf "āœ“ Processed %Ld events\n%!" runtime.events_processed; + + runtime + +(* Statistics record *) +type stats = { + events_processed: int64; + effects_executed: int64; + event_count: int; + state_summary: string; +} + +(* Get runtime statistics *) +let get_stats (runtime : t) : stats = { + events_processed = runtime.events_processed; + effects_executed = runtime.effects_executed; + event_count = Eventlog.event_count runtime.event_log; + state_summary = State.summary runtime.state; +} -- cgit v1.2.3