summaryrefslogtreecommitdiff
path: root/ocaml/lib/eventlog.ml
blob: b0c59936fd0fb37e462267a731c56e1996bcfc2c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
(* Event Log - Persistent event storage
 *
 * This module provides an append-only event log with:
 * - Synchronous append for simplicity
 * - Sequential replay
 * - Crash recovery via event replay
 *
 * Event format (modelled on Vere):
 * - 4 bytes: mug, little-endian (check hash of the noun; currently a
 *   Hashtbl.hash placeholder until murmur3 is implemented)
 * - N bytes: jammed noun
 *
 * Storage:
 * - Simple file-based initially (one file per event)
 * - Files named: event-NNNNNNNNNNNNNNNNNNNN.jam (20-digit zero-padded)
 * - Stored in <pier>/.urb/log/
 * - Will migrate to LMDB later for better performance
 *)

open Noun

(* Event number (0-indexed).
 *
 * The first event appended to an empty log gets number 0L; -1L is used
 * elsewhere in this module to mean that no event exists yet. *)
type event_num = int64

(* Event metadata, as recovered by [deserialize_event]. *)
type event_meta = {
  num: event_num;  (* event number; [deserialize_event] sets this to -1L *)
  mug: int32;      (* check hash read from the 4-byte header (see [compute_mug]) *)
  size: int;       (* size in bytes of the jammed data, excluding the header *)
}

(* Event log state.
 *
 * Built by [create]. The mutable fields are updated in place by [append],
 * [replay], [enable] and [disable]. *)
type t = {
  log_dir: string;                  (* <pier>/.urb/log directory holding the event files *)
  mutable last_event: event_num;    (* last committed event; -1L when there is none *)
  mutable enabled: bool;            (* when false, [append] writes nothing to disk *)
}

(* Report whether debug tracing is switched on.
 *
 * The NEOVERE_EVENTLOG_DEBUG environment variable is consulted on every
 * call. Tracing is off when the variable is unset, or when its value
 * (compared case-insensitively) is 0, false or off. Any other value,
 * including the empty string, turns tracing on. *)
let debug_enabled () =
  match Sys.getenv_opt "NEOVERE_EVENTLOG_DEBUG" with
  | Some raw ->
      (match String.lowercase_ascii raw with
       | "0" | "false" | "off" -> false
       | _ -> true)
  | None -> false

(* printf-style debug tracing.
 *
 * The message is always formatted through [Printf.ksprintf]; it is then
 * either printed to stdout with an [eventlog] prefix and flushed, or
 * dropped, depending on what [debug_enabled] returns at call time. *)
let debug fmt =
  let sink =
    if debug_enabled () then
      fun msg -> Printf.printf "[eventlog] %s\n%!" msg
    else
      fun _ -> ()
  in
  Printf.ksprintf sink fmt

(* Open the event log that lives under [pier_path].
 *
 * Makes sure <pier_path>/.urb and <pier_path>/.urb/log exist, creating
 * each one (mode 0o755) when nothing is present at that path, and returns
 * a fresh log state pointing at the log directory.
 *
 * [enabled] (default true) is stored as the initial logging flag.
 * [last_event] always starts at -1L here; existing event files are only
 * taken into account once [replay] has run.
 *
 * [Unix.mkdir] is not recursive, so [pier_path] itself must already exist;
 * otherwise a [Unix.Unix_error] escapes. *)
let create ?(enabled=true) pier_path =
  (* Create [dir] unless something already exists at that path. *)
  let ensure_dir dir =
    if not (Sys.file_exists dir) then begin
      debug "creating directory: %s" dir;
      Unix.mkdir dir 0o755
    end
  in
  let urb_dir = Filename.concat pier_path ".urb" in
  let log_dir = Filename.concat urb_dir "log" in

  (* Parent first, then child. *)
  List.iter ensure_dir [ urb_dir; log_dir ];

  debug "event log initialized at: %s" log_dir;
  { log_dir; last_event = -1L; enabled }

(* Path of the file that stores event [num]:
 * <log_dir>/event-NNNNNNNNNNNNNNNNNNNN.jam, with the number zero-padded
 * to 20 digits. *)
let event_file log num =
  Filename.concat log.log_dir (Printf.sprintf "event-%020Ld.jam" num)

(* Compute the mug (check hash) stored in front of every event.
 *
 * TODO: Use proper murmur3 implementation - for now this is the
 * polymorphic Hashtbl.hash of the noun, masked to 31 bits before it is
 * converted to an int32. *)
let compute_mug (noun : noun) : int32 =
  let hashed = Hashtbl.hash noun in
  Int32.of_int (hashed land 0x7FFFFFFF)

(* Serialize an event as a 4-byte little-endian mug followed by the
 * jammed noun.
 *
 * [verbose] is forwarded to [Serial.jam]. The result is a freshly
 * allocated buffer of length 4 + (length of the jam). *)
let serialize_event ?(verbose=false) (noun : noun) : bytes =
  (* Jam first, exactly as before, then build the header. *)
  let jammed = Serial.jam ~verbose noun in
  let header = Bytes.create 4 in
  Bytes.set_int32_le header 0 (compute_mug noun);
  Bytes.cat header jammed

