diff --git a/src/lib_bees/task_worker.ml b/src/lib_bees/task_worker.ml index 5ba088fdbf8c3de354d879497056c704f3184107..3501248ae562f0bf26f7353d47ca14c2152af19b 100644 --- a/src/lib_bees/task_worker.ml +++ b/src/lib_bees/task_worker.ml @@ -83,11 +83,17 @@ let worker name domains = let launch_task_and_wait worker name on_request ?on_completion param = let r = Request.Task {name; on_request; param; on_completion} in - Worker.Queue.push_request_and_wait_eio worker r |> Lwt_eio.Promise.await_eio + Worker.Queue.push_request_and_wait_eio worker r let launch_tasks_and_wait worker name func ?on_completion args = - Lwt_list.map_p (launch_task_and_wait worker name ?on_completion func) args + Eio.Fiber.List.map + (fun arg -> + launch_task_and_wait worker name ?on_completion func arg + |> Eio.Promise.await) + args let launch_task worker name on_request ?on_completion param = let r = Request.Task {name; on_request; param; on_completion} in Worker.Queue.push_request_eio worker r + +let shutdown task_worker = Worker.shutdown_eio task_worker diff --git a/src/lib_bees/task_worker.mli b/src/lib_bees/task_worker.mli index 80f550833419526adb0f450679fa5865e0909438..a336d23eda282fe6a1953a74120981d53eb3cddd 100644 --- a/src/lib_bees/task_worker.mli +++ b/src/lib_bees/task_worker.mli @@ -35,7 +35,7 @@ val launch_task_and_wait : ('a -> 'b) -> ?on_completion:('b -> unit) -> 'a -> - ('b, 'c message_error) result Lwt.t + ('b, 'c message_error) result Eio.Promise.t (** [launch_tasks_and_wait worker name func ?on_completion args] runs {!val-launch_task_and_wait} for each each [arg] in [args], in parallel. *) @@ -45,7 +45,7 @@ val launch_tasks_and_wait : ('a -> 'b) -> ?on_completion:('b -> unit) -> 'a list -> - ('b, 'c message_error) result list Lwt.t + ('b, 'c message_error) result list (** [launch_task worker name func ?on_completion arg] create a request named [name] and executing [func args], and push it to [worker] queue, true if the @@ -59,3 +59,7 @@ val launch_task : ?on_completion:('b -> unit) -> 'a -> bool + +(** [shutdown worker] waits for all requests to be completed + then closes the worker. *) +val shutdown : task_worker -> unit diff --git a/src/lib_bees/test/test_bees_task_worker.ml b/src/lib_bees/test/test_bees_task_worker.ml index 4a6038fb75d71cf549e451c830ee96d09a5541d0..e2ee01fb73958cf567ae4fb90d658f25f05ef871 100644 --- a/src/lib_bees/test/test_bees_task_worker.ml +++ b/src/lib_bees/test/test_bees_task_worker.ml @@ -32,7 +32,7 @@ let tests_fibonacci = let rec fib n = if n <= 1 then n else fib (n - 1) + fib (n - 2) in let input = Stdlib.List.init 5 (fun i -> i + 10) in let expected = List.map fib input in - let* output = + let output = Tezos_bees.Task_worker.launch_tasks_and_wait worker "fib" fib input in let output = List.filter_map Result.to_option output in @@ -70,8 +70,8 @@ let tests_reuse = succ str_input in - let* int_output in - let* str_output in + let int_output = Eio.Promise.await int_output in + let str_output = Eio.Promise.await str_output in Assert.equal (Ok int_expected) int_output ; Assert.equal (Ok str_expected) str_output ; Lwt.return_ok () @@ -86,13 +86,14 @@ let tests_on_completion_callback = | Ok worker -> let r = ref 0 in let noop () = () in - let* _ = + let _ = Tezos_bees.Task_worker.launch_task_and_wait worker "callback" noop () + |> Eio.Promise.await in Assert.equal !r 0 ; let on_completion () = incr r in let tasks = 2 in - let* _ = + let _ = Tezos_bees.Task_worker.launch_tasks_and_wait ~on_completion worker diff --git a/tezt/manual_tests/eio_benchmarks/test.ml b/tezt/manual_tests/eio_benchmarks/test.ml index 8604028af69483bcf063f5a193e56403ab6bb742..76f2a3db2768410dad4d1ded476c2a21f236ca31 100644 --- a/tezt/manual_tests/eio_benchmarks/test.ml +++ b/tezt/manual_tests/eio_benchmarks/test.ml @@ -174,7 +174,7 @@ let () = (* Eio Worker running with Eio handler *) let rec bench_all domains = - if domains > int_of_string Sys.argv.(1) then Lwt.return_unit + if domains > int_of_string Sys.argv.(1) then () else ( Eio.traceln "Eio Worker running with eio handlers and %d domains@." @@ -198,9 +198,49 @@ let () = let worker = match worker with Ok w -> w | Error _ -> assert false in run eio_compute worker ; let _ = Worker.shutdown_eio worker in - Format.printf "stop@." ; - let () = (() [@profiler.stop]) in - bench_all (domains * 2)) + (let () = (() [@profiler.stop]) in + Eio.traceln "Task_worker running %d domains@." domains) + [@profiler.record + {verbosity = Notice} (Format.sprintf "task_worker_%d_domains" domains)]) ; + let task_worker = + match Tezos_bees.Task_worker.worker "task_worker" domains with + | Ok w -> w + | Error _ -> assert false + in + let eio_task_worker worker label algo = + (let res : + ((bool, 'a) result, 'b Tezos_bees.Task_worker.message_error) result + list = + (fun worker keys_and_signatures -> + Tezos_bees.Task_worker.launch_tasks_and_wait + worker + "task_worker_checks" + (fun (pk, _sk, msg, signature) -> + Ok (Signature.check pk signature msg)) + keys_and_signatures) + |> compute worker label algo + in + + let res = + List.map + (function + | Ok (Ok b) -> b + | Ok (Error _) -> assert false + | Error _ -> assert false) + res + in + assert (List.for_all Fun.id res)) + [@profiler.aggregate_f {verbosity = Notice} label] + in + + run eio_task_worker task_worker ; + let _ = Tezos_bees.Task_worker.shutdown task_worker in + let () = (() [@profiler.stop]) in + + Format.printf "stop@." ; + + bench_all (domains * 2) in + bench_all 1