blob: 86e422b08afb40182adcd42befb1ebb77a712f22 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
(* Event Log - Eio-based persistent event storage
 *
 * This module provides an append-only event log with:
 * - Append via Eio.Path (each event is written to its own file)
 * - Sequential, in-order replay of stored events
 * - Crash recovery via event replay
 *
 * Event format:
 * - 4 bytes: mug, little-endian (currently computed with OCaml's
 *   Hashtbl.hash as a stand-in; murmur3 is intended)
 * - N bytes: jammed noun
 *
 * Storage:
 * - Simple file-based initially (one file per event)
 * - Will migrate to LMDB later for better performance
 *)
(* Event number (0-indexed). This module uses -1L to mean "no event yet"
   (the initial [last_event] of a fresh log, and the value [replay] sets
   when no event file is found). *)
type event_num = int64
(* Metadata returned alongside a noun by [deserialize_event]. *)
type event_meta = {
num: event_num; (* event number; [deserialize_event] has no way to know it and sets -1L *)
mug: int32; (* checksum read from the event header; produced by [compute_mug], which currently uses Hashtbl.hash rather than murmur3 *)
size: int; (* size of jammed data, in bytes (record length minus the 4-byte mug) *)
}
(* Event log state *)
type t = {
dir: Eio.Fs.dir_ty Eio.Path.t; (* base directory; each event is stored in its own file here *)
mutable last_event: event_num; (* number of the last event appended or replayed; -1L when none *)
}
(* Open an event log rooted at [path] under [fs], creating the directory
   (permissions 0o755) when it is absent. An existing directory is reused.

   Only [Eio.Fs.Already_exists] is tolerated. Any other failure (permission
   denied, missing parent directory, cancellation, ...) propagates instead
   of being silently ignored, since a log without a usable directory would
   only fail later, less clearly, on the first [append].

   The returned log starts with [last_event = -1L] regardless of what is
   already on disk. When reopening an existing log, call [replay] before
   [append].
   @raise Eio.Io if the directory cannot be created for a reason other
   than already existing. *)
let create ~sw:_ ~fs path =
  let dir = Eio.Path.(fs / path) in
  (match Eio.Path.mkdir ~perm:0o755 dir with
   | () -> ()
   | exception Eio.Io (Eio.Fs.E (Eio.Fs.Already_exists _), _) -> ());
  { dir; last_event = -1L }
(* Path of the file that stores event [num]: "event-NNNN...N.jam" inside
   the log directory, with [num] zero-padded to 20 digits. Because of the
   padding, lexical order of the file names matches numeric order for
   non-negative event numbers. *)
let event_file log num =
  let name = Printf.sprintf "event-%020Ld.jam" num in
  Eio.Path.( / ) log.dir name
(* Checksum ("mug") stored in each event header.
   Placeholder implementation: OCaml's polymorphic [Hashtbl.hash] rather
   than murmur3. [Hashtbl.hash] returns a non-negative value below 2^30, so
   the conversion to int32 is lossless. It inspects only a bounded number
   of nodes of large values, which makes it a weak integrity check.
   Changing this function would invalidate the mugs of events already
   written, because [deserialize_event] verifies them with it. *)
let compute_mug (noun : Noun.noun) : int32 =
  noun |> Hashtbl.hash |> Int32.of_int
(* Encode [noun] as an event record, the inverse of [deserialize_event].
   Layout:
   - bytes 0..3: mug from [compute_mug], little-endian int32
   - bytes 4..  : the jammed noun produced by [Serial.jam] *)
let serialize_event (noun : Noun.noun) : bytes =
  let mug = compute_mug noun in
  let jam_bytes = Serial.jam noun in
  let jam_len = Bytes.length jam_bytes in
  let result = Bytes.create (4 + jam_len) in
  (* Stdlib little-endian writer. It emits the same four bytes (low byte
     first) as a manual shift-and-mask encoding, so the on-disk format is
     unchanged. *)
  Bytes.set_int32_le result 0 mug;
  Bytes.blit jam_bytes 0 result 4 jam_len;
  result
