From 24eac75c69b3d74388bbbc8ee2b6792e7590e4c6 Mon Sep 17 00:00:00 2001 From: polwex Date: Mon, 6 Oct 2025 04:03:14 +0700 Subject: did this madman really implement parallelism on urbit --- ocaml/lib/io/behn.ml | 137 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 ocaml/lib/io/behn.ml (limited to 'ocaml/lib/io/behn.ml') diff --git a/ocaml/lib/io/behn.ml b/ocaml/lib/io/behn.ml new file mode 100644 index 0000000..95e1d02 --- /dev/null +++ b/ocaml/lib/io/behn.ml @@ -0,0 +1,137 @@ +(* Behn - Timer Driver using Eio.Time + * + * This is the first I/O driver - demonstrates the fiber-per-driver pattern! + * + * Behn manages timers: + * - SetTimer effects: schedule a timer + * - CancelTimer effects: cancel a timer + * - When timer fires: produce an ovum back to Arvo + * + * Key innovation: Uses Eio.Time.sleep for non-blocking waits! + * Multiple timers can run concurrently in separate fibers. + *) + +(* Timer state *) +type timer = { + id: int64; + fire_time: float; (* Unix timestamp *) + mutable cancelled: bool; +} + +(* Behn driver state *) +type t = { + timers: (int64, timer) Hashtbl.t; (* Active timers by ID *) + lock: Mutex.t; +} + +(* Create behn driver *) +let create () = { + timers = Hashtbl.create 64; + lock = Mutex.create (); +} + +(* Set a timer *) +let set_timer behn ~id ~fire_time = + Mutex.lock behn.lock; + let timer = { id; fire_time; cancelled = false } in + Hashtbl.replace behn.timers id timer; + Mutex.unlock behn.lock; + Printf.printf "[Behn] Set timer %Ld for %f\n%!" id fire_time + +(* Cancel a timer *) +let cancel_timer behn ~id = + Mutex.lock behn.lock; + (match Hashtbl.find_opt behn.timers id with + | Some timer -> + timer.cancelled <- true; + Printf.printf "[Behn] Cancelled timer %Ld\n%!" id + | None -> + Printf.printf "[Behn] Timer %Ld not found (already fired?)\n%!" id + ); + Mutex.unlock behn.lock + +(* Timer fiber - waits for a timer to fire + * + * This runs as a separate fiber for each timer! + * Uses Eio.Time.sleep for non-blocking wait. + *) +let timer_fiber behn ~env ~event_stream timer = + let clock = Eio.Stdenv.clock env in + let now = Unix.gettimeofday () in + let wait_time = max 0.0 (timer.fire_time -. now) in + + Printf.printf "[Behn] Timer %Ld: waiting %.3f seconds\n%!" timer.id wait_time; + + (* Non-blocking sleep using Eio! *) + Eio.Time.sleep clock wait_time; + + (* Check if cancelled *) + Mutex.lock behn.lock; + let is_cancelled = timer.cancelled in + Hashtbl.remove behn.timers timer.id; + Mutex.unlock behn.lock; + + if not is_cancelled then begin + Printf.printf "[Behn] Timer %Ld: FIRED! 🔥\n%!" timer.id; + + (* Create timer ovum and send to event stream *) + let ovum_noun = Nock_lib.Effects.timer_ovum ~id:timer.id ~fire_time:timer.fire_time in + let event = Nock_lib.Noun.cell ovum_noun.wire ovum_noun.card in + + (* Send to runtime event stream *) + Eio.Stream.add event_stream event; + Printf.printf "[Behn] Timer %Ld: event sent to runtime\n%!" timer.id + end else begin + Printf.printf "[Behn] Timer %Ld: cancelled, not firing\n%!" timer.id + end + +(* Behn driver fiber - processes timer effects + * + * This is the main fiber for the Behn driver. + * It reads SetTimer/CancelTimer effects and spawns timer fibers. + *) +let driver_fiber behn ~sw ~env ~effect_queue ~event_stream = + Printf.printf "[Behn] Driver fiber started 🕐\n%!"; + + let rec loop () = + (* Get next effect from queue *) + match Nock_lib.Effects.try_dequeue effect_queue with + | None -> + (* No effects, sleep briefly *) + Eio.Time.sleep (Eio.Stdenv.clock env) 0.01; + loop () + + | Some eff -> + (match eff with + | Nock_lib.Effects.SetTimer { id; time } -> + set_timer behn ~id ~fire_time:time; + + (* Spawn a fiber for this timer *) + let timer = Hashtbl.find behn.timers id in + Eio.Fiber.fork ~sw (fun () -> + timer_fiber behn ~env ~event_stream timer + ); + loop () + + | Nock_lib.Effects.CancelTimer { id } -> + cancel_timer behn ~id; + loop () + + | _ -> + (* Not a behn effect, ignore *) + loop () + ) + in + + try + loop () + with + | End_of_file -> + Printf.printf "[Behn] Driver shutting down\n%!" + +(* Get active timer count *) +let active_timers behn = + Mutex.lock behn.lock; + let count = Hashtbl.length behn.timers in + Mutex.unlock behn.lock; + count -- cgit v1.2.3