summaryrefslogtreecommitdiff
path: root/ocaml/lib/eventlog.ml
diff options
context:
space:
mode:
Diffstat (limited to 'ocaml/lib/eventlog.ml')
-rw-r--r--ocaml/lib/eventlog.ml195
1 files changed, 195 insertions, 0 deletions
diff --git a/ocaml/lib/eventlog.ml b/ocaml/lib/eventlog.ml
new file mode 100644
index 0000000..b0c5993
--- /dev/null
+++ b/ocaml/lib/eventlog.ml
@@ -0,0 +1,195 @@
+(* Event Log - Persistent event storage
+ *
+ * This module provides an append-only event log with:
+ * - Synchronous append for simplicity
+ * - Sequential replay
+ * - Crash recovery via event replay
+ *
+ * Event format (matches Vere):
+ * - 4 bytes: mug (murmur3 hash of jammed event)
+ * - N bytes: jammed noun
+ *
+ * Storage:
+ * - Simple file-based initially (one file per event)
+ * - Files named: event-NNNNNNNNNNNNNNNNNNNN.jam (20-digit zero-padded)
+ * - Stored in <pier>/.urb/log/
+ * - Will migrate to LMDB later for better performance
+ *)
+
+open Noun
+
+(* Event number (0-indexed) *)
+type event_num = int64
+
+(* Event metadata *)
+type event_meta = {
+ num: event_num;
+ mug: int32; (* murmur3 hash *)
+ size: int; (* size of jammed data *)
+}
+
+(* Event log state *)
+type t = {
+ log_dir: string; (* .urb/log directory *)
+ mutable last_event: event_num; (* last committed event *)
+ mutable enabled: bool; (* whether logging is enabled *)
+}
+
+let debug_enabled () =
+ match Sys.getenv_opt "NEOVERE_EVENTLOG_DEBUG" with
+ | None -> false
+ | Some value ->
+ let v = String.lowercase_ascii value in
+ not (v = "0" || v = "false" || v = "off")
+
+let debug fmt =
+ if debug_enabled () then
+ Printf.ksprintf (fun msg -> Printf.printf "[eventlog] %s\n%!" msg) fmt
+ else
+ Printf.ksprintf (fun _ -> ()) fmt
+
+(* Create event log directory structure *)
+let create ?(enabled=true) pier_path =
+ let urb_dir = Filename.concat pier_path ".urb" in
+ let log_dir = Filename.concat urb_dir "log" in
+
+ (* Create directories if they don't exist *)
+ if not (Sys.file_exists urb_dir) then begin
+ debug "creating directory: %s" urb_dir;
+ Unix.mkdir urb_dir 0o755
+ end;
+
+ if not (Sys.file_exists log_dir) then begin
+ debug "creating directory: %s" log_dir;
+ Unix.mkdir log_dir 0o755
+ end;
+
+ debug "event log initialized at: %s" log_dir;
+ { log_dir; last_event = -1L; enabled }
+
+(* Get event filename *)
+let event_file log num =
+ let filename = Printf.sprintf "event-%020Ld.jam" num in
+ Filename.concat log.log_dir filename
+
+(* Compute murmur3 hash *)
+(* TODO: Use proper murmur3 implementation - for now using OCaml's Hashtbl.hash *)
+let compute_mug (noun : noun) : int32 =
+ Int32.of_int (Hashtbl.hash noun land 0x7FFFFFFF)
+
+(* Serialize event: 4-byte mug + jammed noun *)
+let serialize_event ?(verbose=false) (noun : noun) : bytes =
+ let jam_bytes = Serial.jam ~verbose noun in
+ let mug = compute_mug noun in
+ let jam_len = Bytes.length jam_bytes in
+
+ (* Create output buffer: 4 bytes (mug) + jam data *)
+ let total_len = 4 + jam_len in
+ let result = Bytes.create total_len in
+
+ (* Write mug (little-endian) *)
+ Bytes.set_int32_le result 0 mug;
+
+ (* Copy jammed data *)
+ Bytes.blit jam_bytes 0 result 4 jam_len;
+
+ result
+
+(* Deserialize event: parse mug + unjam noun *)
+let deserialize_event ?(verbose=false) (data : bytes) : event_meta * noun =
+ if Bytes.length data < 4 then
+ failwith "Event data too short (missing mug)";
+
+ (* Read mug (little-endian) *)
+ let mug = Bytes.get_int32_le data 0 in
+
+ (* Extract jam data *)
+ let jam_len = Bytes.length data - 4 in
+ let jam_bytes = Bytes.sub data 4 jam_len in
+
+ (* Cue the noun *)
+ let noun = Serial.cue ~verbose jam_bytes in
+
+ (* Verify mug *)
+ let computed_mug = compute_mug noun in
+ if computed_mug <> mug then
+ failwith (Printf.sprintf "Mug mismatch: expected %ld, got %ld" mug computed_mug);
+
+ let meta = { num = -1L; mug; size = jam_len } in
+ (meta, noun)
+
+(* Append event to log *)
+let append ?(verbose=false) log (noun : noun) : event_num =
+ if not log.enabled then begin
+ log.last_event <- Int64.succ log.last_event;
+ log.last_event
+ end else begin
+ let event_num = Int64.succ log.last_event in
+ let serialized = serialize_event ~verbose noun in
+ let file_path = event_file log event_num in
+
+ debug "appending event %Ld to %s (%d bytes)" event_num file_path (Bytes.length serialized);
+
+ (* Write to file *)
+ let oc = open_out_bin file_path in
+ output_bytes oc serialized;
+ close_out oc;
+
+ log.last_event <- event_num;
+ event_num
+ end
+
+(* Read single event from log *)
+let read_event ?(verbose=false) log event_num : noun =
+ let file_path = event_file log event_num in
+ debug "reading event %Ld from %s" event_num file_path;
+
+ let ic = open_in_bin file_path in
+ let len = in_channel_length ic in
+ let data = really_input_string ic len in
+ close_in ic;
+
+ let (_meta, noun) = deserialize_event ~verbose (Bytes.of_string data) in
+ noun
+
+(* Replay all events sequentially *)
+let replay ?(verbose=false) log (callback : event_num -> noun -> unit) : unit =
+ debug "starting replay...";
+
+ (* Find all event files by trying to read sequentially *)
+ let rec replay_from num =
+ let file_path = event_file log num in
+ if Sys.file_exists file_path then begin
+ debug "replaying event %Ld" num;
+ let noun = read_event ~verbose log num in
+ callback num noun;
+ replay_from (Int64.succ num)
+ end else begin
+ debug "replay complete at event %Ld" num;
+ (* Update last_event to reflect what we found *)
+ if num > 0L then
+ log.last_event <- Int64.pred num
+ else
+ log.last_event <- -1L
+ end
+ in
+
+ replay_from 0L
+
+(* Get the number of events in the log *)
+let event_count log =
+ Int64.to_int (Int64.succ log.last_event)
+
+(* Get last event number *)
+let last_event_num log =
+ log.last_event
+
+(* Disable logging (for testing or special modes) *)
+let disable log =
+ log.enabled <- false;
+ debug "event logging disabled"
+
+(* Enable logging *)
+let enable log =
+ log.enabled <- true;
+ debug "event logging enabled"