(* Decode an event record produced by [serialize_event]: a 4-byte
   little-endian mug followed by a jammed noun.

   Returns the metadata and the decoded noun. The metadata's [num] is -1L
   because the record does not store its own event number; callers that
   know it must fill it in. [size] is the length of the jammed payload.

   @raise Failure if [data] is shorter than the 4-byte header, or if the
   stored mug differs from the mug recomputed from the decoded noun.
   Any exception raised by [Serial.cue] on the payload propagates. *)
let deserialize_event (data : bytes) : event_meta * Noun.noun =
  if Bytes.length data < 4 then
    failwith "Event data too short (missing mug)";
  let mug = Bytes.get_int32_le data 0 in
  let jam_len = Bytes.length data - 4 in
  let noun = Serial.cue (Bytes.sub data 4 jam_len) in
  (* Integrity check against the stored mug. It detects corruption only as
     well as [compute_mug] distinguishes different nouns. *)
  let computed_mug = compute_mug noun in
  if not (Int32.equal computed_mug mug) then
    failwith (Printf.sprintf "Mug mismatch: expected %ld, got %ld" mug computed_mug);
  ({ num = -1L; mug; size = jam_len }, noun)
(* Append [noun] as the next event (number [log.last_event + 1]) and
   return its event number.

   The event is written to its own file (see [event_file]).
   [log.last_event] is advanced only after the write returns successfully.

   The file is created with [`Exclusive], so an existing event file is
   never overwritten. A log opened with [create] but not [replay]ed still
   has [last_event = -1L]; with a truncating create, the next append would
   silently clobber event 0 of an existing log. It now fails loudly instead.

   NOTE(review): no explicit fsync is performed, so a crash may lose or
   truncate the most recent event. [replay] surfaces a truncated record
   only via the mug/cue checks in [deserialize_event].

   @raise Eio.Io if the event file already exists ([Eio.Fs.Already_exists])
   or cannot be written. *)
let append log ~sw:_ (noun : Noun.noun) : event_num =
  let event_num = Int64.succ log.last_event in
  let serialized = serialize_event noun in
  Eio.Path.save ~create:(`Exclusive 0o644) (event_file log event_num)
    (Bytes.to_string serialized);
  log.last_event <- event_num;
  event_num
(* Load event [event_num] from disk and return only its decoded noun. The
   metadata produced by [deserialize_event] is discarded. Errors from
   [Eio.Path.load] (e.g. a missing file) and from [deserialize_event]
   propagate to the caller. *)
let read_event log event_num : Noun.noun =
  event_file log event_num
  |> Eio.Path.load
  |> Bytes.of_string
  |> deserialize_event
  |> snd
(* Replay the log in order, starting at event 0. [callback num noun] is
   called for each stored event. Afterwards [log.last_event] is set to the
   last event found, or -1L if there are none.

   Events are discovered by probing consecutive event numbers, and the scan
   stops at the first number whose file does not exist. Each file is read
   exactly once. Replay is sequential, so callbacks observe events in order.

   Only a missing file ([Eio.Fs.Not_found]) ends the scan. Other failures
   propagate, leaving [log.last_event] unchanged; these include other I/O
   errors such as permission denied, decode failures from
   [deserialize_event], exceptions raised by [callback], and Eio
   cancellation. *)
let replay log ~sw:_ (callback : event_num -> Noun.noun -> unit) : unit =
  (* Returns the number of the last event replayed. The recursive call is
     in tail position (not inside a [try]), so long logs do not grow the
     stack. *)
  let rec replay_from num =
    match Eio.Path.load (event_file log num) with
    | exception Eio.Io (Eio.Fs.E (Eio.Fs.Not_found _), _) -> Int64.pred num
    | data ->
      let (_meta, noun) = deserialize_event (Bytes.of_string data) in
      callback num noun;
      replay_from (Int64.succ num)
  in
  log.last_event <- replay_from 0L
(* Number of events in the log: [last_event + 1], i.e. 0 when
   [last_event] is -1L. *)
let event_count log =
  log.last_event |> Int64.succ |> Int64.to_int
(* Number of the last event appended or replayed, or -1L when the log has
   none. *)
let last_event_num { last_event; _ } = last_event
|