(* Clay - Filesystem Driver with Eio
 *
 * This is the filesystem driver for Urbit's Clay vane.
 * Uses Eio.Path for async file I/O - MUCH faster than C Vere's blocking I/O!
 *
 * Key innovations vs C Vere:
 * - C Vere: Blocking file I/O, sequential operations, slow directory scans
 * - Overe: Async I/O with Eio.Path, parallel file operations, MASSIVE speedup!
 *)

(* Clay configuration *)
type config = {
  pier_path: string;  (* Base path for pier *)
}

(* File operation result.
   Note: after this definition, the bare constructor [Error] refers to this
   type rather than to [Stdlib.result] inside this module. *)
type 'a file_result =
  | Success of 'a
  | Error of string

(* Clay driver state *)
type t = {
  config: config;
  mutable stats: stats;
}

(* Running counters, updated in place by the file operations below. *)
and stats = {
  mutable files_read: int64;
  mutable files_written: int64;
  mutable bytes_read: int64;
  mutable bytes_written: int64;
  mutable dir_scans: int64;
}

(* Create Clay driver with all counters at zero. *)
let create config = {
  config;
  stats = {
    files_read = 0L;
    files_written = 0L;
    bytes_read = 0L;
    bytes_written = 0L;
    dir_scans = 0L;
  };
}

(* ---- Internal helpers ---- *)

