diff options
Diffstat (limited to 'ocaml/lib/eventlog.ml')
-rw-r--r-- | ocaml/lib/eventlog.ml | 148 |
1 files changed, 148 insertions, 0 deletions
(* Event Log - Eio-based persistent event storage
 *
 * This module provides an append-only event log with:
 * - Appends written through Eio.Path (direct-style; the calling fiber
 *   waits for the write to complete)
 * - Sequential, in-order replay
 * - Crash recovery via event replay
 *
 * Event format:
 * - 4 bytes: mug (little-endian 32-bit checksum of the noun)
 * - N bytes: jammed noun
 *
 * Storage:
 * - Simple file-based initially (one file per event)
 * - Will migrate to LMDB later for better performance
 *)

(* Event number (0-indexed) *)
type event_num = int64

(* Event metadata *)
type event_meta = {
  num: event_num;
  mug: int32;   (* checksum read from the event header *)
  size: int;    (* size of jammed data, in bytes *)
}

(* Event log state *)
type t = {
  dir: Eio.Fs.dir_ty Eio.Path.t;   (* base directory *)
  mutable last_event: event_num;   (* last committed event; -1L when none *)
}

(* Size in bytes of the mug header that precedes the jammed noun. *)
let mug_size = 4

(* [create ~sw ~fs path] returns a log handle rooted at [fs / path],
 * creating the directory when it is missing.
 *
 * The switch argument is accepted for interface compatibility and is not
 * used.  The handle always starts with [last_event = -1L], even when the
 * directory already holds event files: call [replay] before [append] on an
 * existing log, otherwise [append] starts again at event 0 and overwrites it.
 *
 * Only "already exists" is tolerated from [mkdir]; any other failure
 * (permission denied, missing parent, cancellation, ...) propagates as the
 * exception raised by Eio. *)
let create ~sw:_ ~fs path =
  let dir = Eio.Path.(fs / path) in
  (match Eio.Path.mkdir dir ~perm:0o755 with
   | () -> ()
   | exception Eio.Io (Eio.Fs.E (Eio.Fs.Already_exists _), _) -> ());
  { dir; last_event = -1L }

(* [event_file log num] is the path of the file holding event [num].
 * The number is zero-padded to 20 digits. *)
let event_file log num =
  let filename = Printf.sprintf "event-%020Ld.jam" num in
  Eio.Path.(log.dir / filename)

(* Compute the mug of a noun.
 *
 * Simplified: this uses OCaml's [Hashtbl.hash] rather than murmur3.
 * NOTE(review): [Hashtbl.hash] only inspects a bounded part of a large
 * structure, so this is a weak integrity check; replacing it changes the
 * on-disk format. *)
let compute_mug (noun : Noun.noun) : int32 =
  Int32.of_int (Hashtbl.hash noun)

(* [serialize_event noun] is the on-disk image of an event:
 * the 4-byte little-endian mug followed by the jammed noun. *)
let serialize_event (noun : Noun.noun) : bytes =
  let mug = compute_mug noun in
  let jam_bytes = Serial.jam noun in
  let jam_len = Bytes.length jam_bytes in
  let result = Bytes.create (mug_size + jam_len) in
  Bytes.set_int32_le result 0 mug;
  Bytes.blit jam_bytes 0 result mug_size jam_len;
  result

(* [deserialize_event data] parses an event image produced by
 * [serialize_event] and returns its metadata together with the noun.
 *
 * The returned [num] is always [-1L]: the event number is not stored in
 * the image, only in the file name.
 *
 * Raises [Failure] when [data] is shorter than the mug header or when the
 * stored mug differs from the mug recomputed from the decoded noun.
 * Whatever [Serial.cue] raises on malformed input also propagates. *)
let deserialize_event (data : bytes) : event_meta * Noun.noun =
  if Bytes.length data < mug_size then
    failwith "Event data too short (missing mug)";
  let mug = Bytes.get_int32_le data 0 in
  let jam_len = Bytes.length data - mug_size in
  let noun = Serial.cue (Bytes.sub data mug_size jam_len) in
  (* Verify the stored checksum against the decoded noun. *)
  let computed_mug = compute_mug noun in
  if not (Int32.equal computed_mug mug) then
    failwith
      (Printf.sprintf "Mug mismatch: expected %ld, got %ld" mug computed_mug);
  ({ num = -1L; mug; size = jam_len }, noun)

(* [append log ~sw noun] writes [noun] as the next event and returns its
 * number.
 *
 * The switch argument is accepted for interface compatibility and is not
 * used.  [last_event] is advanced only after [Eio.Path.save] returns, so a
 * failed write leaves the in-memory counter untouched.  The file is opened
 * with [`Or_truncate], so an existing file with the same number is
 * replaced.  No explicit fsync is issued here. *)
let append log ~sw:_ (noun : Noun.noun) : event_num =
  let event_num = Int64.succ log.last_event in
  let serialized = serialize_event noun in
  Eio.Path.save ~create:(`Or_truncate 0o644) (event_file log event_num)
    (Bytes.to_string serialized);
  log.last_event <- event_num;
  event_num

(* [read_event log event_num] loads and decodes a single event.
 * Propagates the Eio exception when the file cannot be read, and the
 * [Failure] cases of [deserialize_event]. *)
let read_event log event_num : Noun.noun =
  let data = Eio.Path.load (event_file log event_num) in
  let (_meta, noun) = deserialize_event (Bytes.of_string data) in
  noun

(* [replay log ~sw callback] walks events 0, 1, 2, ... in order, calling
 * [callback num noun] for each, and stops at the first event number whose
 * file does not exist.  On success [log.last_event] is set to the last
 * event replayed ([-1L] for an empty log).
 *
 * Events are processed sequentially to preserve order; the switch argument
 * is accepted for interface compatibility and is not used.
 *
 * Each event file is read exactly once.  Only a "not found" error marks
 * the end of the log: any other I/O error, a corrupt event, or an
 * exception from [callback] propagates and leaves [log.last_event]
 * unchanged, so a partially readable log is never mistaken for a shorter
 * one. *)
let replay log ~sw:_ (callback : event_num -> Noun.noun -> unit) : unit =
  (* [last] is the number of the most recently replayed event. *)
  let rec loop num last =
    match Eio.Path.load (event_file log num) with
    | exception Eio.Io (Eio.Fs.E (Eio.Fs.Not_found _), _) -> last
    | data ->
      let (_meta, noun) = deserialize_event (Bytes.of_string data) in
      callback num noun;
      loop (Int64.succ num) num
  in
  log.last_event <- loop 0L (-1L)

(* Get the number of events in the log (0 when no event is committed). *)
let event_count log =
  Int64.to_int (Int64.succ log.last_event)

(* Get last event number ([-1L] when no event is committed). *)
let last_event_num log =
  log.last_event