blob: cb247684c21b1ab91fd6fa15688c4de77e65e5e4 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
(* Ames - UDP Networking Driver with Eio
*
* This is the networking backbone for ship-to-ship communication.
* Uses Eio.Net for async UDP I/O - can handle thousands of concurrent ships!
*
* Key innovation vs C Vere:
* - C Vere: Blocking UDP, single-threaded packet processing
* - Overe: Async UDP with Eio, concurrent packet handling across fibers
*)
(** Ames configuration: static settings handed to [create]. *)
type config = {
  port: int; (** UDP port to bind to *)
  our_ship: string; (** Our ship name (only used in log output in the code visible here) *)
  galaxy_table: (string * string * int) list; (** (ship, ip, port) triples; not read by any code visible in this file section *)
}
(** Ames driver state.
    The type parameter ['a] is forwarded unchanged to
    [Eio.Net.datagram_socket]. *)
type 'a t = {
  config: config; (** Configuration the driver was created with *)
  socket: 'a Eio.Net.datagram_socket; (** UDP socket used for both sending and receiving *)
  mutable stats: stats; (** Traffic counters; the visible code mutates the counters in place and never reassigns this field *)
}
(* Traffic counters. [send_packet] bumps the [*_sent] fields and
   [receive_fiber] bumps the [*_recv] fields. *)
and stats = {
  mutable packets_sent: int64; (** Number of datagrams sent *)
  mutable packets_recv: int64; (** Number of datagrams received (counted before parsing) *)
  mutable bytes_sent: int64; (** Total bytes sent, header included *)
  mutable bytes_recv: int64; (** Total bytes received *)
}
(** Ames packet header - simplified for now.
    Of these fields, only [version] is currently written by
    [encode_header] and read back by [parse_header]. *)
type packet_header = {
  version: int; (** Protocol version *)
  sender: string; (** Sender ship *)
  receiver: string; (** Receiver ship *)
  sequence: int64; (** Packet sequence number *)
}
(** Ames packet: a header plus an opaque payload.
    [send_packet] puts it on the wire as the encoded header
    immediately followed by the payload bytes. *)
type packet = {
  header: packet_header; (** Packet header *)
  payload: bytes; (** Raw payload bytes, sent as-is after the header *)
}
(** [create ~env ~sw config] builds an Ames driver.

    Opens a UDP datagram socket on the IPv4 wildcard address at
    [config.port], using the network taken from [env] and passing [sw]
    to [Eio.Net.datagram_socket]. The returned driver starts with all
    counters at zero. Progress is logged to stdout. *)