(* Parse a serialized event: read the 4-byte little-endian mug, cue the
 * remaining bytes, and check the mug against the decoded noun.
 *
 * [verbose] is forwarded to [Serial.cue]. The returned metadata carries
 * the stored mug and the size of the jammed payload; its [num] field is
 * -1L because the event number is not part of the serialized data.
 *
 * Raises [Failure] when [data] is shorter than 4 bytes or when the
 * stored mug differs from the one recomputed by [compute_mug]. *)
let deserialize_event ?(verbose=false) (data : bytes) : event_meta * noun =
  let total = Bytes.length data in
  if total < 4 then
    failwith "Event data too short (missing mug)"
  else begin
    (* Header: the mug written by serialize_event. *)
    let stored = Bytes.get_int32_le data 0 in

    (* Payload: everything after the header. *)
    let size = total - 4 in
    let noun = Serial.cue ~verbose (Bytes.sub data 4 size) in

    (* Integrity check against the decoded noun. *)
    let actual = compute_mug noun in
    if not (Int32.equal actual stored) then
      failwith (Printf.sprintf "Mug mismatch: expected %ld, got %ld" stored actual);

    ({ num = -1L; mug = stored; size }, noun)
  end

(* Append [noun] to the log and return the event number assigned to it.
 *
 * The new number is always the successor of [log.last_event]. When
 * logging is disabled nothing is written and only the counter advances.
 * Otherwise the serialized event is written to its own file first and
 * the counter is updated afterwards, so a failed write leaves
 * [log.last_event] unchanged.
 *
 * [verbose] is forwarded to [serialize_event].
 *
 * The output channel is released on every path, including when the
 * write itself raises. Raises [Sys_error] if the event file cannot be
 * opened, written or flushed. *)
let append ?(verbose=false) log (noun : noun) : event_num =
  let event_num = Int64.succ log.last_event in
  if log.enabled then begin
    let serialized = serialize_event ~verbose noun in
    let file_path = event_file log event_num in

    debug "appending event %Ld to %s (%d bytes)" event_num file_path (Bytes.length serialized);

    let oc = open_out_bin file_path in
    Fun.protect
      (* No-op after a successful close_out; otherwise frees the
         descriptor without masking the original exception. *)
      ~finally:(fun () -> close_out_noerr oc)
      (fun () ->
        output_bytes oc serialized;
        (* Explicit close so that flush errors still reach the caller. *)
        close_out oc)
  end;
  log.last_event <- event_num;
  event_num

(* Read event [event_num] from its file and return the decoded noun.
 *
 * The whole file is loaded into memory and handed to
 * [deserialize_event], to which [verbose] is forwarded.
 *
 * The input channel is closed on every path, including when reading
 * fails part-way. Raises [Sys_error] if the file cannot be opened,
 * [End_of_file] if it shrinks while being read, and whatever
 * [deserialize_event] raises on malformed data. *)
let read_event ?(verbose=false) log event_num : noun =
  let file_path = event_file log event_num in
  debug "reading event %Ld from %s" event_num file_path;

  let ic = open_in_bin file_path in
  let data =
    Fun.protect
      ~finally:(fun () -> close_in_noerr ic)
      (fun () ->
        let len = in_channel_length ic in
        (* Read straight into bytes; no intermediate string copy. *)
        let buf = Bytes.create len in
        really_input ic buf 0 len;
        buf)
  in

  let (_meta, noun) = deserialize_event ~verbose data in
  noun

(* Replay every stored event in order, starting at event 0.
 *
 * Events are discovered by probing for consecutive event files; the
 * first missing number ends the replay. Each event is read with
 * [read_event] (to which [verbose] is forwarded) and then passed to
 * [callback] together with its number.
 *
 * On normal completion [log.last_event] is set to the number of the last
 * event found, or -1L when there were none. If reading or the callback
 * raises, the exception propagates and [log.last_event] is left as it
 * was. *)
let replay ?(verbose=false) log (callback : event_num -> noun -> unit) : unit =
  debug "starting replay...";

  let cursor = ref 0L in
  while Sys.file_exists (event_file log !cursor) do
    let num = !cursor in
    debug "replaying event %Ld" num;
    let noun = read_event ~verbose log num in
    callback num noun;
    cursor := Int64.succ num
  done;

  (* [!cursor] is now the first event number without a file. *)
  debug "replay complete at event %Ld" !cursor;
  (* Int64.pred 0L is -1L, which is exactly the empty-log marker. *)
  log.last_event <- Int64.pred !cursor

(* Number of events in the log, i.e. [last_event] + 1 converted to a
 * native int (0 for an empty log). *)
let event_count { last_event; _ } =
  last_event |> Int64.succ |> Int64.to_int

(* Number of the last committed event, or -1L when the log is empty. *)
let last_event_num { last_event; _ } = last_event

(* Turn logging off (for testing or special modes). While disabled,
 * [append] still hands out event numbers but writes no files. *)
let disable log =
  let () = log.enabled <- false in
  debug "event logging disabled"

(* Turn logging back on, so that later [append] calls write event files
 * again. *)
let enable log =
  let () = log.enabled <- true in
  debug "event logging enabled"