summaryrefslogtreecommitdiff
path: root/ocaml/lib/domain_pool.ml
blob: 06a5ce458879c8d63293fa3123406b96281e06e0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
(* Domain Pool - Manage worker domains for parallel Nock execution
 *
 * This module provides a pool of worker domains that can execute
 * Nock computations in parallel across multiple CPU cores.
 *
 * Key innovation: Uses Domainslib.Task for work distribution
 *)

(* Domain pool configuration.
 *
 * Captured once when the pool is created and stored next to the
 * Domainslib pool handle in [t]. *)
type config = {
  num_domains: int;  (* Number of worker domains handed to Domainslib.
                        [create] defaults this to
                        Domain.recommended_domain_count () - 1 and then
                        clamps it to at least 1. *)
}

(* Domain pool state: the settings the pool was created with, together
 * with the underlying Domainslib task pool that the functions in this
 * module delegate to. *)
type t = {
  config: config;  (* Settings recorded at creation time *)
  pool: Domainslib.Task.pool;  (* Underlying Domainslib task pool *)
}

(* Create a domain pool.
 *
 * [num_domains] defaults to Domain.recommended_domain_count () - 1 and is
 * raised to 1 if a smaller value is given. The effective value is logged
 * to stdout, passed to Domainslib.Task.setup_pool, and remembered in the
 * returned record's [config]. *)
let create ?(num_domains = Domain.recommended_domain_count () - 1) () =
  (* Never ask Domainslib for fewer than one domain. *)
  let num_domains = if num_domains < 1 then 1 else num_domains in
  Printf.printf "[DomainPool] Creating pool with %d domains\n%!" num_domains;
  let pool = Domainslib.Task.setup_pool ~num_domains () in
  { config = { num_domains }; pool }

(* Shut the pool down: log a message to stdout, then tear down the
 * underlying Domainslib pool. *)
let shutdown { pool = workers; _ } =
  Printf.printf "[DomainPool] Shutting down pool\n%!";
  Domainslib.Task.teardown_pool workers

(* Run the thunk [f] inside the pool via Domainslib.Task.run and return
 * whatever it returns. *)
let run { pool = workers; _ } f = Domainslib.Task.run workers f

(* Apply [f] to every element of [items] on the pool and return the
 * results as a list in the same order as the inputs.
 *
 * The list is copied into an array so each index can be processed
 * independently; every index writes only its own result slot. *)
let parallel_map pool f items =
  let inputs = Array.of_list items in
  let last = Array.length inputs - 1 in
  let slots = Array.make (last + 1) None in
  (* Compute one element and store it at the matching index. *)
  let fill idx = slots.(idx) <- Some (f inputs.(idx)) in
  let job () =
    Domainslib.Task.parallel_for pool.pool
      ~chunk_size:1
      ~start:0
      ~finish:last
      ~body:fill
  in
  Domainslib.Task.run pool.pool job;
  (* Unwrap the filled slots, preserving index order. *)
  Array.fold_right
    (fun slot acc ->
      match slot with
      | Some value -> value :: acc
      | None -> acc)
    slots []

(* Run [body i] on the pool for every [i] in the inclusive range
 * [start .. finish] and return the results as a list ordered by index.
 *
 * An empty range ([finish < start]) returns [[]] without submitting any
 * work to the pool.
 *
 * Each index writes only its own slot of the result array, so the
 * iterations do not interfere with one another through that array. *)
let parallel_for pool ~start ~finish ~body =
  let len = finish - start + 1 in
  if len <= 0 then []
  else begin
    let results = Array.make len None in
    Domainslib.Task.run pool.pool (fun () ->
      Domainslib.Task.parallel_for pool.pool
        ~chunk_size:1
        ~start
        (* Domainslib's [finish] bound is already inclusive. Passing
           [finish + 1] would run [body] on an index the caller did not
           ask for and write one slot past the end of [results]. *)
        ~finish
        ~body:(fun i -> results.(i - start) <- Some (body i))
    );
    (* Collect results in index order *)
    Array.to_list results |> List.filter_map Fun.id
  end

(* Submit the thunk [f] to the pool with Domainslib.Task.async and return
 * the resulting promise without waiting for it. *)
let async { pool = workers; _ } f = Domainslib.Task.async workers f

(* Wait for a promise to complete and return its value.
 *
 * NOTE(review): this forwards directly to Domainslib.Task.await, so its
 * first argument is the raw Domainslib.Task.pool (the [pool] field of
 * [t]), not a [t] as taken by the other functions in this module. *)
let await raw_pool promise = Domainslib.Task.await raw_pool promise

(* Pool statistics type: the record produced by the [stats] function.
 *
 * Note: the [num_domains] label declared here shadows the label of the
 * same name in [config] for code that follows this definition. *)
type stats = {
  num_domains: int;  (* Domain count recorded in the pool's config *)
  available_cores: int;  (* Domain.recommended_domain_count () when queried *)
}

(* Report pool statistics: the domain count stored in the pool's config
 * and the runtime's current recommended domain count. *)
let stats pool =
  let cores = Domain.recommended_domain_count () in
  let workers = pool.config.num_domains in
  { num_domains = workers; available_cores = cores }