blob: b0c59936fd0fb37e462267a731c56e1996bcfc2c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
|
(* Event Log - Persistent event storage
*
* This module provides an append-only event log with:
* - Synchronous append for simplicity
* - Sequential replay
* - Crash recovery via event replay
*
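* Event format (modelled on Vere):
* - 4 bytes: mug, little-endian (checksum of the noun; see compute_mug, which
*   does not implement murmur3 yet, so files are not Vere-compatible so far)
* - N bytes: jammed noun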
*
* Storage:
* - Simple file-based initially (one file per event)
* - Files named: event-NNNNNNNNNNNNNNNNNNNN.jam (20-digit zero-padded)
* - Stored in <pier>/.urb/log/
* - Will migrate to LMDB later for better performance
*)
open Noun
(* Sequence number of an event in the log. Numbering starts at 0; the value
   -1L is used by [create] and [replay] to mean that no event has been
   committed yet. *)
type event_num = int64
(* Metadata describing one stored event, as produced by [deserialize_event]. *)
type event_meta = {
num: event_num; (* position in the log; [deserialize_event] does not know it and fills in -1L *)
mug: int32; (* checksum read from the 4-byte header; produced by [compute_mug], which is not murmur3 yet *)
size: int; (* length in bytes of the jammed payload, excluding the 4-byte mug *)
}
(* Event log state. The mutable fields are updated in place by [append],
   [replay], [enable] and [disable]. *)
type t = {
log_dir: string; (* directory holding the event files (<pier>/.urb/log) *)
mutable last_event: event_num; (* number of the last committed event; -1L when there is none *)
mutable enabled: bool; (* when false, [append] only advances the counter and writes no file *)
}
(* Report whether debug tracing is switched on through the
   NEOVERE_EVENTLOG_DEBUG environment variable. An unset variable means off;
   a set variable means on unless its value is 0, false or off (compared
   case-insensitively). The environment is consulted on every call. *)
let debug_enabled () =
  match
    Option.map String.lowercase_ascii (Sys.getenv_opt "NEOVERE_EVENTLOG_DEBUG")
  with
  | None | Some ("0" | "false" | "off") -> false
  | Some _ -> true
(* printf-style debug tracer. The formatted message is printed to stdout with
   an [eventlog] prefix and flushed when [debug_enabled ()] holds; otherwise
   the message is still formatted but discarded. The enabled check happens as
   soon as the format string is supplied. *)
let debug fmt =
  let sink =
    if debug_enabled () then
      (fun msg -> Printf.printf "[eventlog] %s\n%!" msg)
    else
      (fun _ -> ())
  in
  Printf.ksprintf sink fmt
(* Create (if needed) the event log directory <pier_path>/.urb/log and return
   a fresh log handle.

   - [enabled] (default true): initial value of the handle's [enabled] flag.
   - [pier_path]: root directory of the pier.

   Missing directories, including [pier_path] itself, are created with mode
   0o755. Losing a creation race to another process (EEXIST) is not treated as
   an error; any other [Unix.Unix_error] raised by mkdir propagates.

   The returned handle always starts with [last_event = -1L], even when the
   directory already contains event files. Run [replay] before [append] on an
   existing pier, otherwise [append] will write event 0 again. *)
let create ?(enabled=true) pier_path =
  (* mkdir -p: create [dir] and any missing ancestors. *)
  let rec mkdir_p dir =
    if not (Sys.file_exists dir) then begin
      let parent = Filename.dirname dir in
      (* [Filename.dirname] reaches a fixed point at the root or at "." *)
      if parent <> dir then mkdir_p parent;
      debug "creating directory: %s" dir;
      try Unix.mkdir dir 0o755
      with Unix.Unix_error (Unix.EEXIST, _, _) ->
        (* someone else created it between the check and the mkdir *)
        ()
    end
  in
  let urb_dir = Filename.concat pier_path ".urb" in
  let log_dir = Filename.concat urb_dir "log" in
  mkdir_p log_dir;
  debug "event log initialized at: %s" log_dir;
  { log_dir; last_event = -1L; enabled }
(* Path of the file that stores event [num] in the log directory. The file
   name is event-<num>.jam, with <num> printed as a decimal number zero-padded
   to a width of 20. *)
let event_file { log_dir; _ } num =
  num
  |> Printf.sprintf "event-%020Ld.jam"
  |> Filename.concat log_dir
(* Checksum stored in front of every serialized event.
   TODO: Use a proper murmur3 implementation - for now this is OCaml's generic
   Hashtbl.hash, masked with 0x7FFFFFFF before conversion to int32. *)
let compute_mug (noun : noun) : int32 =
  let hashed = Hashtbl.hash noun in
  Int32.of_int (hashed land 0x7FFFFFFF)
(* Encode [noun] in the on-disk event format: a 4-byte little-endian mug
   followed by the jammed noun. [verbose] is forwarded to [Serial.jam]. *)
let serialize_event ?(verbose=false) (noun : noun) : bytes =
  (* Jam first, as before, so any verbose output precedes the hashing. *)
  let payload = Serial.jam ~verbose noun in
  let header = Bytes.create 4 in
  Bytes.set_int32_le header 0 (compute_mug noun);
  Bytes.cat header payload
(* Decode a buffer produced by [serialize_event].

   Returns the event metadata together with the cued noun. The metadata's
   [num] is -1L because the buffer alone does not carry the event number;
   [size] is the payload length without the 4-byte mug. [verbose] is forwarded
   to [Serial.cue].

   Raises [Failure] when the buffer is shorter than the 4-byte mug, or when
   the mug recomputed from the cued noun differs from the stored one. *)
let deserialize_event ?(verbose=false) (data : bytes) : event_meta * noun =
  let total = Bytes.length data in
  if total < 4 then
    failwith "Event data too short (missing mug)"
  else begin
    (* header: little-endian mug *)
    let stored = Bytes.get_int32_le data 0 in
    (* payload: everything after the header *)
    let size = total - 4 in
    let noun = Serial.cue ~verbose (Bytes.sub data 4 size) in
    let actual = compute_mug noun in
    if Int32.equal actual stored then
      ({ num = -1L; mug = stored; size }, noun)
    else
      failwith (Printf.sprintf "Mug mismatch: expected %ld, got %ld" stored actual)
  end
(* Append [noun] to the log and return the event number assigned to it
   (previous [last_event] + 1).

   When the log is disabled, only the counter advances and nothing is written.

   When enabled, the serialized event is first written to <event file>.tmp and
   then renamed onto the final name, so [replay] never sees a partially
   written event file after a crash or a failed write. The output channel is
   closed on every path, and the temp file is removed if the write fails.
   [last_event] is updated only after the file is in place. The data is not
   fsynced.

   [verbose] is forwarded to [serialize_event].
   Raises [Sys_error] if the file cannot be created, written or renamed. *)
let append ?(verbose=false) log (noun : noun) : event_num =
  let event_num = Int64.succ log.last_event in
  if log.enabled then begin
    let serialized = serialize_event ~verbose noun in
    let file_path = event_file log event_num in
    debug "appending event %Ld to %s (%d bytes)" event_num file_path (Bytes.length serialized);
    let tmp_path = file_path ^ ".tmp" in
    let oc = open_out_bin tmp_path in
    (try
       output_bytes oc serialized;
       (* close_out flushes; a flush failure must surface as an error *)
       close_out oc
     with exn ->
       close_out_noerr oc;
       (* best-effort cleanup; the original error is the one worth reporting *)
       (try Sys.remove tmp_path with Sys_error _ -> ());
       raise exn);
    (* publish the fully written file under its final name *)
    Sys.rename tmp_path file_path
  end;
  log.last_event <- event_num;
  event_num
(* Read event [event_num] from the log and return its noun.

   The whole file is loaded and passed to [deserialize_event], which checks
   the mug. The input channel is closed even when reading fails. [verbose] is
   forwarded to [deserialize_event].

   Raises [Sys_error] if the file cannot be opened or read, [End_of_file] if
   it is shorter than its reported length, and [Failure] on a malformed or
   corrupted event. *)
let read_event ?(verbose=false) log event_num : noun =
  let file_path = event_file log event_num in
  debug "reading event %Ld from %s" event_num file_path;
  let ic = open_in_bin file_path in
  let data =
    Fun.protect
      ~finally:(fun () -> close_in_noerr ic)
      (fun () ->
        let len = in_channel_length ic in
        (* read straight into bytes; no intermediate string copy *)
        let buf = Bytes.create len in
        really_input ic buf 0 len;
        buf)
  in
  let (_meta, noun) = deserialize_event ~verbose data in
  noun
(* Replay the log in order, starting at event 0.

   Events are discovered by probing for consecutive event files; the first
   missing number ends the replay. Each event is read with [read_event] and
   handed to [callback] with its number. Afterwards [last_event] is set to the
   last event replayed, or -1L when no event file was found. If reading or the
   callback raises, the exception propagates and [last_event] is left as it
   was. [verbose] is forwarded to [read_event]. *)
let replay ?(verbose=false) log (callback : event_num -> noun -> unit) : unit =
  debug "starting replay...";
  let next = ref 0L in
  while Sys.file_exists (event_file log !next) do
    debug "replaying event %Ld" !next;
    let noun = read_event ~verbose log !next in
    callback !next noun;
    next := Int64.succ !next
  done;
  debug "replay complete at event %Ld" !next;
  (* [!next] is the first missing event, so the last present one is its
     predecessor; that is -1L when the log is empty. *)
  log.last_event <- Int64.pred !next
(* Number of events recorded by the handle: [last_event] + 1, converted to a
   native int. *)
let event_count { last_event; _ } =
  last_event |> Int64.succ |> Int64.to_int
(* Number of the last committed event, or -1L when there is none. *)
let last_event_num { last_event; _ } = last_event
(* Turn logging off (for testing or special modes): later [append] calls
   advance the event counter without writing files. *)
let disable log =
  let () = log.enabled <- false in
  debug "event logging disabled"
(* Turn logging back on: later [append] calls write event files again. *)
let enable log =
  let () = log.enabled <- true in
  debug "event logging enabled"
|