summaryrefslogtreecommitdiff
path: root/ocaml/lib/runtime.ml
diff options
context:
space:
mode:
Diffstat (limited to 'ocaml/lib/runtime.ml')
-rw-r--r--ocaml/lib/runtime.ml242
1 files changed, 242 insertions, 0 deletions
diff --git a/ocaml/lib/runtime.ml b/ocaml/lib/runtime.ml
new file mode 100644
index 0000000..6befd74
--- /dev/null
+++ b/ocaml/lib/runtime.ml
@@ -0,0 +1,242 @@
+(* Runtime - Eio-based Urbit Runtime
+ *
+ * This is THE CORE of the multi-core Urbit runtime!
+ *
+ * Architecture:
+ * - Eio.Switch for structured concurrency
+ * - Eio.Stream for lock-free event queue
+ * - Fiber per I/O driver (behn, ames, http, etc)
+ * - Concurrent event processing
+ *
+ * This is fundamentally different from C Vere:
+ * - C Vere: single-threaded, blocking I/O, sequential
+ * - OCaml Overe: multi-fiber, async I/O, concurrent
+ *)
+
+(* Runtime configuration *)
+type config = {
+ pier_path: string; (* Path to pier directory *)
+ event_log_path: string; (* Event log directory *)
+ snapshot_path: string; (* Snapshot file path *)
+}
+
+(* Runtime state *)
+type t = {
+ config: config;
+ state: State.t; (* Ship state *)
+ event_log: Eventlog.t; (* Event log *)
+ effect_queue: Effects.queue; (* Effects to process *)
+
+ (* Statistics *)
+ mutable events_processed: int64;
+ mutable effects_executed: int64;
+}
+
+(* Create default config *)
+let default_config ?(pier_path="./pier") () = {
+ pier_path;
+ event_log_path = pier_path ^ "/log";
+ snapshot_path = pier_path ^ "/snapshot.jam";
+}
+
+(* Create runtime *)
+let create ~sw ~fs config =
+ let state = State.create () in
+ let event_log = Eventlog.create ~sw ~fs config.event_log_path in
+ let effect_queue = Effects.create_queue () in
+
+ {
+ config;
+ state;
+ event_log;
+ effect_queue;
+ events_processed = 0L;
+ effects_executed = 0L;
+ }
+
+(* Process a single event through Arvo
+ *
+ * Steps:
+ * 1. Run Nock to apply event to kernel
+ * 2. Update kernel state
+ * 3. Parse effects from result
+ * 4. Enqueue effects for drivers
+ *)
+let process_event runtime event_noun =
+ (* In real implementation, this would:
+ * let result = Nock.nock_on (State.get_arvo runtime.state) poke_formula in
+ * let new_state, effects = parse_result result in
+ * State.set_arvo runtime.state new_state;
+ *
+ * For now: simplified - just update event counter
+ *)
+ let _effects = State.poke runtime.state event_noun in
+ runtime.events_processed <- Int64.succ runtime.events_processed;
+
+ (* Parse effects and enqueue them *)
+ let effects = Effects.parse_effects event_noun in
+ List.iter (Effects.enqueue runtime.effect_queue) effects;
+
+ (* Append to event log *)
+ let _event_num = Eventlog.append runtime.event_log ~sw:(fun () -> ()) event_noun in
+ ()
+
+(* Event processing fiber - processes events from the queue
+ *
+ * This runs as a separate fiber, processing events as they arrive
+ *)
+let event_processor runtime ~sw:_ stream =
+ Printf.printf "[Runtime] Event processor fiber started\n%!";
+
+ try
+ while true do
+ (* Read next event from stream *)
+ let event = Eio.Stream.take stream in
+
+ (* Process the event *)
+ process_event runtime event;
+
+ (* Periodic logging *)
+ if Int64.rem runtime.events_processed 100L = 0L then
+ Printf.printf "[Runtime] Processed %Ld events\n%!" runtime.events_processed
+ done
+ with
+ | End_of_file ->
+ Printf.printf "[Runtime] Event processor shutting down\n%!"
+
+(* Effect executor fiber - executes effects as they're produced
+ *
+ * This runs as a separate fiber, executing effects concurrently
+ * with event processing
+ *)
+let effect_executor runtime ~sw:_ ~env =
+ Printf.printf "[Runtime] Effect executor fiber started\n%!";
+
+ let rec loop () =
+ match Effects.try_dequeue runtime.effect_queue with
+ | None ->
+ (* No effects, sleep briefly *)
+ Eio.Time.sleep (Eio.Stdenv.clock env) 0.001;
+ loop ()
+
+ | Some eff ->
+ (* Execute the effect *)
+ (match eff with
+ | Effects.Log msg ->
+ Printf.printf "[Effect] Log: %s\n%!" msg
+
+ | Effects.SetTimer { id; time } ->
+ Printf.printf "[Effect] SetTimer: id=%Ld time=%f\n%!" id time
+
+ | Effects.CancelTimer { id } ->
+ Printf.printf "[Effect] CancelTimer: id=%Ld\n%!" id
+
+ | _ ->
+ Printf.printf "[Effect] Other effect\n%!"
+ );
+
+ runtime.effects_executed <- Int64.succ runtime.effects_executed;
+ loop ()
+ in
+
+ try
+ loop ()
+ with
+ | End_of_file ->
+ Printf.printf "[Runtime] Effect executor shutting down\n%!"
+
+(* Main runtime loop with Eio
+ *
+ * This is THE KEY FUNCTION - the heart of the runtime!
+ *
+ * Creates:
+ * - Event stream (lock-free with Eio.Stream)
+ * - Event processor fiber
+ * - Effect executor fiber
+ * - I/O driver fibers (future)
+ *)
+let run ~env config =
+ Printf.printf "šŸš€ Starting Overe Runtime (Eio-based)\n%!";
+ Printf.printf " Pier: %s\n%!" config.pier_path;
+ Printf.printf " OCaml %s on %d cores\n%!"
+ Sys.ocaml_version (Domain.recommended_domain_count ());
+
+ Eio.Switch.run @@ fun sw ->
+ let fs = Eio.Stdenv.fs env in
+
+ (* Create runtime *)
+ let runtime = create ~sw ~fs config in
+
+ (* Create event stream (lock-free!) *)
+ let event_stream = Eio.Stream.create 1000 in
+
+ Printf.printf "āœ“ Runtime created\n%!";
+
+ (* Load snapshot if it exists *)
+ (match State.load_snapshot runtime.state ~fs config.snapshot_path with
+ | Ok eve ->
+ Printf.printf "āœ“ Loaded snapshot at event %Ld\n%!" eve
+ | Error msg ->
+ Printf.printf "⚠ No snapshot: %s (starting fresh)\n%!" msg;
+ (* Boot with empty kernel *)
+ State.boot runtime.state (Noun.atom 0)
+ );
+
+ (* Replay events from log *)
+ Printf.printf "Replaying events from log...\n%!";
+ Eventlog.replay runtime.event_log ~sw (fun event_num event ->
+ Printf.printf " Replayed event %Ld\n%!" event_num;
+ process_event runtime event
+ );
+
+ Printf.printf "āœ“ Runtime ready! State: %s\n%!" (State.summary runtime.state);
+ Printf.printf "\n";
+
+ (* Spawn concurrent fibers using Eio.Fiber.both *)
+ Eio.Fiber.both
+ (* Event processor fiber *)
+ (fun () -> event_processor runtime ~sw event_stream)
+
+ (* Effect executor fiber *)
+ (fun () -> effect_executor runtime ~sw ~env);
+
+ (* When we get here, runtime is shutting down *)
+ Printf.printf "\nšŸ›‘ Runtime shutting down...\n%!";
+ Printf.printf " Events processed: %Ld\n%!" runtime.events_processed;
+ Printf.printf " Effects executed: %Ld\n%!" runtime.effects_executed;
+
+ (* Save final snapshot *)
+ Printf.printf "Saving final snapshot...\n%!";
+ State.save_snapshot runtime.state ~fs config.snapshot_path;
+ Printf.printf "āœ“ Snapshot saved\n%!"
+
+(* Simplified run for testing - processes events synchronously *)
+let run_simple ~env config events =
+ Printf.printf "Running simple runtime test...\n%!";
+
+ Eio.Switch.run @@ fun sw ->
+ let fs = Eio.Stdenv.fs env in
+ let runtime = create ~sw ~fs config in
+
+ (* Process each event *)
+ List.iter (process_event runtime) events;
+
+ Printf.printf "āœ“ Processed %Ld events\n%!" runtime.events_processed;
+
+ runtime
+
+(* Statistics record *)
+type stats = {
+ events_processed: int64;
+ effects_executed: int64;
+ event_count: int;
+ state_summary: string;
+}
+
+(* Get runtime statistics *)
+let get_stats (runtime : t) : stats = {
+ events_processed = runtime.events_processed;
+ effects_executed = runtime.effects_executed;
+ event_count = Eventlog.event_count runtime.event_log;
+ state_summary = State.summary runtime.state;
+}