summaryrefslogtreecommitdiff
path: root/ocaml/lib/io/clay.ml
blob: 42cc9f060376b0d8724c0dc4b1384b1e3f484f02 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
(* Clay - Filesystem Driver with Eio
 *
 * This is the filesystem driver for Urbit's Clay vane.
 * It performs file I/O through Eio.Path, which is intended to be faster than
 * C Vere's blocking I/O.
 *
 * Key differences from C Vere:
 * - C Vere: blocking file I/O, sequential operations, slow directory scans
 * - Overe: fiber-based I/O with Eio.Path, so several file operations can be
 *   in flight concurrently
 *)

(** Settings for the Clay driver. *)
type config = {
  pier_path : string;
      (** Base path of the pier; the functions in this file resolve every
          file path relative to it. *)
}

(** Outcome of a filesystem operation.

    Note: after this definition the [Error] constructor shadows the one from
    Stdlib's [result] type. *)
type 'a file_result =
  | Success of 'a  (** The operation completed and produced this value. *)
  | Error of string  (** The operation failed; the string describes why. *)

(** Running totals kept by the driver. Every field is mutable so it can be
    updated in place. *)
type stats = {
  mutable files_read : int64;
      (** Bumped by [read_file] after each successful load. *)
  mutable files_written : int64;
      (** Bumped by [write_file] after each successful save. *)
  mutable bytes_read : int64;
      (** Total length of the contents loaded by [read_file]. *)
  mutable bytes_written : int64;
      (** Total length of the data saved by [write_file]. *)
  mutable dir_scans : int64;
      (** Bumped by [list_directory] on every call, before the directory is
          actually read. *)
}

(** Clay driver state: the configuration together with its counters. *)
type t = {
  config : config;
  mutable stats : stats;
}

(** [create config] builds a driver for [config] with every counter starting
    at zero. *)
let create config =
  let stats =
    {
      files_read = Int64.zero;
      files_written = Int64.zero;
      bytes_read = Int64.zero;
      bytes_written = Int64.zero;
      dir_scans = Int64.zero;
    }
  in
  { config; stats }

(** [read_file clay ~env path] loads the file at [path], resolved relative to
    the pier directory, through the filesystem obtained from [env].

    On success the [files_read] and [bytes_read] counters are updated and the
    contents are returned as [Success bytes]. A failure is reported as
    [Error msg] instead of being raised.

    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled. Cancellation
    is propagated rather than being turned into an [Error], so the enclosing
    switch can unwind normally. *)
let read_file clay ~env path =
  let full_path = Eio.Path.(Eio.Stdenv.fs env / clay.config.pier_path / path) in
  match Eio.Path.load full_path with
  | contents ->
      clay.stats.files_read <- Int64.succ clay.stats.files_read;
      clay.stats.bytes_read <-
        Int64.add clay.stats.bytes_read (Int64.of_int (String.length contents));
      Success (Bytes.of_string contents)
  (* Cancellation is not an I/O failure: let it propagate. *)
  | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
  | exception e ->
      Error (Printf.sprintf "Failed to read %s: %s" path (Printexc.to_string e))

(** [write_file clay ~env path data] stores [data] at [path], resolved relative
    to the pier directory, creating or truncating the file with mode [0o644].

    The pier directory and the directory part of [path] are created first
    (mode [0o755]), including any missing intermediate directories. A
    directory that already exists is not an error, so concurrent writers
    targeting the same new directory do not fail each other.

    On success the [files_written] and [bytes_written] counters are updated
    and [Success ()] is returned. A failure is reported as [Error msg].

    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled. *)
let write_file clay ~env path data =
  try
    let fs = Eio.Stdenv.fs env in
    let pier = Eio.Path.(fs / clay.config.pier_path) in

    (* Ensure the pier directory exists. *)
    Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 pier;

    (* Ensure the file's parent directory exists, however deeply nested. *)
    (match Filename.dirname path with
     | "." | "" -> ()
     | dir -> Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 Eio.Path.(pier / dir));

    let full_path = Eio.Path.(pier / path) in
    Eio.Path.save ~create:(`Or_truncate 0o644) full_path (Bytes.to_string data);

    clay.stats.files_written <- Int64.succ clay.stats.files_written;
    clay.stats.bytes_written <-
      Int64.add clay.stats.bytes_written (Int64.of_int (Bytes.length data));

    Success ()
  with
  (* Cancellation is not an I/O failure: let it propagate. *)
  | Eio.Cancel.Cancelled _ as ex -> raise ex
  | e -> Error (Printf.sprintf "Failed to write %s: %s" path (Printexc.to_string e))

(** [delete_file clay ~env path] unlinks the file at [path], resolved relative
    to the pier directory.

    Returns [Success ()] when the file was removed and [Error msg] when the
    unlink failed. No counter is updated.

    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled. *)
let delete_file clay ~env path =
  let full_path = Eio.Path.(Eio.Stdenv.fs env / clay.config.pier_path / path) in
  match Eio.Path.unlink full_path with
  | () -> Success ()
  (* Cancellation is not an I/O failure: let it propagate. *)
  | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
  | exception e ->
      Error (Printf.sprintf "Failed to delete %s: %s" path (Printexc.to_string e))

(** [file_exists clay ~env path] is [true] exactly when [path], resolved
    relative to the pier directory and following symlinks, is a regular file.

    Directories, other file kinds, missing paths and paths that cannot be
    inspected all yield [false].

    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled, instead of
    reporting the file as absent. *)
let file_exists clay ~env path =
  let full_path = Eio.Path.(Eio.Stdenv.fs env / clay.config.pier_path / path) in
  match Eio.Path.kind ~follow:true full_path with
  | `Regular_file -> true
  | _ -> false
  (* Cancellation must not be mistaken for "no such file". *)
  | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
  | exception _ -> false

(** [list_directory clay ~env path] returns the entry names of the directory at
    [path], resolved relative to the pier directory, as [Success names].

    The [dir_scans] counter is incremented on every call, before the directory
    is read, so failed scans are counted too. A failure is reported as
    [Error msg].

    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled. *)
let list_directory clay ~env path =
  let full_path = Eio.Path.(Eio.Stdenv.fs env / clay.config.pier_path / path) in
  clay.stats.dir_scans <- Int64.succ clay.stats.dir_scans;
  match Eio.Path.read_dir full_path with
  | entries -> Success entries
  (* Cancellation is not an I/O failure: let it propagate. *)
  | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
  | exception e ->
      Error
        (Printf.sprintf "Failed to list directory %s: %s" path (Printexc.to_string e))

(** [parallel_read clay ~env ~sw paths] reads every file in [paths]
    concurrently, one fiber per path forked under [sw], and returns one
    [(path, result)] pair per input path, in input order.

    The function waits for every fiber to finish before returning, so no
    result is dropped regardless of how long a read takes. A read that fails
    appears in the list as [(path, Error msg)].

    Progress and timing are printed to standard output. *)
let parallel_read clay ~env ~sw paths =
  let total = List.length paths in
  Printf.printf "[Clay] Reading %d files in parallel...\n%!" total;

  let start = Unix.gettimeofday () in

  (* Fork one fiber per file; each promise resolves with that file's result. *)
  let pending =
    List.map
      (fun path ->
        Eio.Fiber.fork_promise ~sw (fun () -> (path, read_file clay ~env path)))
      paths
  in

  (* Join every fiber instead of guessing how long the reads will take. *)
  let collected = List.map Eio.Promise.await_exn pending in

  let elapsed = Unix.gettimeofday () -. start in
  let finished = List.length collected in
  Printf.printf "[Clay] Read %d/%d files in %.4fs (%.0f files/sec)\n%!"
    finished total elapsed
    (float_of_int finished /. elapsed);

  collected

(** [parallel_write clay ~env ~sw files] writes every [(path, data)] pair in
    [files] concurrently, one fiber per file forked under [sw], and returns
    one [(path, result)] pair per input, in input order.

    The function waits for every fiber to finish before returning, so no
    result is dropped regardless of how long a write takes. A write that fails
    appears in the list as [(path, Error msg)].

    Progress and timing are printed to standard output. *)
let parallel_write clay ~env ~sw files =
  let total = List.length files in
  Printf.printf "[Clay] Writing %d files in parallel...\n%!" total;

  let start = Unix.gettimeofday () in

  (* Fork one fiber per file; each promise resolves with that file's result. *)
  let pending =
    List.map
      (fun (path, data) ->
        Eio.Fiber.fork_promise ~sw (fun () ->
            (path, write_file clay ~env path data)))
      files
  in

  (* Join every fiber instead of guessing how long the writes will take. *)
  let collected = List.map Eio.Promise.await_exn pending in

  let elapsed = Unix.gettimeofday () -. start in
  let finished = List.length collected in
  Printf.printf "[Clay] Wrote %d/%d files in %.4fs (%.0f files/sec)\n%!"
    finished total elapsed
    (float_of_int finished /. elapsed);

  collected

(** [scan_directory clay ~env ?prefix path] walks the directory at [path]
    (relative to the pier directory) recursively and returns the regular files
    found beneath it.

    Each returned name is built as [prefix ^ "/" ^ entry], with [prefix]
    extended at every level of recursion; [prefix] defaults to [""].
    Symlinks are not followed, and entries that are neither directories nor
    regular files are skipped. If a directory cannot be listed, a message is
    printed and that directory contributes no files. *)
let rec scan_directory clay ~env ?(prefix="") path =
  match list_directory clay ~env path with
  | Error e ->
      Printf.printf "[Clay] Error scanning %s: %s\n%!" path e;
      []
  | Success entries ->
      (* Accumulate by prepending, visiting the entries from first to last. *)
      let rec walk found = function
        | [] -> found
        | entry :: rest ->
            let relative = if path = "" then entry else path ^ "/" ^ entry in
            let reported = prefix ^ "/" ^ entry in
            let on_disk =
              Eio.Path.(Eio.Stdenv.fs env / clay.config.pier_path / relative)
            in
            let found =
              match Eio.Path.kind ~follow:false on_disk with
              | `Directory ->
                  (* Descend, carrying the reported name as the new prefix. *)
                  scan_directory clay ~env ~prefix:reported relative @ found
              | `Regular_file -> reported :: found
              | _ -> found
            in
            walk found rest
      in
      walk [] entries

(** [batch_copy clay ~env ~sw src_paths dest_dir] copies each file in
    [src_paths] into [dest_dir], keeping only the base name of each source.
    All paths are relative to the pier directory.

    The sources are read with [parallel_read] and the copies written with
    [parallel_write]. A source that cannot be read, or a copy that cannot be
    written, is logged to standard output and skipped.

    Returns [Success n] where [n] is the number of files actually written.
    Sources sharing a base name target the same destination file. *)
let batch_copy clay ~env ~sw src_paths dest_dir =
  Printf.printf "[Clay] Batch copying %d files to %s...\n%!"
    (List.length src_paths) dest_dir;

  let start = Unix.gettimeofday () in

  (* Read all source files in parallel *)
  let file_data = parallel_read clay ~env ~sw src_paths in

  (* Pair each successfully read payload with its destination path.
     [Filename.concat] keeps the path relative when [dest_dir] is empty. *)
  let dest_files =
    List.filter_map
      (fun (src_path, result) ->
        match result with
        | Success data ->
            Some (Filename.concat dest_dir (Filename.basename src_path), data)
        | Error msg ->
            Printf.printf "[Clay] %s\n%!" msg;
            None)
      file_data
  in

  (* Write all destination files in parallel *)
  let written = parallel_write clay ~env ~sw dest_files in

  (* Count only the writes that succeeded; report the rest. *)
  let copied =
    List.fold_left
      (fun count (_, result) ->
        match result with
        | Success () -> count + 1
        | Error msg ->
            Printf.printf "[Clay] %s\n%!" msg;
            count)
      0 written
  in

  let elapsed = Unix.gettimeofday () -. start in
  Printf.printf "[Clay] Batch copy completed in %.4fs\n%!" elapsed;

  Success copied

(** Placeholder for a fiber-based directory watcher.

    All arguments are ignored; the function only prints a notice that file
    watching is not implemented and returns [()]. *)
let watch_directory _clay ~env:_ ~sw:_ ~event_stream:_ _path =
  (* A real implementation would presumably sit on inotify (Linux) or
     FSEvents (macOS); nothing of the sort is wired up here. *)
  print_string "[Clay] File watching not yet implemented (requires inotify integration)\n";
  flush stdout

(** [get_stats clay] returns the driver's counters. The record returned is the
    driver's own mutable one, not a copy. *)
let get_stats { stats; _ } = stats

(** [run clay ~env ~sw ~event_stream] starts the driver: it makes sure the
    pier directory exists (creating missing parent directories as needed,
    mode [0o755]), forks the directory-watcher fiber under [sw], and prints
    start-up messages to standard output.

    An existing pier directory is left as it is. If the directory cannot be
    created, for example because the path is occupied by a non-directory, the
    exception from the filesystem propagates to the caller. *)
let run clay ~env ~sw ~event_stream =
  Printf.printf "[Clay] Starting filesystem driver for pier: %s\n%!"
    clay.config.pier_path;

  (* Create the pier directory if it doesn't exist yet. *)
  let pier = Eio.Path.(Eio.Stdenv.fs env / clay.config.pier_path) in
  Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 pier;

  (* Spawn file watcher fiber (if implemented) *)
  Eio.Fiber.fork ~sw (fun () ->
    watch_directory clay ~env ~sw ~event_stream ""
  );

  Printf.printf "[Clay] Filesystem driver running!\n%!"