(* Event Log - Persistent event storage
 *
 * This module provides an append-only event log with:
 * - Synchronous append for simplicity
 * - Sequential replay
 * - Crash recovery via event replay
 *
 * Event format (matches Vere):
 * - 4 bytes: mug (murmur3 hash of jammed event; currently a placeholder
 *   hash of the noun, see compute_mug)
 * - N bytes: jammed noun
 *
 * Storage:
 * - Simple file-based initially (one file per event)
 * - Files named: event-NNNNNNNNNNNNNNNNNNNN.jam (20-digit zero-padded)
 * - Stored in <pier>/.urb/log/
 * - Will migrate to LMDB later for better performance
 *)

open Noun

(* Event number (0-indexed) *)
type event_num = int64

(* Event metadata *)
type event_meta = {
  num: event_num;  (* event number; -1L when not known (see deserialize_event) *)
  mug: int32;      (* hash stored in the first 4 bytes of the event *)
  size: int;       (* size of jammed data, in bytes, excluding the mug *)
}

(* Event log state *)
type t = {
  log_dir: string;                 (* .urb/log directory *)
  mutable last_event: event_num;   (* last committed event, -1L when empty *)
  mutable enabled: bool;           (* whether logging is enabled *)
}

(* Whether debug tracing is on.
 *
 * Tracing is enabled when the NEOVERE_EVENTLOG_DEBUG environment variable
 * is set to anything other than "0", "false" or "off" (case-insensitive).
 * The variable is re-read on every call. *)
let debug_enabled () =
  match Sys.getenv_opt "NEOVERE_EVENTLOG_DEBUG" with
  | None -> false
  | Some value ->
    (match String.lowercase_ascii value with
     | "0" | "false" | "off" -> false
     | _ -> true)

(* printf-style debug output, prefixed with "[eventlog] " and flushed.
 *
 * When tracing is off the arguments are still consumed (and formatted) so
 * that both branches have the same type, but nothing is printed. *)
let debug fmt =
  if debug_enabled () then
    Printf.ksprintf (fun msg -> Printf.printf "[eventlog] %s\n%!" msg) fmt
  else
    Printf.ksprintf (fun _ -> ()) fmt

(* Create [path] as a directory (mode 0o755) unless something already
 * exists there. Parent directories are not created. *)
let ensure_dir path =
  if not (Sys.file_exists path) then begin
    debug "creating directory: %s" path;
    Unix.mkdir path 0o755
  end

(* Create event log directory structure.
 *
 * Makes sure <pier_path>/.urb and <pier_path>/.urb/log exist and returns a
 * log handle. [pier_path] itself must already exist, otherwise Unix.mkdir
 * raises Unix.Unix_error.
 *
 * NOTE: the handle always starts with last_event = -1L, even if the
 * directory already holds events. On an existing pier, call [replay]
 * before [append]; otherwise the first append targets event 0 and
 * replaces the file already stored under that number. *)
let create ?(enabled=true) pier_path =
  let urb_dir = Filename.concat pier_path ".urb" in
  let log_dir = Filename.concat urb_dir "log" in
  ensure_dir urb_dir;
  ensure_dir log_dir;
  debug "event log initialized at: %s" log_dir;
  { log_dir; last_event = -1L; enabled }

(* Get event filename: <log_dir>/event-<20-digit zero-padded num>.jam *)
let event_file log num =
  let filename = Printf.sprintf "event-%020Ld.jam" num in
  Filename.concat log.log_dir filename

(* Compute the mug (checksum) of a noun.
 *
 * TODO: Use proper murmur3 implementation - for now using OCaml's
 * Hashtbl.hash, masked to 31 bits so the value is non-negative.
 *
 * NOTE(review): Hashtbl.hash only inspects a bounded part of a structure,
 * so this is a weak integrity check for large nouns, and it hashes the
 * noun rather than the jammed bytes described in the file header.
 * Presumably files written with it are not interchangeable with Vere's
 * until the TODO is resolved - confirm. *)
let compute_mug (noun : noun) : int32 =
  Int32.of_int (Hashtbl.hash noun land 0x7FFFFFFF)

(* Serialize event: 4-byte mug (little-endian) followed by the jammed noun.
 *
 * [verbose] is passed through to Serial.jam. *)
let serialize_event ?(verbose=false) (noun : noun) : bytes =
  let jam_bytes = Serial.jam ~verbose noun in
  let mug = compute_mug noun in
  let jam_len = Bytes.length jam_bytes in
  (* Output buffer: 4 bytes (mug) + jam data *)
  let result = Bytes.create (4 + jam_len) in
  Bytes.set_int32_le result 0 mug;
  Bytes.blit jam_bytes 0 result 4 jam_len;
  result

