1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
|
(* Runtime - Eio-based Urbit Runtime
*
* This is THE CORE of the multi-core Urbit runtime!
*
* Architecture:
* - Eio.Switch for structured concurrency
* - Eio.Stream for lock-free event queue
* - Fiber per I/O driver (behn, ames, http, etc)
* - Concurrent event processing
*
* This is fundamentally different from C Vere:
* - C Vere: single-threaded, blocking I/O, sequential
* - OCaml Overe: multi-fiber, async I/O, concurrent
*)
(* Runtime configuration *)
type config = {
pier_path: string; (* Path to pier directory *)
event_log_path: string; (* Event log directory *)
snapshot_path: string; (* Snapshot file path *)
}
(* Runtime state *)
type t = {
config: config;
state: State.t; (* Ship state *)
event_log: Eventlog.t; (* Event log *)
effect_queue: Effects.queue; (* Effects to process *)
(* Statistics *)
mutable events_processed: int64;
mutable effects_executed: int64;
}
(* Create default config *)
let default_config ?(pier_path="./pier") () = {
pier_path;
event_log_path = pier_path ^ "/log";
snapshot_path = pier_path ^ "/snapshot.jam";
}
(* Create runtime *)
let create ~sw ~fs config =
let state = State.create () in
let event_log = Eventlog.create ~sw ~fs config.event_log_path in
let effect_queue = Effects.create_queue () in
{
config;
state;
event_log;
effect_queue;
events_processed = 0L;
effects_executed = 0L;
}
(* Process a single event through Arvo
*
* Steps:
* 1. Run Nock to apply event to kernel
* 2. Update kernel state
* 3. Parse effects from result
* 4. Enqueue effects for drivers
*)
let process_event runtime event_noun =
(* In real implementation, this would:
* let result = Nock.nock_on (State.get_arvo runtime.state) poke_formula in
* let new_state, effects = parse_result result in
* State.set_arvo runtime.state new_state;
*
* For now: simplified - just update event counter
*)
let _effects = State.poke runtime.state event_noun in
runtime.events_processed <- Int64.succ runtime.events_processed;
(* Parse effects and enqueue them *)
let effects = Effects.parse_effects event_noun in
List.iter (Effects.enqueue runtime.effect_queue) effects;
(* Append to event log *)
let _event_num = Eventlog.append runtime.event_log ~sw:(fun () -> ()) event_noun in
()
(* Event processing fiber - processes events from the queue
*
* This runs as a separate fiber, processing events as they arrive
*)
let event_processor runtime ~sw:_ stream =
Printf.printf "[Runtime] Event processor fiber started\n%!";
try
while true do
(* Read next event from stream *)
let event = Eio.Stream.take stream in
(* Process the event *)
process_event runtime event;
(* Periodic logging *)
if Int64.rem runtime.events_processed 100L = 0L then
Printf.printf "[Runtime] Processed %Ld events\n%!" runtime.events_processed
done
with
| End_of_file ->
Printf.printf "[Runtime] Event processor shutting down\n%!"
(* Effect executor fiber - executes effects as they're produced
*
* This runs as a separate fiber, executing effects concurrently
* with event processing
*)
let effect_executor runtime ~sw:_ ~env =
Printf.printf "[Runtime] Effect executor fiber started\n%!";
let rec loop () =
match Effects.try_dequeue runtime.effect_queue with
| None ->
(* No effects, sleep briefly *)
Eio.Time.sleep (Eio.Stdenv.clock env) 0.001;
loop ()
| Some eff ->
(* Execute the effect *)
(match eff with
| Effects.Log msg ->
Printf.printf "[Effect] Log: %s\n%!" msg
| Effects.SetTimer { id; time } ->
Printf.printf "[Effect] SetTimer: id=%Ld time=%f\n%!" id time
| Effects.CancelTimer { id } ->
Printf.printf "[Effect] CancelTimer: id=%Ld\n%!" id
| _ ->
Printf.printf "[Effect] Other effect\n%!"
);
runtime.effects_executed <- Int64.succ runtime.effects_executed;
loop ()
in
try
loop ()
with
| End_of_file ->
Printf.printf "[Runtime] Effect executor shutting down\n%!"
(* Main runtime loop with Eio
*
* This is THE KEY FUNCTION - the heart of the runtime!
*
* Creates:
* - Event stream (lock-free with Eio.Stream)
* - Event processor fiber
* - Effect executor fiber
* - I/O driver fibers (future)
*)
let run ~env config =
Printf.printf "š Starting Overe Runtime (Eio-based)\n%!";
Printf.printf " Pier: %s\n%!" config.pier_path;
Printf.printf " OCaml %s on %d cores\n%!"
Sys.ocaml_version (Domain.recommended_domain_count ());
Eio.Switch.run @@ fun sw ->
let fs = Eio.Stdenv.fs env in
(* Create runtime *)
let runtime = create ~sw ~fs config in
(* Create event stream (lock-free!) *)
let event_stream = Eio.Stream.create 1000 in
Printf.printf "ā Runtime created\n%!";
(* Load snapshot if it exists *)
(match State.load_snapshot runtime.state ~fs config.snapshot_path with
| Ok eve ->
Printf.printf "ā Loaded snapshot at event %Ld\n%!" eve
| Error msg ->
Printf.printf "ā No snapshot: %s (starting fresh)\n%!" msg;
(* Boot with empty kernel *)
State.boot runtime.state (Noun.atom 0)
);
(* Replay events from log *)
Printf.printf "Replaying events from log...\n%!";
Eventlog.replay runtime.event_log ~sw (fun event_num event ->
Printf.printf " Replayed event %Ld\n%!" event_num;
process_event runtime event
);
Printf.printf "ā Runtime ready! State: %s\n%!" (State.summary runtime.state);
Printf.printf "\n";
(* Spawn concurrent fibers using Eio.Fiber.both *)
Eio.Fiber.both
(* Event processor fiber *)
(fun () -> event_processor runtime ~sw event_stream)
(* Effect executor fiber *)
(fun () -> effect_executor runtime ~sw ~env);
(* When we get here, runtime is shutting down *)
Printf.printf "\nš Runtime shutting down...\n%!";
Printf.printf " Events processed: %Ld\n%!" runtime.events_processed;
Printf.printf " Effects executed: %Ld\n%!" runtime.effects_executed;
(* Save final snapshot *)
Printf.printf "Saving final snapshot...\n%!";
State.save_snapshot runtime.state ~fs config.snapshot_path;
Printf.printf "ā Snapshot saved\n%!"
(* Simplified run for testing - processes events synchronously *)
let run_simple ~env config events =
Printf.printf "Running simple runtime test...\n%!";
Eio.Switch.run @@ fun sw ->
let fs = Eio.Stdenv.fs env in
let runtime = create ~sw ~fs config in
(* Process each event *)
List.iter (process_event runtime) events;
Printf.printf "ā Processed %Ld events\n%!" runtime.events_processed;
runtime
(* Statistics record *)
type stats = {
events_processed: int64;
effects_executed: int64;
event_count: int;
state_summary: string;
}
(* Get runtime statistics *)
let get_stats (runtime : t) : stats = {
events_processed = runtime.events_processed;
effects_executed = runtime.effects_executed;
event_count = Eventlog.event_count runtime.event_log;
state_summary = State.summary runtime.state;
}
|