let create ~env ~sw config =
  let { port; our_ship; _ } = config in
  Printf.printf "[Ames] Starting on port %d for ship %s\n%!" port our_ship;
  (* Bind the UDP socket on every IPv4 interface. *)
  let net = Eio.Stdenv.net env in
  let socket =
    Eio.Net.datagram_socket ~sw net (`Udp (Eio.Net.Ipaddr.V4.any, port))
  in
  Printf.printf "[Ames] Bound to UDP port %d\n%!" port;
  let stats =
    { packets_sent = 0L; packets_recv = 0L; bytes_sent = 0L; bytes_recv = 0L }
  in
  { config; socket; stats }
(** [parse_header bytes] decodes the (simplified) header at the start
    of [bytes].

    Returns [Error "Packet too short"] when fewer than 16 bytes are
    available. Otherwise returns [Ok header] where [header.version] is
    the value of byte 0; the remaining fields are fixed placeholders
    ("~zod" for both ships, sequence 0) because they are not decoded
    yet. *)
let parse_header bytes =
  if Bytes.length bytes >= 16 then
    (* For now only the version byte is read; the rest is a dummy header. *)
    Ok
      {
        version = int_of_char (Bytes.get bytes 0);
        sender = "~zod";
        receiver = "~zod";
        sequence = 0L;
      }
  else Error "Packet too short"
(** [encode_header header] serialises [header] into a fresh 16-byte
    buffer (simplified format).

    Byte 0 holds [header.version]; bytes 1..15 are zero until the ship
    addresses and sequence number are encoded.

    @raise Invalid_argument if [header.version] is outside [0..255]
    (raised by [char_of_int]). *)
let encode_header header =
  (* Zero-fill the buffer: [Bytes.create] leaves its contents
     unspecified, and this buffer is sent over the network, so the
     not-yet-encoded bytes must not carry arbitrary memory contents. *)
  let buf = Bytes.make 16 '\000' in
  Bytes.set buf 0 (char_of_int header.version);
  (* TODO: Encode ship addresses and sequence *)
  buf
(** [send_packet ames dest_addr packet] sends [packet] to [dest_addr]
    as a single datagram on [ames]'s socket.

    The datagram is the encoded header followed directly by the
    payload. After the send returns, the [packets_sent] and
    [bytes_sent] counters are updated and a log line is printed to
    stdout. *)
let send_packet ames dest_addr packet =
  let { header; payload } = packet in
  (* Wire format: encoded header immediately followed by the payload. *)
  let wire_packet = Bytes.cat (encode_header header) payload in
  let total_len = Bytes.length wire_packet in
  Eio.Net.send ames.socket ~dst:dest_addr [ Cstruct.of_bytes wire_packet ];
  (* Account for the datagram only once the send has returned. *)
  let stats = ames.stats in
  stats.packets_sent <- Int64.succ stats.packets_sent;
  stats.bytes_sent <- Int64.add stats.bytes_sent (Int64.of_int total_len);
  let dest_label =
    match dest_addr with
    | `Udp (ip, port) ->
      let ip_text = Format.asprintf "%a" Eio.Net.Ipaddr.pp ip in
      Printf.sprintf "%s:%d" ip_text port
    | _ -> "unknown"
  in
  Printf.printf "[Ames] Sent %d bytes to %s\n%!" total_len dest_label
(** [receive_fiber ames ~sw ~event_stream] receives UDP datagrams on
    [ames]'s socket until the socket reports [End_of_file] or the
    fiber is cancelled, then returns.

    For each datagram: the receive counters are updated, the header is
    parsed, and on success an [ames_packet] event carrying the full
    datagram bytes is added to [event_stream]. Datagrams whose header
    fails to parse are logged and dropped. [sw] is accepted for
    interface compatibility but is not used.

    Any exception other than [End_of_file] and [Eio.Cancel.Cancelled]
    propagates to the caller. *)
let receive_fiber ames ~sw:_ ~event_stream =
  Printf.printf "[Ames] Receive fiber started\n%!";
  (* One 4KB receive buffer, reused for every datagram. *)
  let buf = Cstruct.create 4096 in
  let describe_addr = function
    | `Udp (ip, port) ->
      Printf.sprintf "%s:%d" (Format.asprintf "%a" Eio.Net.Ipaddr.pp ip) port
    | _ -> "unknown"
  in
  (* Receive and dispatch a single datagram. Returns [true] to keep
     receiving and [false] once the fiber should stop. The exception
     handler deliberately wraps only ONE iteration: a recursive call made
     inside a [try] body is not a tail call, so looping from within the
     handler would add a stack frame and a handler for every packet. *)
  let receive_one () =
    try
      (* Suspends this fiber until a datagram arrives. *)
      let addr, recv_len = Eio.Net.recv ames.socket buf in
      (* Copy the datagram out, since [buf] is reused on the next call. *)
      let packet_bytes = Cstruct.to_bytes (Cstruct.sub buf 0 recv_len) in
      (* Update stats *)
      ames.stats.packets_recv <- Int64.succ ames.stats.packets_recv;
      ames.stats.bytes_recv <-
        Int64.add ames.stats.bytes_recv (Int64.of_int recv_len);
      (* Parse packet *)
      (match parse_header packet_bytes with
       | Ok header ->
         Printf.printf "[Ames] Received %d bytes from %s (v%d)\n%!"
           recv_len (describe_addr addr) header.version;
         (* Create event for runtime *)
         let ovum =
           Nock_lib.Effects.ames_packet ~from:header.sender ~data:packet_bytes
         in
         (* Send to runtime event queue *)
         Eio.Stream.add event_stream ovum
       | Error err ->
         Printf.printf "[Ames] Failed to parse packet: %s\n%!" err);
      true
    with
    | End_of_file ->
      Printf.printf "[Ames] Receive fiber closed\n%!";
      false
    | Eio.Cancel.Cancelled _ ->
      (* Cancellation is logged and the fiber returns normally. *)
      Printf.printf "[Ames] Receive fiber cancelled\n%!";
      false
  in
  (* Tail-recursive driver: runs in constant stack space. *)
  let rec loop () = if receive_one () then loop () in
  loop ()
(** [get_stats ames] returns the driver's statistics record.
    This is the record held by the driver itself, not a copy, so later
    counter updates are visible through the returned value. *)
let get_stats { stats; _ } = stats
(** [run ames ~sw ~event_stream] starts the Ames driver.

    Forks [receive_fiber] under [sw], forwarding [sw] and
    [event_stream] to it, and logs before and after the fork. *)
let run ames ~sw ~event_stream =
  Printf.printf "[Ames] Running Ames driver...\n%!";
  (* The receive loop runs in its own fiber under [sw]. *)
  let receiver () = receive_fiber ames ~sw ~event_stream in
  Eio.Fiber.fork ~sw receiver;
  Printf.printf "[Ames] Ames driver running!\n%!"
|