(* Provenance: recovered from commit d21900836f89b2bf9cd55ff1708a4619c8b89656
 * (subject "neoinit", author polwex, 2025-10-20), which created
 * ocaml/lib/eventlog.ml. *)

(* Event Log - Persistent event storage
 *
 * This module provides an append-only event log with:
 * - Synchronous append for simplicity
 * - Sequential replay
 * - Crash recovery via event replay
 *
 * Event format (layout modelled on Vere):
 * - 4 bytes: mug, little-endian (intended to be the murmur3 hash; see the
 *   TODO on [compute_mug], which currently uses a placeholder hash)
 * - N bytes: jammed noun
 *
 * Storage:
 * - Simple file-based initially (one file per event)
 * - Files named: event-NNNNNNNNNNNNNNNNNNNN.jam (20-digit zero-padded)
 * - Stored in <pier>/.urb/log/
 * - Will migrate to LMDB later for better performance
 *)

open Noun

(* Event number (0-indexed) *)
type event_num = int64

(* Event metadata *)
type event_meta = {
  num: event_num;
  mug: int32;  (* hash stored in the first 4 bytes of the event *)
  size: int;   (* size of jammed data, excluding the 4-byte mug *)
}

(* Event log state *)
type t = {
  log_dir: string;                (* .urb/log directory *)
  mutable last_event: event_num;  (* last committed event, -1L when none *)
  mutable enabled: bool;          (* whether events are written to disk *)
}

(* [debug_enabled ()] is true when the NEOVERE_EVENTLOG_DEBUG environment
 * variable is set to anything other than "0", "false" or "off"
 * (case-insensitive). The variable is re-read on every call. *)
let debug_enabled () =
  match Sys.getenv_opt "NEOVERE_EVENTLOG_DEBUG" with
  | None -> false
  | Some value ->
    let v = String.lowercase_ascii value in
    not (v = "0" || v = "false" || v = "off")

(* [debug fmt args...] prints a formatted "[eventlog] ..." line to stdout and
 * flushes it when debugging is enabled; otherwise the message is discarded. *)
let debug fmt =
  if debug_enabled () then
    Printf.ksprintf (fun msg -> Printf.printf "[eventlog] %s\n%!" msg) fmt
  else
    Printf.ksprintf (fun _ -> ()) fmt

(* [ensure_dir path] creates [path] with mode 0o755 unless something already
 * exists there. EEXIST is tolerated so that losing a race with another
 * creator between the existence check and the mkdir is not an error; any
 * other [Unix.Unix_error] (e.g. a missing parent) still propagates. *)
let ensure_dir path =
  if not (Sys.file_exists path) then begin
    debug "creating directory: %s" path;
    try Unix.mkdir path 0o755
    with Unix.Unix_error (Unix.EEXIST, _, _) -> ()
  end

(* [create ?enabled pier_path] makes sure <pier_path>/.urb/log exists and
 * returns a fresh log handle.
 *
 * The handle starts with [last_event = -1L]; existing event files are NOT
 * scanned here. Call [replay] first when the directory may already hold
 * events, otherwise the next [append] writes to event 0.
 *
 * Raises [Unix.Unix_error] if a directory cannot be created. *)
let create ?(enabled=true) pier_path =
  let urb_dir = Filename.concat pier_path ".urb" in
  let log_dir = Filename.concat urb_dir "log" in
  ensure_dir urb_dir;
  ensure_dir log_dir;
  debug "event log initialized at: %s" log_dir;
  { log_dir; last_event = -1L; enabled }

(* [event_file log num] is the path of the file holding event [num]:
 * <log_dir>/event-<num zero-padded to 20 digits>.jam *)
let event_file log num =
  let filename = Printf.sprintf "event-%020Ld.jam" num in
  Filename.concat log.log_dir filename

