(* Runtime - Eio-based Urbit Runtime * * This is THE CORE of the multi-core Urbit runtime! * * Architecture: * - Eio.Switch for structured concurrency * - Eio.Stream for lock-free event queue * - Fiber per I/O driver (behn, ames, http, etc) * - Concurrent event processing * * This is fundamentally different from C Vere: * - C Vere: single-threaded, blocking I/O, sequential * - OCaml Overe: multi-fiber, async I/O, concurrent *) (* Runtime configuration *) type config = { pier_path: string; (* Path to pier directory *) event_log_path: string; (* Event log directory *) snapshot_path: string; (* Snapshot file path *) } (* Runtime state *) type t = { config: config; state: State.t; (* Ship state *) event_log: Eventlog.t; (* Event log *) effect_queue: Effects.queue; (* Effects to process *) (* Statistics *) mutable events_processed: int64; mutable effects_executed: int64; } (* Create default config *) let default_config ?(pier_path="./pier") () = { pier_path; event_log_path = pier_path ^ "/log"; snapshot_path = pier_path ^ "/snapshot.jam"; } (* Create runtime *) let create ~sw ~fs config = let state = State.create () in let event_log = Eventlog.create ~sw ~fs config.event_log_path in let effect_queue = Effects.create_queue () in { config; state; event_log; effect_queue; events_processed = 0L; effects_executed = 0L; } (* Process a single event through Arvo * * Steps: * 1. Run Nock to apply event to kernel * 2. Update kernel state * 3. Parse effects from result * 4. Enqueue effects for drivers *) let process_event runtime event_noun = (* In real implementation, this would: * let result = Nock.nock_on (State.get_arvo runtime.state) poke_formula in * let new_state, effects = parse_result result in * State.set_arvo runtime.state new_state; * * For now: simplified - just update event counter *) let _effects = State.poke runtime.state event_noun in runtime.events_processed <- Int64.succ runtime.events_processed; (* Parse effects and enqueue them *) let effects = Effects.parse_effects event_noun in List.iter (Effects.enqueue runtime.effect_queue) effects; (* Append to event log *) let _event_num = Eventlog.append runtime.event_log ~sw:(fun () -> ()) event_noun in () (* Event processing fiber - processes events from the queue * * This runs as a separate fiber, processing events as they arrive *) let event_processor runtime ~sw:_ stream = Printf.printf "[Runtime] Event processor fiber started\n%!"; try while true do (* Read next event from stream *) let event = Eio.Stream.take stream in (* Process the event *) process_event runtime event; (* Periodic logging *) if Int64.rem runtime.events_processed 100L = 0L then Printf.printf "[Runtime] Processed %Ld events\n%!" runtime.events_processed done with | End_of_file -> Printf.printf "[Runtime] Event processor shutting down\n%!" (* Effect executor fiber - executes effects as they're produced * * This runs as a separate fiber, executing effects concurrently * with event processing *) let effect_executor runtime ~sw:_ ~env = Printf.printf "[Runtime] Effect executor fiber started\n%!"; let rec loop () = match Effects.try_dequeue runtime.effect_queue with | None -> (* No effects, sleep briefly *) Eio.Time.sleep (Eio.Stdenv.clock env) 0.001; loop () | Some eff -> (* Execute the effect *) (match eff with | Effects.Log msg -> Printf.printf "[Effect] Log: %s\n%!" msg | Effects.SetTimer { id; time } -> Printf.printf "[Effect] SetTimer: id=%Ld time=%f\n%!" id time | Effects.CancelTimer { id } -> Printf.printf "[Effect] CancelTimer: id=%Ld\n%!" id | _ -> Printf.printf "[Effect] Other effect\n%!" ); runtime.effects_executed <- Int64.succ runtime.effects_executed; loop () in try loop () with | End_of_file -> Printf.printf "[Runtime] Effect executor shutting down\n%!" (* Main runtime loop with Eio * * This is THE KEY FUNCTION - the heart of the runtime! * * Creates: * - Event stream (lock-free with Eio.Stream) * - Event processor fiber * - Effect executor fiber * - I/O driver fibers (future) *) let run ~env config = Printf.printf "šŸš€ Starting Overe Runtime (Eio-based)\n%!"; Printf.printf " Pier: %s\n%!" config.pier_path; Printf.printf " OCaml %s on %d cores\n%!" Sys.ocaml_version (Domain.recommended_domain_count ()); Eio.Switch.run @@ fun sw -> let fs = Eio.Stdenv.fs env in (* Create runtime *) let runtime = create ~sw ~fs config in (* Create event stream (lock-free!) *) let event_stream = Eio.Stream.create 1000 in Printf.printf "āœ“ Runtime created\n%!"; (* Load snapshot if it exists *) (match State.load_snapshot runtime.state ~fs config.snapshot_path with | Ok eve -> Printf.printf "āœ“ Loaded snapshot at event %Ld\n%!" eve | Error msg -> Printf.printf "⚠ No snapshot: %s (starting fresh)\n%!" msg; (* Boot with empty kernel *) State.boot runtime.state (Noun.atom 0) ); (* Replay events from log *) Printf.printf "Replaying events from log...\n%!"; Eventlog.replay runtime.event_log ~sw (fun event_num event -> Printf.printf " Replayed event %Ld\n%!" event_num; process_event runtime event ); Printf.printf "āœ“ Runtime ready! State: %s\n%!" (State.summary runtime.state); Printf.printf "\n"; (* Spawn concurrent fibers using Eio.Fiber.both *) Eio.Fiber.both (* Event processor fiber *) (fun () -> event_processor runtime ~sw event_stream) (* Effect executor fiber *) (fun () -> effect_executor runtime ~sw ~env); (* When we get here, runtime is shutting down *) Printf.printf "\nšŸ›‘ Runtime shutting down...\n%!"; Printf.printf " Events processed: %Ld\n%!" runtime.events_processed; Printf.printf " Effects executed: %Ld\n%!" runtime.effects_executed; (* Save final snapshot *) Printf.printf "Saving final snapshot...\n%!"; State.save_snapshot runtime.state ~fs config.snapshot_path; Printf.printf "āœ“ Snapshot saved\n%!" (* Simplified run for testing - processes events synchronously *) let run_simple ~env config events = Printf.printf "Running simple runtime test...\n%!"; Eio.Switch.run @@ fun sw -> let fs = Eio.Stdenv.fs env in let runtime = create ~sw ~fs config in (* Process each event *) List.iter (process_event runtime) events; Printf.printf "āœ“ Processed %Ld events\n%!" runtime.events_processed; runtime (* Statistics record *) type stats = { events_processed: int64; effects_executed: int64; event_count: int; state_summary: string; } (* Get runtime statistics *) let get_stats (runtime : t) : stats = { events_processed = runtime.events_processed; effects_executed = runtime.effects_executed; event_count = Eventlog.event_count runtime.event_log; state_summary = State.summary runtime.state; }