(* Deserialize event: parse mug + cue noun.
 *
 * Returns the event metadata together with the decoded noun. The [num]
 * field of the metadata is always -1L because the serialized form does
 * not carry the event number.
 *
 * Raises Failure if [data] is shorter than the 4-byte mug, or if the
 * stored mug does not match the mug recomputed from the decoded noun.
 * Whatever Serial.cue raises on malformed input propagates as well. *)
let deserialize_event ?(verbose=false) (data : bytes) : event_meta * noun =
  if Bytes.length data < 4 then
    failwith "Event data too short (missing mug)";
  (* Stored mug (little-endian) *)
  let mug = Bytes.get_int32_le data 0 in
  (* Everything after the mug is the jammed noun *)
  let jam_len = Bytes.length data - 4 in
  let jam_bytes = Bytes.sub data 4 jam_len in
  let noun = Serial.cue ~verbose jam_bytes in
  (* Verify mug *)
  let computed_mug = compute_mug noun in
  if not (Int32.equal computed_mug mug) then
    failwith (Printf.sprintf "Mug mismatch: expected %ld, got %ld"
                mug computed_mug);
  let meta = { num = -1L; mug; size = jam_len } in
  (meta, noun)

(* Write [data] to [path] without ever exposing a partially written file
 * under the final name.
 *
 * The bytes go to "<path>.tmp" first and the file is renamed into place
 * only after it has been written and closed successfully. If writing
 * fails, the channel is closed, the temporary file is removed
 * (best-effort) and the original exception is re-raised. *)
let write_file_atomic path (data : bytes) : unit =
  let tmp_path = path ^ ".tmp" in
  let oc = open_out_bin tmp_path in
  begin
    match
      output_bytes oc data;
      close_out oc
    with
    | () -> ()
    | exception e ->
      close_out_noerr oc;
      (* Best-effort cleanup: the write error is the one worth reporting. *)
      (try Sys.remove tmp_path with Sys_error _ -> ());
      raise e
  end;
  Sys.rename tmp_path path

(* Append event to log.
 *
 * Assigns the next event number (last_event + 1), writes the serialized
 * event to its file when logging is enabled, records the number as
 * last_event and returns it. When logging is disabled the number still
 * advances but nothing is written to disk.
 *
 * If serialization or the write fails, the exception propagates and
 * last_event is left unchanged. *)
let append ?(verbose=false) log (noun : noun) : event_num =
  let event_num = Int64.succ log.last_event in
  if log.enabled then begin
    let serialized = serialize_event ~verbose noun in
    let file_path = event_file log event_num in
    debug "appending event %Ld to %s (%d bytes)"
      event_num file_path (Bytes.length serialized);
    write_file_atomic file_path serialized
  end;
  log.last_event <- event_num;
  event_num

(* Read single event from log.
 *
 * Raises Sys_error if the event file cannot be opened, and whatever
 * [deserialize_event] raises on corrupt data. The input channel is closed
 * on every path, including when reading fails. *)
let read_event ?(verbose=false) log event_num : noun =
  let file_path = event_file log event_num in
  debug "reading event %Ld from %s" event_num file_path;
  let ic = open_in_bin file_path in
  let data =
    Fun.protect
      ~finally:(fun () -> close_in_noerr ic)
      (fun () ->
         let len = in_channel_length ic in
         let buf = Bytes.create len in
         really_input ic buf 0 len;
         buf)
  in
  let (_meta, noun) = deserialize_event ~verbose data in
  noun

(* Replay all events sequentially.
 *
 * Starting at event 0, reads each consecutive event file that exists and
 * passes its number and noun to [callback]. Replay stops at the first
 * missing file, and last_event is then set to the last event found
 * (-1L when there is none).
 *
 * If reading an event or [callback] raises, the exception propagates and
 * last_event is not updated. *)
let replay ?(verbose=false) log (callback : event_num -> noun -> unit) : unit =
  debug "starting replay...";
  (* Find all event files by probing sequential event numbers *)
  let rec replay_from num =
    if Sys.file_exists (event_file log num) then begin
      debug "replaying event %Ld" num;
      let noun = read_event ~verbose log num in
      callback num noun;
      replay_from (Int64.succ num)
    end else begin
      debug "replay complete at event %Ld" num;
      (* [num] is the first missing event, so its predecessor is the last
       * one present; for num = 0L this yields the empty marker -1L. *)
      log.last_event <- Int64.pred num
    end
  in
  replay_from 0L

(* Get the number of events in the log (last_event + 1) *)
let event_count log =
  Int64.to_int (Int64.succ log.last_event)

(* Get last event number (-1L when no event has been committed) *)
let last_event_num log =
  log.last_event

(* Disable logging (for testing or special modes) *)
let disable log =
  log.enabled <- false;
  debug "event logging disabled"

(* Enable logging *)
let enable log =
  log.enabled <- true;
  debug "event logging enabled"