(* The pier directory on the environment's filesystem. *)
let pier_dir clay ~env =
  Eio.Path.(Eio.Stdenv.fs env / clay.config.pier_path)

(* [path] resolved relative to the pier directory. *)
let pier_file clay ~env path =
  Eio.Path.(Eio.Stdenv.fs env / clay.config.pier_path / path)

(* Make sure [dir] exists, creating missing parents as well.
   [~exists_ok:true] makes this safe when several fibers race to create the
   same directory. *)
let ensure_dir dir =
  Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 dir

(* Run [f ()] and wrap its outcome in a [file_result].
   Any exception becomes [Error "Failed to <action> <path>: <exn>"], except
   [Eio.Cancel.Cancelled], which is re-raised so that cancellation of the
   enclosing fiber is not swallowed. *)
let attempt ~action path f =
  try Success (f ()) with
  | Eio.Cancel.Cancelled _ as ex -> raise ex
  | ex ->
    Error
      (Printf.sprintf "Failed to %s %s: %s" action path (Printexc.to_string ex))

(* Fork one fiber per item under [sw], then wait for every one of them.
   Results are returned in the same order as [items]. *)
let run_all ~sw job items =
  items
  |> List.map (fun item -> Eio.Fiber.fork_promise ~sw (fun () -> job item))
  |> List.map Eio.Promise.await_exn

(* Throughput for the progress messages; 0 when no measurable time elapsed
   (avoids printing inf/nan). *)
let files_per_sec count elapsed =
  if elapsed > 0. then float_of_int count /. elapsed else 0.

(* ---- Single-file operations ---- *)

(* Read the file at [path] (relative to the pier).
   Returns [Success bytes] and bumps [files_read]/[bytes_read], or [Error msg]
   if the read fails. *)
let read_file clay ~env path =
  attempt ~action:"read" path (fun () ->
    let contents = Eio.Path.load (pier_file clay ~env path) in
    let stats = clay.stats in
    stats.files_read <- Int64.succ stats.files_read;
    stats.bytes_read <-
      Int64.add stats.bytes_read (Int64.of_int (String.length contents));
    Bytes.of_string contents)

(* Write [data] to [path] (relative to the pier), creating or truncating the
   file with mode 0o644. The pier directory and any missing parent directories
   of [path] are created first (mode 0o755).
   Returns [Success ()] and bumps [files_written]/[bytes_written], or
   [Error msg] if any step fails. *)
let write_file clay ~env path data =
  attempt ~action:"write" path (fun () ->
    ensure_dir (pier_dir clay ~env);
    (match Filename.dirname path with
     | "." | "" -> ()
     | dir -> ensure_dir (pier_file clay ~env dir));
    Eio.Path.save
      ~create:(`Or_truncate 0o644)
      (pier_file clay ~env path)
      (Bytes.to_string data);
    let stats = clay.stats in
    stats.files_written <- Int64.succ stats.files_written;
    stats.bytes_written <-
      Int64.add stats.bytes_written (Int64.of_int (Bytes.length data)))

(* Delete the file at [path] (relative to the pier). *)
let delete_file clay ~env path =
  attempt ~action:"delete" path (fun () ->
    Eio.Path.unlink (pier_file clay ~env path))

(* [true] iff [path] (relative to the pier, following symlinks) is a regular
   file. Any lookup failure other than cancellation is reported as [false]. *)
let file_exists clay ~env path =
  match Eio.Path.kind ~follow:true (pier_file clay ~env path) with
  | `Regular_file -> true
  | _ -> false
  | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
  | exception _ -> false

(* List the entry names of the directory at [path] (relative to the pier).
   [dir_scans] is incremented for every attempt, including failed ones. *)
let list_directory clay ~env path =
  attempt ~action:"list directory" path (fun () ->
    let dir = pier_file clay ~env path in
    clay.stats.dir_scans <- Int64.succ clay.stats.dir_scans;
    Eio.Path.read_dir dir)

(* ---- Bulk operations ---- *)

(* Read all [paths] concurrently, one fiber per file forked under [sw].
   Waits for every read to finish and returns [(path, result)] pairs in the
   same order as [paths]; failed reads appear as [Error _] entries. *)
let parallel_read clay ~env ~sw paths =
  let total = List.length paths in
  Printf.printf "[Clay] Reading %d files in parallel...\n%!" total;
  let start = Unix.gettimeofday () in
  let collected =
    run_all ~sw (fun path -> (path, read_file clay ~env path)) paths
  in
  let elapsed = Unix.gettimeofday () -. start in
  let done_count = List.length collected in
  Printf.printf "[Clay] Read %d/%d files in %.4fs (%.0f files/sec)\n%!"
    done_count total elapsed (files_per_sec done_count elapsed);
  collected

(* Write all [(path, data)] pairs in [files] concurrently, one fiber per file
   forked under [sw]. Waits for every write to finish and returns
   [(path, result)] pairs in the same order as [files]. *)
let parallel_write clay ~env ~sw files =
  let total = List.length files in
  Printf.printf "[Clay] Writing %d files in parallel...\n%!" total;
  let start = Unix.gettimeofday () in
  let collected =
    run_all ~sw
      (fun (path, data) -> (path, write_file clay ~env path data))
      files
  in
  let elapsed = Unix.gettimeofday () -. start in
  let done_count = List.length collected in
  Printf.printf "[Clay] Wrote %d/%d files in %.4fs (%.0f files/sec)\n%!"
    done_count total elapsed (files_per_sec done_count elapsed);
  collected

(* Recursively collect the regular files below [path] (relative to the pier).
   Each returned name is built as [prefix ^ "/" ^ entry], so with the default
   empty [prefix] the names start with "/". Symlinks are not followed and
   entries that are neither directories nor regular files are skipped.
   A directory that cannot be listed is logged and contributes no entries. *)
let rec scan_directory clay ~env ?(prefix="") path =
  match list_directory clay ~env path with
  | Error e ->
    Printf.printf "[Clay] Error scanning %s: %s\n%!" path e;
    []
  | Success entries ->
    List.fold_left (fun acc entry ->
      let rel_path = if path = "" then entry else path ^ "/" ^ entry in
      let item_path = prefix ^ "/" ^ entry in
      match Eio.Path.kind ~follow:false (pier_file clay ~env rel_path) with
      | `Directory ->
        scan_directory clay ~env ~prefix:item_path rel_path @ acc
      | `Regular_file -> item_path :: acc
      | _ -> acc
    ) [] entries

(* Copy every file in [src_paths] into [dest_dir] (both relative to the pier),
   keeping each file's base name. Sources that cannot be read are skipped.
   Returns [Success n] where [n] is the number of files actually written. *)
let batch_copy clay ~env ~sw src_paths dest_dir =
  Printf.printf "[Clay] Batch copying %d files to %s...\n%!"
    (List.length src_paths) dest_dir;
  let start = Unix.gettimeofday () in
  (* Read all source files in parallel *)
  let file_data = parallel_read clay ~env ~sw src_paths in
  (* Keep only the successful reads, paired with their destination path *)
  let dest_files =
    List.filter_map (fun (src_path, result) ->
      match result with
      | Success data ->
        Some (dest_dir ^ "/" ^ Filename.basename src_path, data)
      | Error _ -> None
    ) file_data
  in
  (* Write all destination files in parallel and count the ones that landed *)
  let copied =
    parallel_write clay ~env ~sw dest_files
    |> List.filter (fun (_, outcome) ->
      match outcome with
      | Success () -> true
      | Error _ -> false)
    |> List.length
  in
  let elapsed = Unix.gettimeofday () -. start in
  Printf.printf "[Clay] Batch copy completed in %.4fs\n%!" elapsed;
  Success copied

(* Watch directory for changes - placeholder.
   All arguments are currently ignored; the function only logs that watching
   is not implemented. A real implementation would need inotify (Linux) or
   FSEvents (macOS) integration. *)
let watch_directory _clay ~env:_ ~sw:_ ~event_stream:_ _path =
  Printf.printf
    "[Clay] File watching not yet implemented (requires inotify integration)\n%!";
  ()

(* Get statistics. Returns the live mutable record, not a copy. *)
let get_stats clay = clay.stats

(* Run Clay driver: make sure the pier directory exists, then fork the
   (placeholder) watcher fiber under [sw].
   Failure to create the pier directory is raised to the caller. *)
let run clay ~env ~sw ~event_stream =
  Printf.printf "[Clay] Starting filesystem driver for pier: %s\n%!"
    clay.config.pier_path;
  (* Create pier directory if it doesn't exist *)
  ensure_dir (pier_dir clay ~env);
  (* Spawn file watcher fiber (if implemented) *)
  Eio.Fiber.fork ~sw (fun () ->
    watch_directory clay ~env ~sw ~event_stream "");
  Printf.printf "[Clay] Filesystem driver running!\n%!"