summaryrefslogtreecommitdiff
path: root/ocaml/lib/eventlog_lmdb.ml
diff options
context:
space:
mode:
Diffstat (limited to 'ocaml/lib/eventlog_lmdb.ml')
-rw-r--r--ocaml/lib/eventlog_lmdb.ml229
1 files changed, 229 insertions, 0 deletions
diff --git a/ocaml/lib/eventlog_lmdb.ml b/ocaml/lib/eventlog_lmdb.ml
new file mode 100644
index 0000000..256a662
--- /dev/null
+++ b/ocaml/lib/eventlog_lmdb.ml
@@ -0,0 +1,229 @@
+[@@@ocaml.warning "-69"]
+
+open Noun
+
+(* Event log using LMDB backend to match Vere's implementation *)
+
+(* Event sequence number. Used as the key of the EVENTS map and as the value
+   tracked in the [last_event] field of the log handle. *)
+type event_num = int64
+
+(* 32-bit hash of a string, implemented by the C stub [caml_murmur3_hash32].
+   NOTE(review): presumably MurmurHash3 (x86_32) judging by the name; the seed
+   and exact variant are defined in the stub, not here -- confirm there. *)
+external murmur3_hash32 : string -> int32 = "caml_murmur3_hash32"
+
+(* Debug tracing is on when NEOVERE_EVENTLOG_DEBUG is set to anything other
+   than "0", "false" or "off" (compared case-insensitively). An unset variable
+   means tracing is off. *)
+let debug_enabled () =
+  let is_off = function
+    | "0" | "false" | "off" -> true
+    | _ -> false
+  in
+  Sys.getenv_opt "NEOVERE_EVENTLOG_DEBUG"
+  |> Option.map String.lowercase_ascii
+  |> Option.fold ~none:false ~some:(fun setting -> not (is_off setting))
+
+(* printf-style debug trace. When tracing is enabled the formatted message is
+   written to stdout with an "[eventlog_lmdb]" prefix and flushed; otherwise
+   the message is formatted and discarded. The enabled check runs once per
+   call, when the format is applied. *)
+let debug fmt =
+  let sink =
+    if debug_enabled () then
+      (fun msg -> Printf.printf "[eventlog_lmdb] %s\n%!" msg)
+    else
+      (fun _ -> ())
+  in
+  Printf.ksprintf sink fmt
+
+(* Handle on an open event log: the LMDB environment, its two named maps, and
+   the in-memory bookkeeping kept alongside them. *)
+type t = {
+ (* LMDB environment that owns both maps; synced and closed by [close]. *)
+ env: Lmdb.Env.t;
+ events_map: (int64, string, [`Uni]) Lmdb.Map.t; (* event_num -> serialized bytes *)
+ meta_map: (string, string, [`Uni]) Lmdb.Map.t; (* metadata key-value *)
+ (* Number of the most recently appended event; advanced by [append]. *)
+ mutable last_event: event_num;
+ (* When false, [append] only advances [last_event] and writes nothing. *)
+ mutable enabled: bool;
+}
+
+(* Encode an event as a 4-byte little-endian mug followed by the jammed noun.
+
+   The mug is hashed over the jammed bytes rather than by walking the noun
+   tree, which is much faster for very large nouns and avoids deep recursion. *)
+let serialize_event ?(verbose=false) (noun : noun) : string =
+  let jammed = Bytes.to_string (Serial.jam ~verbose noun) in
+  let header = Bytes.create 4 in
+  Bytes.set_int32_le header 0 (murmur3_hash32 jammed);
+  Bytes.to_string header ^ jammed
+
+(* Decode an event produced by [serialize_event]: skip the 4-byte mug header
+   and cue the remaining bytes.
+
+   The stored mug is read but not compared against anything.
+   Raises [Failure] when [data] is shorter than the 4-byte header. *)
+let deserialize_event ?(verbose=false) (data : string) : noun =
+  let header_len = 4 in
+  let payload_len = String.length data - header_len in
+  if payload_len < 0 then
+    failwith "Event data too short (< 4 bytes)"
+  else begin
+    let _stored_mug : int32 = String.get_int32_le data 0 in
+    let payload = Bytes.create payload_len in
+    Bytes.blit_string data header_len payload 0 payload_len;
+    Serial.cue ~verbose payload
+  end
+
+(* Create or open the event log stored under [pier_path]/.urb/log.
+
+   [enabled] is stored in the returned handle; [append] only persists events
+   while it is true.
+
+   The [.urb] and [.urb/log] directories are created when missing ([pier_path]
+   itself must already exist). The EVENTS and META maps are opened, or created
+   on first use. The initial [last_event] is taken from the "last_event" META
+   entry when present, otherwise from the last key in EVENTS, otherwise 0.
+
+   If anything raises after the LMDB environment has been opened, the
+   environment is closed before the exception is re-raised, so a failed
+   [create] does not leak the handle.
+
+   Raises [Unix.Unix_error] if a directory cannot be created, [Failure] if the
+   "last_event" metadata is not a valid integer, and whatever LMDB raises when
+   the environment or maps cannot be opened. *)
+let create ?(enabled=true) (pier_path : string) : t =
+  debug "opening LMDB event log at: %s" pier_path;
+
+  let urb_dir = Filename.concat pier_path ".urb" in
+  let log_dir = Filename.concat urb_dir "log" in
+
+  (* EEXIST is tolerated so that losing a creation race with another process
+     is harmless. *)
+  let ensure_dir dir =
+    if not (Sys.file_exists dir) then
+      try Unix.mkdir dir 0o755
+      with Unix.Unix_error (Unix.EEXIST, _, _) -> ()
+  in
+  ensure_dir urb_dir;
+  ensure_dir log_dir;
+
+  (* Open LMDB environment - match Vere's default of 1TB (0x10000000000) *)
+  let env = Lmdb.Env.create Lmdb.Rw
+      ~max_maps:2
+      ~map_size:0x10000000000
+      ~flags:Lmdb.Env.Flags.no_subdir
+      (Filename.concat log_dir "data.mdb")
+  in
+
+  (* Open a named map, creating it when it does not exist yet. *)
+  let open_or_create_map ~name ~key ~value =
+    try Lmdb.Map.open_existing Lmdb.Map.Nodup ~name ~key ~value env
+    with Not_found -> Lmdb.Map.create Lmdb.Map.Nodup ~name ~key ~value env
+  in
+
+  (* Everything that can fail once [env] is open lives in here, so the
+     handler below can release the environment on any error. *)
+  let open_log () =
+    let events_map =
+      open_or_create_map ~name:"EVENTS"
+        ~key:Lmdb.Conv.int64_le ~value:Lmdb.Conv.string
+    in
+    let meta_map =
+      open_or_create_map ~name:"META"
+        ~key:Lmdb.Conv.string ~value:Lmdb.Conv.string
+    in
+
+    (* Fallback when no metadata is recorded: take the last key in EVENTS. *)
+    let scan_last_event () =
+      match
+        Lmdb.Txn.go Lmdb.Ro env (fun txn ->
+          Lmdb.Cursor.go Lmdb.Ro ~txn events_map (fun cursor ->
+            let (event_num, _) = Lmdb.Cursor.last cursor in
+            debug "found last event: %Ld" event_num;
+            event_num))
+      with
+      | Some event_num -> event_num
+      | None -> 0L
+      | exception Not_found -> 0L (* EVENTS is empty *)
+    in
+
+    let last_event =
+      match Lmdb.Map.get meta_map "last_event" with
+      | last_str -> Int64.of_string last_str
+      | exception Not_found -> scan_last_event ()
+    in
+
+    debug "last event number: %Ld" last_event;
+
+    { env; events_map; meta_map; last_event; enabled }
+  in
+
+  match open_log () with
+  | log -> log
+  | exception exn ->
+    let backtrace = Printexc.get_raw_backtrace () in
+    Lmdb.Env.close env;
+    Printexc.raise_with_backtrace exn backtrace
+
+(* Sync the LMDB environment, then close it. *)
+let close (log : t) : unit =
+  debug "closing event log";
+  let env = log.env in
+  Lmdb.Env.sync env;
+  Lmdb.Env.close env
+
+(* Append [noun] as the next event and return its event number.
+
+   When the log is disabled only the in-memory counter advances. Otherwise the
+   serialized event and the "last_event" metadata entry are written in a single
+   read-write transaction before the counter is updated. *)
+let append ?(verbose=false) (log : t) (noun : noun) : event_num =
+  let next_num = Int64.succ log.last_event in
+  match log.enabled with
+  | false ->
+    log.last_event <- next_num;
+    log.last_event
+  | true ->
+    debug "appending event %Ld" next_num;
+
+    debug "serializing event %Ld..." next_num;
+    let payload = serialize_event ~verbose noun in
+    let payload_len = String.length payload in
+    debug "serialized event %Ld: %d bytes" next_num payload_len;
+
+    (* Body of the write transaction: the event first, then the metadata. *)
+    let write_event txn =
+      debug "writing event %Ld to EVENTS map" next_num;
+      Lmdb.Map.set log.events_map ~txn next_num payload;
+      debug "writing metadata for event %Ld" next_num;
+      Lmdb.Map.set log.meta_map ~txn "last_event" (Int64.to_string next_num);
+      debug "transaction complete for event %Ld" next_num;
+      Some ()
+    in
+    debug "starting LMDB transaction for event %Ld" next_num;
+    ignore (Lmdb.Txn.go Lmdb.Rw log.env write_event);
+
+    log.last_event <- next_num;
+    debug "wrote event %Ld (%d bytes)" next_num payload_len;
+    next_num
+
+(* Look up event [event_num] in the EVENTS map and decode it.
+
+   Raises [Failure "Event N not found"] when a [Not_found] escapes the lookup
+   or the decoding step. *)
+let read_event ?(verbose=false) (log : t) (event_num : event_num) : noun =
+  debug "reading event %Ld" event_num;
+
+  let fetch_and_decode () =
+    let payload = Lmdb.Map.get log.events_map event_num in
+    debug "read event %Ld (%d bytes)" event_num (String.length payload);
+    deserialize_event ~verbose payload
+  in
+  match fetch_and_decode () with
+  | noun -> noun
+  | exception Not_found ->
+    Printf.ksprintf failwith "Event %Ld not found" event_num
+
+(* Bounds of the log as [Some (first, last)], read from the first and last
+   cursor positions of the EVENTS map inside one read-only transaction.
+   Returns [None] when a cursor move raises [Not_found] (an empty map) or when
+   the transaction yields no result. *)
+let gulf (log : t) : (event_num * event_num) option =
+  let bounds cursor =
+    let (lowest, _) = Lmdb.Cursor.first cursor in
+    let (highest, _) = Lmdb.Cursor.last cursor in
+    (lowest, highest)
+  in
+  match
+    Lmdb.Txn.go Lmdb.Ro log.env (fun txn ->
+      Lmdb.Cursor.go Lmdb.Ro ~txn log.events_map bounds)
+  with
+  | range -> range
+  | exception Not_found -> None
+
+(* Replay every stored event through [callback], in cursor order starting
+   from the first entry of the EVENTS map.
+
+   [callback] receives each event number and its decoded noun. The whole
+   replay runs inside a single read-only transaction.
+
+   Only the cursor moves are guarded against [Not_found], which is how the
+   cursor signals that there are no more entries. Any exception raised by
+   [deserialize_event] or by [callback] -- including [Not_found] -- propagates
+   to the caller instead of being mistaken for the end of the log. The loop is
+   tail-recursive, so stack use does not grow with the number of events. *)
+let replay ?(verbose=false) (log : t) (callback : event_num -> noun -> unit) : unit =
+  debug "starting replay";
+
+  match gulf log with
+  | None ->
+    debug "no events to replay"
+  | Some (first, last) ->
+    debug "replaying events %Ld to %Ld" first last;
+
+    let (_ : unit option) =
+      Lmdb.Txn.go Lmdb.Ro log.env (fun txn ->
+        Lmdb.Cursor.go Lmdb.Ro ~txn log.events_map (fun cursor ->
+          (* Cursor moves return the entry they land on; [Not_found] means
+             there is nothing (more) to read. *)
+          let first_entry () =
+            match Lmdb.Cursor.first cursor with
+            | entry -> Some entry
+            | exception Not_found -> None
+          in
+          let next_entry () =
+            match Lmdb.Cursor.next cursor with
+            | entry -> Some entry
+            | exception Not_found -> None
+          in
+          let rec replay_loop = function
+            | None -> ()
+            | Some (event_num, serialized) ->
+              debug "replaying event %Ld" event_num;
+              let noun = deserialize_event ~verbose serialized in
+              callback event_num noun;
+              replay_loop (next_entry ())
+          in
+          replay_loop (first_entry ())
+        )
+      )
+    in
+
+    debug "replay complete"
+
+(* Number of the most recently appended event, as tracked in memory. *)
+let last_event (log : t) : event_num =
+  let { last_event = latest; _ } = log in
+  latest
+
+(* Ask LMDB to sync the environment of [log]. *)
+let sync (log : t) : unit =
+  let { env; _ } = log in
+  Lmdb.Env.sync env