(* Compute the mug stored in front of each event. *)
(* TODO: Use proper murmur3 implementation - for now using OCaml's Hashtbl.hash *)
let compute_mug (noun : noun) : int32 =
  Int32.of_int (Hashtbl.hash noun land 0x7FFFFFFF)

(* [serialize_event ?verbose noun] returns the on-disk form of [noun]:
 * a 4-byte little-endian mug followed by the jammed noun. [verbose] is
 * forwarded to [Serial.jam]. *)
let serialize_event ?(verbose=false) (noun : noun) : bytes =
  let jam_bytes = Serial.jam ~verbose noun in
  let mug = compute_mug noun in
  let jam_len = Bytes.length jam_bytes in
  (* Output buffer: 4 bytes (mug) + jam data *)
  let result = Bytes.create (4 + jam_len) in
  Bytes.set_int32_le result 0 mug;
  Bytes.blit jam_bytes 0 result 4 jam_len;
  result

(* [deserialize_event ?verbose data] parses the output of [serialize_event].
 *
 * Returns the event metadata together with the cued noun. The returned
 * [num] is always [-1L] because the event number is not part of the
 * serialized data. [verbose] is forwarded to [Serial.cue].
 *
 * Raises [Failure] when [data] is shorter than the 4-byte mug, or when the
 * stored mug differs from the mug recomputed from the cued noun. *)
let deserialize_event ?(verbose=false) (data : bytes) : event_meta * noun =
  if Bytes.length data < 4 then
    failwith "Event data too short (missing mug)";
  (* Stored mug, little-endian *)
  let mug = Bytes.get_int32_le data 0 in
  (* Everything after the mug is the jammed noun *)
  let jam_len = Bytes.length data - 4 in
  let jam_bytes = Bytes.sub data 4 jam_len in
  let noun = Serial.cue ~verbose jam_bytes in
  (* Integrity check: the stored mug must match the recomputed one *)
  let computed_mug = compute_mug noun in
  if not (Int32.equal computed_mug mug) then
    failwith (Printf.sprintf "Mug mismatch: expected %ld, got %ld" mug computed_mug);
  let meta = { num = -1L; mug; size = jam_len } in
  (meta, noun)

(* [append ?verbose log noun] assigns the next event number to [noun],
 * records it as [log.last_event] and returns it.
 *
 * When the log is enabled the serialized event is first written to its own
 * file; when disabled only the counter advances and nothing touches disk.
 *
 * Raises [Sys_error] if the file cannot be opened or written; in that case
 * [log.last_event] is left unchanged. *)
let append ?(verbose=false) log (noun : noun) : event_num =
  let event_num = Int64.succ log.last_event in
  if log.enabled then begin
    let serialized = serialize_event ~verbose noun in
    let file_path = event_file log event_num in
    debug "appending event %Ld to %s (%d bytes)" event_num file_path (Bytes.length serialized);
    let oc = open_out_bin file_path in
    (* Close the channel even when the write fails. The explicit flush makes
     * write errors surface here as Sys_error instead of being swallowed by
     * close_out_noerr. *)
    Fun.protect
      ~finally:(fun () -> close_out_noerr oc)
      (fun () ->
        output_bytes oc serialized;
        flush oc)
  end;
  log.last_event <- event_num;
  event_num

(* [read_event ?verbose log event_num] loads the file of event [event_num]
 * and returns the noun it contains.
 *
 * Raises [Sys_error] if the file cannot be opened, [End_of_file] if it is
 * shorter than its reported length, and [Failure] as described for
 * [deserialize_event]. *)
let read_event ?(verbose=false) log event_num : noun =
  let file_path = event_file log event_num in
  debug "reading event %Ld from %s" event_num file_path;
  let ic = open_in_bin file_path in
  (* Close the channel even when reading fails part-way. *)
  let data =
    Fun.protect
      ~finally:(fun () -> close_in_noerr ic)
      (fun () ->
        let len = in_channel_length ic in
        let buf = Bytes.create len in
        really_input ic buf 0 len;
        buf)
  in
  let (_meta, noun) = deserialize_event ~verbose data in
  noun

(* [replay ?verbose log callback] reads events 0, 1, 2, ... in order and
 * calls [callback num noun] for each, stopping at the first event number
 * whose file does not exist. On normal completion [log.last_event] is set
 * to the last event found, or [-1L] when there were none.
 *
 * Exceptions from [read_event] or [callback] propagate and leave
 * [log.last_event] untouched. *)
let replay ?(verbose=false) log (callback : event_num -> noun -> unit) : unit =
  debug "starting replay...";
  (* Probe sequential event numbers until a file is missing. *)
  let rec replay_from num =
    if Sys.file_exists (event_file log num) then begin
      debug "replaying event %Ld" num;
      let noun = read_event ~verbose log num in
      callback num noun;
      replay_from (Int64.succ num)
    end else begin
      debug "replay complete at event %Ld" num;
      (* [num] is the first missing event; its predecessor is the last one
       * found, and Int64.pred 0L is -1L for the empty log. *)
      log.last_event <- Int64.pred num
    end
  in
  replay_from 0L

(* [event_count log] is the number of events recorded so far, i.e.
 * [last_event + 1]. *)
let event_count log =
  Int64.to_int (Int64.succ log.last_event)

(* [last_event_num log] is the number of the last recorded event, or [-1L]
 * when no event has been recorded. *)
let last_event_num log =
  log.last_event

(* Disable logging (for testing or special modes). [append] keeps counting
 * events but stops writing files. *)
let disable log =
  log.enabled <- false;
  debug "event logging disabled"

(* Enable logging: subsequent [append] calls write event files again. *)
let enable log =
  log.enabled <- true;
  debug "event logging enabled"