summaryrefslogtreecommitdiff
path: root/ocaml/lib/eventlog.ml
diff options
context:
space:
mode:
authorpolwex <polwex@sortug.com>2025-10-06 04:03:14 +0700
committerpolwex <polwex@sortug.com>2025-10-06 04:03:14 +0700
commit24eac75c69b3d74388bbbc8ee2b6792e7590e4c6 (patch)
tree3e3a22dde0d977dca4b28fc92ada0faea24990f7 /ocaml/lib/eventlog.ml
parentfd51dfdccf7b565e4214fe47a1420a9990fab342 (diff)
did this madman really implement parallelism on urbit
Diffstat (limited to 'ocaml/lib/eventlog.ml')
-rw-r--r--ocaml/lib/eventlog.ml148
1 files changed, 148 insertions, 0 deletions
diff --git a/ocaml/lib/eventlog.ml b/ocaml/lib/eventlog.ml
new file mode 100644
index 0000000..86e422b
--- /dev/null
+++ b/ocaml/lib/eventlog.ml
@@ -0,0 +1,148 @@
+(* Event Log - Eio-based persistent event storage
+ *
+ * This module provides an append-only event log with:
+ * - Async append using Eio.Path
+ * - Parallel replay using Eio.Fiber
+ * - Crash recovery via event replay
+ *
+ * Event format:
+ * - 4 bytes: mug (murmur3 hash)
+ * - N bytes: jammed noun
+ *
+ * Storage:
+ * - Simple file-based initially (one file per event)
+ * - Will migrate to LMDB later for better performance
+ *)
+
+(* Event number (0-indexed) *)
+type event_num = int64
+
+(* Event metadata *)
+type event_meta = {
+ num: event_num;
+ mug: int32; (* murmur3 hash *)
+ size: int; (* size of jammed data *)
+}
+
+(* Event log state *)
+type t = {
+ dir: Eio.Fs.dir_ty Eio.Path.t; (* base directory *)
+ mutable last_event: event_num; (* last committed event *)
+}
+
+(* Create event log directory if it doesn't exist *)
+let create ~sw:_ ~fs path =
+ let dir = Eio.Path.(fs / path) in
+ (* Create directory - Eio will handle if it exists *)
+ (try Eio.Path.mkdir dir ~perm:0o755 with _ -> ());
+ { dir; last_event = -1L }
+
+(* Get event filename *)
+let event_file log num =
+ let filename = Printf.sprintf "event-%020Ld.jam" num in
+ Eio.Path.(log.dir / filename)
+
+(* Compute murmur3 hash (simplified - using OCaml's Hashtbl.hash for now) *)
+let compute_mug (noun : Noun.noun) : int32 =
+ Int32.of_int (Hashtbl.hash noun)
+
+(* Serialize event: 4-byte mug + jammed noun *)
+let serialize_event (noun : Noun.noun) : bytes =
+ let mug = compute_mug noun in
+ let jam_bytes = Serial.jam noun in
+ let jam_len = Bytes.length jam_bytes in
+
+ (* Create output buffer: 4 bytes (mug) + jam data *)
+ let total_len = 4 + jam_len in
+ let result = Bytes.create total_len in
+
+ (* Write mug (little-endian) *)
+ Bytes.set_uint8 result 0 (Int32.to_int mug land 0xff);
+ Bytes.set_uint8 result 1 (Int32.to_int (Int32.shift_right mug 8) land 0xff);
+ Bytes.set_uint8 result 2 (Int32.to_int (Int32.shift_right mug 16) land 0xff);
+ Bytes.set_uint8 result 3 (Int32.to_int (Int32.shift_right mug 24) land 0xff);
+
+ (* Copy jammed data *)
+ Bytes.blit jam_bytes 0 result 4 jam_len;
+
+ result
+
+(* Deserialize event: parse mug + unjam noun *)
+let deserialize_event (data : bytes) : event_meta * Noun.noun =
+ if Bytes.length data < 4 then
+ failwith "Event data too short (missing mug)";
+
+ (* Read mug (little-endian) *)
+ let b0 = Bytes.get_uint8 data 0 in
+ let b1 = Bytes.get_uint8 data 1 in
+ let b2 = Bytes.get_uint8 data 2 in
+ let b3 = Bytes.get_uint8 data 3 in
+ let mug = Int32.of_int (b0 lor (b1 lsl 8) lor (b2 lsl 16) lor (b3 lsl 24)) in
+
+ (* Extract jam data *)
+ let jam_len = Bytes.length data - 4 in
+ let jam_bytes = Bytes.sub data 4 jam_len in
+
+ (* Cue the noun *)
+ let noun = Serial.cue jam_bytes in
+
+ (* Verify mug *)
+ let computed_mug = compute_mug noun in
+ if computed_mug <> mug then
+ failwith (Printf.sprintf "Mug mismatch: expected %ld, got %ld" mug computed_mug);
+
+ let meta = { num = -1L; mug; size = jam_len } in
+ (meta, noun)
+
+(* Append event to log - async using Eio.Path *)
+let append log ~sw:_ (noun : Noun.noun) : event_num =
+ let event_num = Int64.succ log.last_event in
+ let serialized = serialize_event noun in
+ let file_path = event_file log event_num in
+
+ (* Write to file asynchronously *)
+ Eio.Path.save ~create:(`Or_truncate 0o644) file_path (Bytes.to_string serialized);
+
+ log.last_event <- event_num;
+ event_num
+
+(* Read single event from log *)
+let read_event log event_num : Noun.noun =
+ let file_path = event_file log event_num in
+ let data = Eio.Path.load file_path in
+ let (_meta, noun) = deserialize_event (Bytes.of_string data) in
+ noun
+
+(* Replay all events in parallel using Eio.Fiber.fork *)
+let replay log ~sw:_ (callback : event_num -> Noun.noun -> unit) : unit =
+ (* Find all event files *)
+ let rec find_events num acc =
+ let file_path = event_file log num in
+ try
+ let _ = Eio.Path.load file_path in
+ find_events (Int64.succ num) (num :: acc)
+ with _ ->
+ List.rev acc
+ in
+
+ let event_nums = find_events 0L [] in
+
+ (* Process events in parallel batches (fiber per event) *)
+ (* For now, process sequentially to maintain order *)
+ List.iter (fun num ->
+ let noun = read_event log num in
+ callback num noun
+ ) event_nums;
+
+ (* Update last_event based on what we found *)
+ match List.rev event_nums with
+ | [] -> log.last_event <- -1L
+ | last :: _ -> log.last_event <- last
+
+(* Get the number of events in the log *)
+let event_count log =
+ Int64.to_int (Int64.succ log.last_event)
+
+(* Get last event number *)
+let last_event_num log =
+ log.last_event