(* Event Log - Eio-based persistent event storage

   This module provides an append-only event log with:
   - Append via Eio.Path (each event is written to a temporary file and then
     renamed into place, so a process crash mid-write never leaves a
     truncated event file behind)
   - Sequential, in-order replay of every stored event
   - Crash recovery via event replay

   Event format (one file per event):
   - 4 bytes: mug, little-endian. NOTE: currently [Hashtbl.hash] of the noun,
     not murmur3 - see [compute_mug].
   - N bytes: jammed noun

   Storage:
   - Simple file-based initially (one file per event, named
     "event-<20-digit zero-padded number>.jam")
   - Will migrate to LMDB later for better performance *)

(* Event number (0-indexed) *)
type event_num = int64

(* Event metadata *)
type event_meta = {
  num : event_num;
  mug : int32; (* integrity hash, see [compute_mug] *)
  size : int; (* size of jammed data in bytes *)
}

(* Event log state *)
type t = {
  dir : Eio.Fs.dir_ty Eio.Path.t; (* base directory *)
  mutable last_event : event_num; (* last committed event, -1L if none *)
}

(** File name of event [num]; zero-padded so names sort in event order. *)
let event_filename num = Printf.sprintf "event-%020Ld.jam" num

(** Path of the file holding event [num] inside the log directory. *)
let event_file log num = Eio.Path.(log.dir / event_filename num)

(** [event_exists log num] is [true] iff the file for event [num] exists.
    The file is opened and immediately closed rather than read.
    @raise Eio.Io for any error other than "not found". *)
let event_exists log num =
  match Eio.Path.with_open_in (event_file log num) ignore with
  | () -> true
  | exception Eio.Io (Eio.Fs.E (Eio.Fs.Not_found _), _) -> false

(** Largest [n] such that events [0..n] all exist on disk, or [-1L] when
    event 0 is missing. Probing stops at the first gap. *)
let find_last_event log =
  let rec probe num =
    if event_exists log num then probe (Int64.succ num) else Int64.pred num
  in
  probe 0L

(** [create ~sw ~fs path] opens the event log stored in directory [path]
    (resolved against [fs]), creating the directory if it does not exist.

    [last_event] is initialised from the event files already present, so an
    [append] on a reopened log continues after them instead of overwriting
    event 0.
    @raise Eio.Io if the directory cannot be created for any reason other
    than already existing. *)
let create ~sw:_ ~fs path =
  let dir = Eio.Path.(fs / path) in
  (* Only "already exists" is expected; permission errors, a missing parent
     directory or fiber cancellation must not be silently ignored. *)
  (match Eio.Path.mkdir dir ~perm:0o755 with
   | () -> ()
   | exception Eio.Io (Eio.Fs.E (Eio.Fs.Already_exists _), _) -> ());
  let log = { dir; last_event = -1L } in
  log.last_event <- find_last_event log;
  log

(** Integrity hash stored in front of each event.

    NOTE(review): despite the "mug"/murmur3 naming this is [Hashtbl.hash],
    which only inspects a bounded number of nodes of the value, so it will
    not catch every corruption of a large noun. It is part of the on-disk
    format: changing it invalidates the mugs of every existing log. *)
let compute_mug (noun : Noun.noun) : int32 = Int32.of_int (Hashtbl.hash noun)

(** Serialize an event as a 4-byte little-endian mug followed by the jammed
    noun. *)
let serialize_event (noun : Noun.noun) : bytes =
  let mug = compute_mug noun in
  let jam_bytes = Serial.jam noun in
  let jam_len = Bytes.length jam_bytes in
  let result = Bytes.create (4 + jam_len) in
  Bytes.set_int32_le result 0 mug;
  Bytes.blit jam_bytes 0 result 4 jam_len;
  result

(** Parse bytes produced by [serialize_event]: read the mug, cue the noun
    and check that the mug matches the decoded noun. The returned metadata
    has [num = -1L] because the event number is not stored in the data.
    @raise Failure if [data] is shorter than 4 bytes or the mug does not
    match. Errors raised by [Serial.cue] propagate unchanged. *)
let deserialize_event (data : bytes) : event_meta * Noun.noun =
  if Bytes.length data < 4 then failwith "Event data too short (missing mug)";
  let mug = Bytes.get_int32_le data 0 in
  let jam_len = Bytes.length data - 4 in
  let jam_bytes = Bytes.sub data 4 jam_len in
  let noun = Serial.cue jam_bytes in
  let computed_mug = compute_mug noun in
  if not (Int32.equal computed_mug mug) then
    failwith
      (Printf.sprintf "Mug mismatch: expected %ld, got %ld" mug computed_mug);
  ({ num = -1L; mug; size = jam_len }, noun)

(** [append log ~sw noun] stores [noun] as the next event and returns its
    number.

    The event is first written to "<event file>.tmp" and then renamed onto
    the final name, so a crash of the process mid-write leaves at most a
    stray ".tmp" file, which [replay] never reads. Nothing is fsync'd, so
    durability across power loss is not guaranteed.

    NOTE(review): the event number is chosen before the write (which may
    suspend the fiber) and [last_event] is bumped only afterwards, so
    concurrent appends to the same log from several fibers are not safe. *)
let append log ~sw:_ (noun : Noun.noun) : event_num =
  let event_num = Int64.succ log.last_event in
  let final_path = event_file log event_num in
  let tmp_path = Eio.Path.(log.dir / (event_filename event_num ^ ".tmp")) in
  let serialized = serialize_event noun in
  Eio.Path.save ~create:(`Or_truncate 0o644) tmp_path
    (Bytes.to_string serialized);
  Eio.Path.rename tmp_path final_path;
  log.last_event <- event_num;
  event_num

(** Read and decode a single event.
    @raise Eio.Io if the event file cannot be loaded.
    @raise Failure on a corrupt event (see [deserialize_event]). *)
let read_event log event_num : Noun.noun =
  let file_path = event_file log event_num in
  let data = Eio.Path.load file_path in
  let (_meta, noun) = deserialize_event (Bytes.of_string data) in
  noun

(** [replay log ~sw callback] calls [callback num noun] for events 0, 1, 2,
    ... sequentially and in order. It stops at the first missing event
    number, so events after a gap are not visited. Each file is read exactly
    once. On completion [last_event] is set to the last replayed event, or
    [-1L] if there were none.

    Only "file not found" ends the replay. Other I/O errors, corrupt events
    (see [deserialize_event]) and exceptions raised by [callback] propagate
    and leave [last_event] unchanged. *)
let replay log ~sw:_ (callback : event_num -> Noun.noun -> unit) : unit =
  let rec replay_from num =
    match Eio.Path.load (event_file log num) with
    | exception Eio.Io (Eio.Fs.E (Eio.Fs.Not_found _), _) -> Int64.pred num
    | data ->
        let _meta, noun = deserialize_event (Bytes.of_string data) in
        callback num noun;
        (* Tail call: the exception handler above does not cover this arm. *)
        replay_from (Int64.succ num)
  in
  log.last_event <- replay_from 0L

(** Number of events in the log, i.e. [last_event + 1]. *)
let event_count log = Int64.to_int (Int64.succ log.last_event)

(** Number of the last committed event, or [-1L] for an empty log. *)
let last_event_num log = log.last_event