blob: 06a5ce458879c8d63293fa3123406b96281e06e0 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
(* Domain Pool - Manage worker domains for parallel Nock execution
*
* This module provides a pool of worker domains that can execute
* Nock computations in parallel across multiple CPU cores.
*
* Key innovation: Uses Domainslib.Task for work distribution
*)
(** Settings that describe how a domain pool is sized. *)
type config = {
num_domains: int; (* Worker-domain count handed to [Domainslib.Task.setup_pool]; [create] defaults it to [Domain.recommended_domain_count () - 1] and clamps it to at least 1. *)
}
(** A live domain pool: the configuration it was built with, paired with the
    underlying Domainslib task pool. *)
type t = {
config: config; (* Settings recorded by [create]. *)
pool: Domainslib.Task.pool; (* Handle passed to the [Domainslib.Task] calls in this module. *)
}
(** [create ?num_domains ()] builds a new domain pool.

    [num_domains] defaults to [Domain.recommended_domain_count () - 1]; any
    value below 1 is raised to 1. A log line announcing the effective count is
    printed to stdout (and flushed) before the Domainslib pool is set up. *)
let create ?(num_domains = Domain.recommended_domain_count () - 1) () =
  (* Never ask Domainslib for fewer than one worker domain. *)
  let worker_count = if num_domains < 1 then 1 else num_domains in
  Printf.printf "[DomainPool] Creating pool with %d domains\n%!" worker_count;
  {
    config = { num_domains = worker_count };
    pool = Domainslib.Task.setup_pool ~num_domains:worker_count ();
  }
(** [shutdown pool] logs a shutdown message to stdout (flushed) and then tears
    down the underlying Domainslib task pool. *)
let shutdown { pool = task_pool; _ } =
  Printf.printf "[DomainPool] Shutting down pool\n%!";
  Domainslib.Task.teardown_pool task_pool
(** [run pool f] hands the task [f] to [Domainslib.Task.run] on the pool's
    underlying Domainslib task pool and returns its result. *)
let run { pool = task_pool; _ } f = Domainslib.Task.run task_pool f
(** [parallel_map pool f items] applies [f] to every element of [items] via
    [Domainslib.Task.parallel_for] (one element per chunk) and returns the
    results as a list in the same order as [items]. *)
let parallel_map pool f items =
  let inputs = Array.of_list items in
  (* One result slot per input; each loop index writes only its own slot. *)
  let outputs = Array.map (fun _ -> None) inputs in
  let fill idx = outputs.(idx) <- Some (f inputs.(idx)) in
  let last = Array.length inputs - 1 in
  Domainslib.Task.run pool.pool (fun () ->
      Domainslib.Task.parallel_for pool.pool ~chunk_size:1 ~start:0
        ~finish:last ~body:fill);
  (* Unwrap the filled slots back into a list, front to back. *)
  Array.fold_right
    (fun slot acc -> match slot with Some v -> v :: acc | None -> acc)
    outputs []
(** [parallel_for pool ~start ~finish ~body] evaluates [body i] for every [i]
    in the inclusive range [start .. finish] through
    [Domainslib.Task.parallel_for] (one index per chunk) and returns the
    results as a list ordered by index.

    An empty range ([finish < start]) returns [[]] without calling [body]. *)
let parallel_for pool ~start ~finish ~body =
  (* Clamp so that an empty range cannot request a negative array length. *)
  let len = max 0 (finish - start + 1) in
  let results = Array.make len None in
  if len > 0 then
    Domainslib.Task.run pool.pool (fun () ->
        (* Domainslib's [finish] bound is already inclusive, so it is passed
           through unchanged; adding 1 would run [body] on [finish + 1] and
           write one slot past the end of [results]. *)
        Domainslib.Task.parallel_for pool.pool ~chunk_size:1 ~start ~finish
          ~body:(fun i -> results.(i - start) <- Some (body i)));
  (* Every slot has been filled at this point; unwrap them in index order. *)
  Array.to_list results |> List.filter_map Fun.id
(** [async pool f] submits [f] to the pool's underlying Domainslib task pool
    with [Domainslib.Task.async] and returns the resulting promise. *)
let async { pool = task_pool; _ } f = Domainslib.Task.async task_pool f
(** Wait for an async task to complete.

    This is a direct alias of [Domainslib.Task.await], so it has exactly that
    function's signature.
    NOTE(review): unlike [async], nothing here unwraps a [t] -- presumably
    callers must pass the raw [Domainslib.Task.pool]; confirm against the
    Domainslib version in use. *)
let await = Domainslib.Task.await
(** Snapshot of pool sizing information, as returned by [stats]. *)
type stats = {
num_domains: int; (* Worker-domain count stored in the pool's [config]. *)
available_cores: int; (* Value of [Domain.recommended_domain_count ()] at the time [stats] was called. *)
}
(** [stats pool] reports the worker-domain count recorded in the pool's
    configuration together with the current value of
    [Domain.recommended_domain_count ()]. *)
let stats { config = cfg; _ } =
  let available_cores = Domain.recommended_domain_count () in
  { num_domains = cfg.num_domains; available_cores }
|