diff options
| author | polwex <polwex@sortug.com> | 2025-10-20 13:13:39 +0700 |
|---|---|---|
| committer | polwex <polwex@sortug.com> | 2025-10-20 13:13:39 +0700 |
| commit | d21900836f89b2bf9cd55ff1708a4619c8b89656 (patch) | |
| tree | bb3a5842ae408ffa465814c6bbf27a5002866252 /ocaml/lib/eventlog_lmdb.ml | |
neoinityes
Diffstat (limited to 'ocaml/lib/eventlog_lmdb.ml')
| -rw-r--r-- | ocaml/lib/eventlog_lmdb.ml | 229 |
1 files changed, 229 insertions, 0 deletions
diff --git a/ocaml/lib/eventlog_lmdb.ml b/ocaml/lib/eventlog_lmdb.ml new file mode 100644 index 0000000..256a662 --- /dev/null +++ b/ocaml/lib/eventlog_lmdb.ml @@ -0,0 +1,229 @@ +[@@@ocaml.warning "-69"] + +open Noun + +(* Event log using LMDB backend to match Vere's implementation *) + +type event_num = int64 + +external murmur3_hash32 : string -> int32 = "caml_murmur3_hash32" + +let debug_enabled () = + match Sys.getenv_opt "NEOVERE_EVENTLOG_DEBUG" with + | None -> false + | Some value -> + let v = String.lowercase_ascii value in + not (v = "0" || v = "false" || v = "off") + +let debug fmt = + if debug_enabled () then + Printf.ksprintf (fun msg -> Printf.printf "[eventlog_lmdb] %s\n%!" msg) fmt + else + Printf.ksprintf (fun _ -> ()) fmt + +(* LMDB environment and databases *) +type t = { + env: Lmdb.Env.t; + events_map: (int64, string, [`Uni]) Lmdb.Map.t; (* event_num -> serialized bytes *) + meta_map: (string, string, [`Uni]) Lmdb.Map.t; (* metadata key-value *) + mutable last_event: event_num; + mutable enabled: bool; +} + +(* Serialize event: 4-byte mug + jammed noun (matches Vere) *) +let serialize_event ?(verbose=false) (noun : noun) : string = + let jam_bytes = Serial.jam ~verbose noun in + (* Compute mug from jammed bytes instead of traversing the noun tree + This is much faster for large nouns (3M+ nodes) and avoids stack overflow *) + let mug = murmur3_hash32 (Bytes.to_string jam_bytes) in + let jam_len = Bytes.length jam_bytes in + let total_len = 4 + jam_len in + let result = Bytes.create total_len in + Bytes.set_int32_le result 0 mug; + Bytes.blit jam_bytes 0 result 4 jam_len; + Bytes.to_string result + +(* Deserialize event: extract mug and cue the noun *) +let deserialize_event ?(verbose=false) (data : string) : noun = + if String.length data < 4 then + failwith "Event data too short (< 4 bytes)"; + let _mug = String.get_int32_le data 0 in + let jam_data = String.sub data 4 (String.length data - 4) in + Serial.cue ~verbose (Bytes.of_string jam_data) + +(* Create or open event log *) +let create ?(enabled=true) (pier_path : string) : t = + debug "opening LMDB event log at: %s" pier_path; + + (* Create .urb/log directory if it doesn't exist *) + let urb_dir = Filename.concat pier_path ".urb" in + let log_dir = Filename.concat urb_dir "log" in + + if not (Sys.file_exists urb_dir) then + Unix.mkdir urb_dir 0o755; + if not (Sys.file_exists log_dir) then + Unix.mkdir log_dir 0o755; + + (* Open LMDB environment - match Vere's default of 1TB (0x10000000000) *) + let env = Lmdb.Env.create Lmdb.Rw + ~max_maps:2 + ~map_size:0x10000000000 (* 1TB - matches Vere's default *) + ~flags:Lmdb.Env.Flags.no_subdir + (Filename.concat log_dir "data.mdb") + in + + (* Open or create the two databases *) + let events_map = + try + Lmdb.Map.open_existing Lmdb.Map.Nodup + ~name:"EVENTS" + ~key:Lmdb.Conv.int64_le + ~value:Lmdb.Conv.string + env + with Not_found -> + Lmdb.Map.create Lmdb.Map.Nodup + ~name:"EVENTS" + ~key:Lmdb.Conv.int64_le + ~value:Lmdb.Conv.string + env + in + + let meta_map = + try + Lmdb.Map.open_existing Lmdb.Map.Nodup + ~name:"META" + ~key:Lmdb.Conv.string + ~value:Lmdb.Conv.string + env + with Not_found -> + Lmdb.Map.create Lmdb.Map.Nodup + ~name:"META" + ~key:Lmdb.Conv.string + ~value:Lmdb.Conv.string + env + in + + (* Read last event number from metadata or find highest event *) + let last_event = + try + let last_str = Lmdb.Map.get meta_map "last_event" in + Int64.of_string last_str + with Not_found -> + (* Find highest event number by iterating backwards *) + try + match Lmdb.Txn.go Lmdb.Ro env (fun txn -> + Lmdb.Cursor.go Lmdb.Ro ~txn events_map (fun cursor -> + let (event_num, _) = Lmdb.Cursor.last cursor in + debug "found last event: %Ld" event_num; + event_num + ) + ) with + | Some event_num -> event_num + | None -> 0L + with Not_found -> + 0L + in + + debug "last event number: %Ld" last_event; + + { env; events_map; meta_map; last_event; enabled } + +(* Close the event log *) +let close (log : t) : unit = + debug "closing event log"; + Lmdb.Env.sync log.env; + Lmdb.Env.close log.env + +(* Append a new event to the log *) +let append ?(verbose=false) (log : t) (noun : noun) : event_num = + if not log.enabled then begin + log.last_event <- Int64.succ log.last_event; + log.last_event + end else begin + let event_num = Int64.succ log.last_event in + debug "appending event %Ld" event_num; + + debug "serializing event %Ld..." event_num; + let serialized = serialize_event ~verbose noun in + debug "serialized event %Ld: %d bytes" event_num (String.length serialized); + + (* Write event in a transaction *) + debug "starting LMDB transaction for event %Ld" event_num; + Lmdb.Txn.go Lmdb.Rw log.env (fun txn -> + debug "writing event %Ld to EVENTS map" event_num; + Lmdb.Map.set log.events_map ~txn event_num serialized; + debug "writing metadata for event %Ld" event_num; + Lmdb.Map.set log.meta_map ~txn "last_event" (Int64.to_string event_num); + debug "transaction complete for event %Ld" event_num; + Some () + ) |> ignore; + + log.last_event <- event_num; + debug "wrote event %Ld (%d bytes)" event_num (String.length serialized); + event_num + end + +(* Read a specific event from the log *) +let read_event ?(verbose=false) (log : t) (event_num : event_num) : noun = + debug "reading event %Ld" event_num; + + try + let serialized = Lmdb.Map.get log.events_map event_num in + debug "read event %Ld (%d bytes)" event_num (String.length serialized); + deserialize_event ~verbose serialized + with Not_found -> + failwith (Printf.sprintf "Event %Ld not found" event_num) + +(* Get the range of events in the log (first, last) *) +let gulf (log : t) : (event_num * event_num) option = + try + Lmdb.Txn.go Lmdb.Ro log.env (fun txn -> + Lmdb.Cursor.go Lmdb.Ro ~txn log.events_map (fun cursor -> + let (first_num, _) = Lmdb.Cursor.first cursor in + let (last_num, _) = Lmdb.Cursor.last cursor in + (first_num, last_num) + ) + ) + with Not_found -> + None + +(* Replay all events in the log *) +let replay ?(verbose=false) (log : t) (callback : event_num -> noun -> unit) : unit = + debug "starting replay"; + + match gulf log with + | None -> + debug "no events to replay"; + () + | Some (first, last) -> + debug "replaying events %Ld to %Ld" first last; + + Lmdb.Txn.go Lmdb.Ro log.env (fun txn -> + Lmdb.Cursor.go Lmdb.Ro ~txn log.events_map (fun cursor -> + let rec replay_loop () = + try + let (event_num, serialized) = Lmdb.Cursor.current cursor in + debug "replaying event %Ld" event_num; + let noun = deserialize_event ~verbose serialized in + callback event_num noun; + ignore (Lmdb.Cursor.next cursor); + replay_loop () + with Not_found -> + () (* End of cursor *) + in + (* Start from first event *) + ignore (Lmdb.Cursor.first cursor); + replay_loop (); + Some () + ) + ) |> ignore; + + debug "replay complete" + +(* Get last event number *) +let last_event (log : t) : event_num = + log.last_event + +(* Sync to disk *) +let sync (log : t) : unit = + Lmdb.Env.sync log.env |
