[@@@ocaml.warning "-69"]

open Noun

(* Event log using LMDB backend to match Vere's implementation *)

(** Event sequence number; used as the key of the EVENTS map. *)
type event_num = int64

(** 32-bit murmur3 hash of a string, provided by a C stub. *)
external murmur3_hash32 : string -> int32 = "caml_murmur3_hash32"

(** [debug_enabled ()] is [true] when the NEOVERE_EVENTLOG_DEBUG environment
    variable is set to anything other than 0, false or off (compared
    case-insensitively). The environment is consulted on every call. *)
let debug_enabled () =
  match Sys.getenv_opt "NEOVERE_EVENTLOG_DEBUG" with
  | None -> false
  | Some value ->
    let v = String.lowercase_ascii value in
    not (v = "0" || v = "false" || v = "off")

(** [debug fmt args...] prints a printf-style message on stdout, prefixed
    with the module tag and flushed, when {!debug_enabled} is [true].
    Otherwise the message is formatted and discarded. *)
let debug fmt =
  if debug_enabled () then
    Printf.ksprintf (fun msg -> Printf.printf "[eventlog_lmdb] %s\n%!" msg) fmt
  else
    Printf.ksprintf (fun _ -> ()) fmt

(* LMDB environment and databases *)
type t = {
  env: Lmdb.Env.t;
  events_map: (int64, string, [`Uni]) Lmdb.Map.t;  (* event_num -> serialized bytes *)
  meta_map: (string, string, [`Uni]) Lmdb.Map.t;   (* metadata key-value *)
  mutable last_event: event_num;  (* highest event number handed out so far *)
  mutable enabled: bool;          (* when false, [append] only bumps the counter *)
}

(** [serialize_event ?verbose noun] jams [noun] and returns the bytes
    prefixed with a 4-byte little-endian mug.

    Serialize event: 4-byte mug + jammed noun (matches Vere). *)
let serialize_event ?(verbose=false) (noun : noun) : string =
  let jam_bytes = Serial.jam ~verbose noun in
  (* Compute mug from jammed bytes instead of traversing the noun tree.
     This is much faster for large nouns (3M+ nodes) and avoids stack
     overflow. *)
  let mug = murmur3_hash32 (Bytes.to_string jam_bytes) in
  let jam_len = Bytes.length jam_bytes in
  let total_len = 4 + jam_len in
  let result = Bytes.create total_len in
  Bytes.set_int32_le result 0 mug;
  Bytes.blit jam_bytes 0 result 4 jam_len;
  Bytes.to_string result

(** [deserialize_event ?verbose data] skips the 4-byte mug prefix and cues
    the remainder of [data] into a noun. The stored mug is read but not
    verified.

    @raise Failure if [data] is shorter than 4 bytes. *)
let deserialize_event ?(verbose=false) (data : string) : noun =
  if String.length data < 4 then
    failwith "Event data too short (< 4 bytes)";
  let _mug = String.get_int32_le data 0 in
  let jam_data = String.sub data 4 (String.length data - 4) in
  Serial.cue ~verbose (Bytes.of_string jam_data)

(** [create ?enabled pier_path] creates or opens the event log stored at
    [pier_path/.urb/log/data.mdb].

    The [.urb] and [.urb/log] directories are created when missing
    ([pier_path] itself is not created). The last event number is taken
    from the last_event key of the META map; when that key is absent it is
    the highest key of the EVENTS map, or [0L] for an empty log.

    @raise Failure if the stored last_event value is not a valid integer
    (from [Int64.of_string]). *)
let create ?(enabled=true) (pier_path : string) : t =
  debug "opening LMDB event log at: %s" pier_path;

  (* Create .urb/log directory if it doesn't exist *)
  let urb_dir = Filename.concat pier_path ".urb" in
  let log_dir = Filename.concat urb_dir "log" in
  if not (Sys.file_exists urb_dir) then Unix.mkdir urb_dir 0o755;
  if not (Sys.file_exists log_dir) then Unix.mkdir log_dir 0o755;

  (* Open LMDB environment - match Vere's default of 1TB (0x10000000000) *)
  let env =
    Lmdb.Env.create Lmdb.Rw
      ~max_maps:2
      ~map_size:0x10000000000  (* 1TB - matches Vere's default *)
      ~flags:Lmdb.Env.Flags.no_subdir
      (Filename.concat log_dir "data.mdb")
  in

  (* Open or create the two databases *)
  let events_map =
    try
      Lmdb.Map.open_existing Lmdb.Map.Nodup
        ~name:"EVENTS"
        ~key:Lmdb.Conv.int64_le
        ~value:Lmdb.Conv.string
        env
    with Not_found ->
      Lmdb.Map.create Lmdb.Map.Nodup
        ~name:"EVENTS"
        ~key:Lmdb.Conv.int64_le
        ~value:Lmdb.Conv.string
        env
  in
  let meta_map =
    try
      Lmdb.Map.open_existing Lmdb.Map.Nodup
        ~name:"META"
        ~key:Lmdb.Conv.string
        ~value:Lmdb.Conv.string
        env
    with Not_found ->
      Lmdb.Map.create Lmdb.Map.Nodup
        ~name:"META"
        ~key:Lmdb.Conv.string
        ~value:Lmdb.Conv.string
        env
  in

  (* Read last event number from metadata or find highest event *)
  let last_event =
    try
      let last_str = Lmdb.Map.get meta_map "last_event" in
      Int64.of_string last_str
    with Not_found ->
      (* Find highest event number by positioning a cursor on the last key *)
      (try
         match
           Lmdb.Txn.go Lmdb.Ro env (fun txn ->
             Lmdb.Cursor.go Lmdb.Ro ~txn events_map (fun cursor ->
               let (event_num, _) = Lmdb.Cursor.last cursor in
               debug "found last event: %Ld" event_num;
               event_num
             )
           )
         with
         | Some event_num -> event_num
         | None -> 0L
       with Not_found -> 0L)
  in

  debug "last event number: %Ld" last_event;
  { env; events_map; meta_map; last_event; enabled }

(** [close log] syncs the LMDB environment and then closes it. *)
let close (log : t) : unit =
  debug "closing event log";
  Lmdb.Env.sync log.env;
  Lmdb.Env.close log.env

(** [append ?verbose log noun] assigns the next event number to [noun] and
    returns it.

    When [log.enabled] is [false] only the in-memory counter is advanced
    and nothing is written. Otherwise the serialized event and the updated
    last_event metadata entry are written in a single read-write
    transaction before the counter is advanced. *)
let append ?(verbose=false) (log : t) (noun : noun) : event_num =
  if not log.enabled then begin
    log.last_event <- Int64.succ log.last_event;
    log.last_event
  end else begin
    let event_num = Int64.succ log.last_event in
    debug "appending event %Ld" event_num;

    debug "serializing event %Ld..." event_num;
    let serialized = serialize_event ~verbose noun in
    debug "serialized event %Ld: %d bytes" event_num (String.length serialized);

    (* Write event in a transaction *)
    debug "starting LMDB transaction for event %Ld" event_num;
    Lmdb.Txn.go Lmdb.Rw log.env (fun txn ->
      debug "writing event %Ld to EVENTS map" event_num;
      Lmdb.Map.set log.events_map ~txn event_num serialized;
      debug "writing metadata for event %Ld" event_num;
      Lmdb.Map.set log.meta_map ~txn "last_event" (Int64.to_string event_num);
      debug "transaction complete for event %Ld" event_num;
      Some ()
    ) |> ignore;

    (* Only advance the counter once the transaction call has returned. *)
    log.last_event <- event_num;
    debug "wrote event %Ld (%d bytes)" event_num (String.length serialized);
    event_num
  end

(** [read_event ?verbose log event_num] looks up [event_num] in the EVENTS
    map and deserializes it.

    @raise Failure if the event is not present. NOTE(review): a
    [Not_found] escaping from deserialization is reported with the same
    message, because the handler covers the whole body. *)
let read_event ?(verbose=false) (log : t) (event_num : event_num) : noun =
  debug "reading event %Ld" event_num;
  try
    let serialized = Lmdb.Map.get log.events_map event_num in
    debug "read event %Ld (%d bytes)" event_num (String.length serialized);
    deserialize_event ~verbose serialized
  with Not_found ->
    failwith (Printf.sprintf "Event %Ld not found" event_num)

(** [gulf log] is [Some (first, last)], the lowest and highest event numbers
    stored in the EVENTS map, or [None] when a cursor move raises
    [Not_found] (an empty map). *)
let gulf (log : t) : (event_num * event_num) option =
  try
    Lmdb.Txn.go Lmdb.Ro log.env (fun txn ->
      Lmdb.Cursor.go Lmdb.Ro ~txn log.events_map (fun cursor ->
        let (first_num, _) = Lmdb.Cursor.first cursor in
        let (last_num, _) = Lmdb.Cursor.last cursor in
        (first_num, last_num)
      )
    )
  with Not_found -> None

(** [replay ?verbose log callback] calls [callback event_num noun] for every
    event in the EVENTS map, in cursor order, inside one read-only
    transaction.

    Only the cursor moves are guarded against [Not_found]. An exception
    raised by [callback] or by deserialization, including [Not_found],
    propagates to the caller instead of being mistaken for the end of the
    log. The loop is tail-recursive, so no exception handler is stacked per
    replayed event. *)
let replay ?(verbose=false) (log : t) (callback : event_num -> noun -> unit) : unit =
  debug "starting replay";
  match gulf log with
  | None ->
    debug "no events to replay"
  | Some (first, last) ->
    debug "replaying events %Ld to %Ld" first last;
    Lmdb.Txn.go Lmdb.Ro log.env (fun txn ->
      Lmdb.Cursor.go Lmdb.Ro ~txn log.events_map (fun cursor ->
        let rec replay_from (event_num, serialized) =
          debug "replaying event %Ld" event_num;
          let noun = deserialize_event ~verbose serialized in
          callback event_num noun;
          (* Not_found from the cursor move means we ran past the last
             entry; the recursive call sits outside the handler. *)
          match Lmdb.Cursor.next cursor with
          | exception Not_found -> ()
          | entry -> replay_from entry
        in
        (* Start from first event *)
        match Lmdb.Cursor.first cursor with
        | exception Not_found -> ()
        | entry -> replay_from entry
      )
    ) |> ignore;
    debug "replay complete"

(** [last_event log] is the in-memory last event number of [log]. *)
let last_event (log : t) : event_num =
  log.last_event

(** [sync log] syncs the LMDB environment of [log] to disk. *)
let sync (log : t) : unit =
  Lmdb.Env.sync log.env