summaryrefslogtreecommitdiff
path: root/ocaml/lib/eventlog_lmdb.ml
blob: 256a6621b9be90d5ebf20a4aaef46b2c104ded81 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
[@@@ocaml.warning "-69"]

open Noun

(* Event log using LMDB backend to match Vere's implementation *)

(* Sequence number of an event in the log. It is used as the key of the
   EVENTS map and as the value tracked in the [last_event] field of [t]. *)
type event_num = int64

(* 32-bit hash of a string, implemented by the C stub [caml_murmur3_hash32].
   [serialize_event] uses it to compute the 4-byte mug prefix over the jammed
   bytes. NOTE(review): the name suggests MurmurHash3; seed and variant are
   defined by the C stub, which is not visible here - confirm there. *)
external murmur3_hash32 : string ->  int32 = "caml_murmur3_hash32"

(* Tell whether debug tracing is switched on.

   Tracing is controlled by the NEOVERE_EVENTLOG_DEBUG environment variable:
   - unset: tracing is off;
   - set to "0", "false" or "off" (compared case-insensitively): off;
   - set to anything else (including the empty string): on.

   The environment is consulted on every call. *)
let debug_enabled () =
  let disabling_values = ["0"; "false"; "off"] in
  match Sys.getenv_opt "NEOVERE_EVENTLOG_DEBUG" with
  | Some raw -> not (List.mem (String.lowercase_ascii raw) disabling_values)
  | None -> false

(* printf-style debug trace.

   [debug fmt arg1 ... argN] formats the message and, when [debug_enabled ()]
   holds, prints it to stdout prefixed with "[eventlog_lmdb] " and followed by
   a newline and a flush. When tracing is off the formatted message is
   discarded. The enabled check is made once, when [fmt] is supplied. *)
let debug fmt =
  let sink =
    if debug_enabled () then
      (fun msg -> Printf.printf "[eventlog_lmdb] %s\n%!" msg)
    else
      (fun _ -> ())
  in
  Printf.ksprintf sink fmt

(* Handle on an open event log: the LMDB environment, its two named maps and
   the in-memory bookkeeping kept alongside them. *)
type t = {
  (* LMDB environment backing both maps; synced and closed through it. *)
  env: Lmdb.Env.t;
  (* "EVENTS" map: event_num -> serialized bytes (4-byte mug + jammed noun). *)
  events_map: (int64, string, [`Uni]) Lmdb.Map.t;  (* event_num -> serialized bytes *)
  (* "META" map: string key -> string value. The code in this file stores the
     decimal event number under the key "last_event". *)
  meta_map: (string, string, [`Uni]) Lmdb.Map.t;   (* metadata key-value *)
  (* Number of the most recently appended event; updated by [append]. *)
  mutable last_event: event_num;
  (* When false, [append] only advances [last_event] and writes nothing. *)
  mutable enabled: bool;
}

(* Encode a noun as an event-log record.

   Layout of the result: a 4-byte little-endian mug followed by the jammed
   noun. The mug is [murmur3_hash32] applied to the jammed bytes rather than
   to the noun tree, so no second traversal of a large noun is needed.

   [verbose] is forwarded to [Serial.jam]. *)
let serialize_event ?(verbose=false) (noun : noun) : string =
  let jammed = Bytes.to_string (Serial.jam ~verbose noun) in
  (* Build the 4-byte header on its own, then glue it in front of the body. *)
  let header = Bytes.create 4 in
  Bytes.set_int32_le header 0 (murmur3_hash32 jammed);
  Bytes.to_string header ^ jammed

(* Decode an event-log record back into a noun.

   The record is expected to be a 4-byte mug followed by a jammed noun. The
   mug is read but not compared against anything; only the jammed part is
   passed to [Serial.cue] (with [verbose] forwarded).

   Raises [Failure] when [data] is shorter than the 4-byte header. *)
let deserialize_event ?(verbose=false) (data : string) : noun =
  let header_len = 4 in
  let total_len = String.length data in
  if total_len < header_len then
    failwith "Event data too short (< 4 bytes)"
  else begin
    (* Stored mug: currently informational only, it is not verified. *)
    let _stored_mug = String.get_int32_le data 0 in
    let body_len = total_len - header_len in
    let body = Bytes.create body_len in
    Bytes.blit_string data header_len body 0 body_len;
    Serial.cue ~verbose body
  end

(* Create or open the event log of a pier.

   Steps, in order:
   1. make sure <pier_path>/.urb and <pier_path>/.urb/log exist (mode 0o755);
   2. open the LMDB environment stored in the single file
      <pier_path>/.urb/log/data.mdb (no_subdir flag, 2 named maps, 1TB map
      size to match Vere's default);
   3. open the "EVENTS" and "META" maps, creating each one if it is missing;
   4. determine the last event number: the "last_event" entry of META when
      present, otherwise the highest key in EVENTS, otherwise 0.

   [enabled] is stored as-is in the returned handle (default: true).

   Errors from [Unix.mkdir], from LMDB, and from [Int64.of_string] on a
   malformed "last_event" value are not handled here and propagate. *)
let create ?(enabled=true) (pier_path : string) : t =
  debug "opening LMDB event log at: %s" pier_path;

  (* Directory layout: <pier>/.urb/log *)
  let ensure_dir dir =
    if not (Sys.file_exists dir) then Unix.mkdir dir 0o755
  in
  let urb_dir = Filename.concat pier_path ".urb" in
  let log_dir = Filename.concat urb_dir "log" in
  ensure_dir urb_dir;
  ensure_dir log_dir;

  (* 1TB (0x10000000000) map size - matches Vere's default *)
  let data_file = Filename.concat log_dir "data.mdb" in
  let env =
    Lmdb.Env.create Lmdb.Rw
      ~max_maps:2
      ~map_size:0x10000000000
      ~flags:Lmdb.Env.Flags.no_subdir
      data_file
  in

  (* Open a named single-value map, creating it when it does not exist yet. *)
  let open_map ~name ~key ~value =
    match Lmdb.Map.open_existing Lmdb.Map.Nodup ~name ~key ~value env with
    | existing -> existing
    | exception Not_found ->
        Lmdb.Map.create Lmdb.Map.Nodup ~name ~key ~value env
  in
  let events_map =
    open_map ~name:"EVENTS" ~key:Lmdb.Conv.int64_le ~value:Lmdb.Conv.string
  in
  let meta_map =
    open_map ~name:"META" ~key:Lmdb.Conv.string ~value:Lmdb.Conv.string
  in

  (* Fallback used when META has no "last_event": take the highest key
     present in EVENTS, or 0 when the map is empty. *)
  let highest_stored_event () =
    let read_last txn =
      Lmdb.Cursor.go Lmdb.Ro ~txn events_map (fun cursor ->
        let (highest, _) = Lmdb.Cursor.last cursor in
        debug "found last event: %Ld" highest;
        highest
      )
    in
    match Lmdb.Txn.go Lmdb.Ro env read_last with
    | Some highest -> highest
    | None -> 0L
    | exception Not_found -> 0L
  in

  let last_event =
    match Lmdb.Map.get meta_map "last_event" with
    | recorded -> Int64.of_string recorded
    | exception Not_found -> highest_stored_event ()
  in

  debug "last event number: %Ld" last_event;

  { env; events_map; meta_map; last_event; enabled }

(* Shut the event log down: sync the LMDB environment, then close it.
   The handle refers to a closed environment afterwards. *)
let close (log : t) : unit =
  debug "closing event log";
  let { env; _ } = log in
  Lmdb.Env.sync env;
  Lmdb.Env.close env

(* Append a noun as the next event and return its event number.

   The new number is always [last_event + 1].
   - Log disabled: only the in-memory counter advances; nothing is written.
   - Log enabled: the noun is serialized, then the EVENTS entry and the
     "last_event" entry of META are written in a single read-write
     transaction. The in-memory counter is advanced only after that.

   [verbose] is forwarded to [serialize_event]. *)
let append ?(verbose=false) (log : t) (noun : noun) : event_num =
  let next_num = Int64.succ log.last_event in
  if log.enabled then begin
    debug "appending event %Ld" next_num;

    debug "serializing event %Ld..." next_num;
    let payload = serialize_event ~verbose noun in
    let payload_len = String.length payload in
    debug "serialized event %Ld: %d bytes" next_num payload_len;

    (* Event and metadata go into the same transaction. *)
    let write_entry txn =
      debug "writing event %Ld to EVENTS map" next_num;
      Lmdb.Map.set log.events_map ~txn next_num payload;
      debug "writing metadata for event %Ld" next_num;
      Lmdb.Map.set log.meta_map ~txn "last_event" (Int64.to_string next_num);
      debug "transaction complete for event %Ld" next_num;
      Some ()
    in
    debug "starting LMDB transaction for event %Ld" next_num;
    ignore (Lmdb.Txn.go Lmdb.Rw log.env write_entry);

    log.last_event <- next_num;
    debug "wrote event %Ld (%d bytes)" next_num payload_len;
    next_num
  end else begin
    log.last_event <- next_num;
    next_num
  end

(* Fetch one event from the EVENTS map and decode it.

   [verbose] is forwarded to [deserialize_event].

   Raises [Failure] with the message "Event <n> not found" when [Not_found]
   is raised while fetching or decoding. NOTE(review): the handler covers the
   decode step as well as the lookup, so a [Not_found] escaping from
   [deserialize_event] is reported the same way as a missing key. *)
let read_event ?(verbose=false) (log : t) (event_num : event_num) : noun =
  debug "reading event %Ld" event_num;

  let fetch_and_decode () =
    let raw = Lmdb.Map.get log.events_map event_num in
    debug "read event %Ld (%d bytes)" event_num (String.length raw);
    deserialize_event ~verbose raw
  in
  match fetch_and_decode () with
  | decoded -> decoded
  | exception Not_found ->
      failwith (Printf.sprintf "Event %Ld not found" event_num)

(* Range of event numbers stored in the log.

   Returns [Some (first, last)] with the lowest and highest keys of the EVENTS
   map, read inside one read-only transaction. Returns [None] when a cursor
   move raises [Not_found] (no entries), or when the transaction itself
   yields [None]. *)
let gulf (log : t) : (event_num * event_num) option =
  let bounds cursor =
    let lowest = fst (Lmdb.Cursor.first cursor) in
    let highest = fst (Lmdb.Cursor.last cursor) in
    (lowest, highest)
  in
  let in_txn txn = Lmdb.Cursor.go Lmdb.Ro ~txn log.events_map bounds in
  match Lmdb.Txn.go Lmdb.Ro log.env in_txn with
  | range -> range
  | exception Not_found -> None

(* Replay every event in the log, in key order.

   For each entry of the EVENTS map, from the first key to the last, the
   stored bytes are decoded with [deserialize_event] (with [verbose]
   forwarded) and [callback event_num noun] is invoked. The whole walk runs
   inside one read-only transaction. Nothing happens when the log is empty.

   Exceptions raised by [deserialize_event] or by [callback] propagate to the
   caller and stop the replay. Only a [Not_found] raised by a cursor move is
   treated as the end of the log; the handler is deliberately kept that narrow
   so that a [Not_found] coming out of [callback] is not mistaken for the end
   of the log, and so that the loop stays tail-recursive (constant stack use
   however many events are stored). *)
let replay ?(verbose=false) (log : t) (callback : event_num -> noun -> unit) : unit =
  debug "starting replay";

  match gulf log with
  | None ->
      debug "no events to replay";
      ()
  | Some (first, last) ->
      debug "replaying events %Ld to %Ld" first last;

      Lmdb.Txn.go Lmdb.Ro log.env (fun txn ->
        Lmdb.Cursor.go Lmdb.Ro ~txn log.events_map (fun cursor ->
          (* Decode one entry and hand it to the caller. *)
          let deliver (event_num, serialized) =
            debug "replaying event %Ld" event_num;
            let noun = deserialize_event ~verbose serialized in
            callback event_num noun
          in
          (* The recursive call is in tail position: with
             [match ... with exception], the handler only guards the cursor
             move, not the rest of the loop. *)
          let rec replay_loop entry =
            deliver entry;
            match Lmdb.Cursor.next cursor with
            | exception Not_found -> ()  (* End of cursor *)
            | following -> replay_loop following
          in
          (* Start from first event; an empty map simply replays nothing. *)
          (match Lmdb.Cursor.first cursor with
           | exception Not_found -> ()
           | entry -> replay_loop entry);
          Some ()
        )
      ) |> ignore;

      debug "replay complete"

(* Number of the most recently appended event, as tracked in memory by the
   handle (no database access). *)
let last_event ({ last_event = latest; _ } : t) : event_num =
  latest

(* Ask LMDB to sync the environment of this log to disk. *)
let sync (log : t) : unit =
  let { env; _ } = log in
  Lmdb.Env.sync env