From 174692d6108754e6e3957a959032012c37136e4c Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Sun, 5 Oct 2025 00:32:06 +0200 Subject: [PATCH 1/9] bees: add saturn dependency for thread-safe tables --- manifest/externals.ml | 2 ++ manifest/product_octez.ml | 1 + opam/octez-libs.opam | 1 + opam/virtual/octez-deps.opam | 1 + src/lib_bees/dune | 3 ++- 5 files changed, 7 insertions(+), 1 deletion(-) diff --git a/manifest/externals.ml b/manifest/externals.ml index 8a650d74e1eb..a186b27a00da 100644 --- a/manifest/externals.ml +++ b/manifest/externals.ml @@ -286,6 +286,8 @@ let aches_lwt = external_lib "aches-lwt" V.(at_least "1.1.0") let safepass = external_lib "safepass" V.True +let saturn = external_lib "saturn" V.(at_least "0.5.0") + let secp256k1_internal = let version = V.(at_least "0.4.0") in external_lib "secp256k1-internal" version diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index 658ebd8cd4bb..cd549c1990bd 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -2661,6 +2661,7 @@ let octez_bees = octez_stdlib_unix |> open_; octez_profiler_backends; octez_profiler_complex_backends; + saturn; ] let _octez_bees_tests = diff --git a/opam/octez-libs.opam b/opam/octez-libs.opam index 6958208f509b..7056ad6b5e30 100644 --- a/opam/octez-libs.opam +++ b/opam/octez-libs.opam @@ -78,6 +78,7 @@ depends: [ "lru" { >= "0.3.0" } "semaphore-compat" { >= "1.0.1" } "checkseum" { != "0.5.0" } + "saturn" { >= "0.5.0" } "ringo" { >= "1.1.0" } "octez-internal-libs" { = version } "conf-rust" diff --git a/opam/virtual/octez-deps.opam b/opam/virtual/octez-deps.opam index e2eea7440e25..41dccf1fa430 100644 --- a/opam/virtual/octez-deps.opam +++ b/opam/virtual/octez-deps.opam @@ -111,6 +111,7 @@ depends: [ "rope" { >= "0.6.2" } "rusage" "safepass" + "saturn" { >= "0.5.0" } "secp256k1-internal" { >= "0.4.0" } "semaphore-compat" { >= "1.0.1" } "seqes" { >= "0.2" } diff --git a/src/lib_bees/dune b/src/lib_bees/dune index ed818160552b..86af8bd547f5 
100644 --- a/src/lib_bees/dune +++ b/src/lib_bees/dune @@ -9,7 +9,8 @@ octez-libs.base octez-libs.stdlib-unix octez-libs.octez-profiler.backends - octez-libs.octez-profiler.complex_backends) + octez-libs.octez-profiler.complex_backends + saturn) (preprocess (pps octez-libs.ppx_profiler)) (preprocessor_deps (env_var TEZOS_PPX_PROFILER)) (flags -- GitLab From ed37901a2d9d2a41ca7fff748e8412b98f3d40bd Mon Sep 17 00:00:00 2001 From: mbourgoin Date: Mon, 1 Dec 2025 18:18:05 +0100 Subject: [PATCH 2/9] bees/hive: use lock-free Saturn registry --- src/lib_bees/hive.ml | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/lib_bees/hive.ml b/src/lib_bees/hive.ml index 5fb172cafc83..9a0b373ef302 100644 --- a/src/lib_bees/hive.ml +++ b/src/lib_bees/hive.ml @@ -15,11 +15,21 @@ type worker = -> worker module WorkerTbl = struct - type t = worker String.Hashtbl.t + module Htbl = Saturn.Htbl + + type t = (string, worker) Htbl.t - let create ~initial_size = String.Hashtbl.create initial_size + let create ~initial_size = + let min_buckets = max 1 initial_size in + Htbl.create ~hashed_type:(module String) ~min_buckets () + + let find_opt tbl ~name = Htbl.find_opt tbl name + + let rec replace tbl ~name worker = + if Htbl.try_add tbl name worker then () + else if Htbl.try_set tbl name worker then () + else (* [name] was removed between the two attempts: retry. *) + replace tbl ~name worker end type t = {
@@ -71,6 +81,8 @@ let launch_worker (type worker) ?switch (worker : worker) ~bee_name ~domains The Event_loop's main switch is meant to be a switch available during the whole life of your main process, so you can use it by default. + This accessor must be called from the main domain; callers off-main should + schedule via [run_on_main] to avoid blocking on the main switch lookup. *) let switch = match switch with @@ -81,9 +93,9 @@ let launch_worker (type worker) ?switch (worker : worker) ~bee_name ~domains Eio.Fiber.fork_daemon ~sw:switch (fun () -> Eio.Domain_manager.run env#domain_mgr (fun () -> worker_loop i worker)) done ; - String.Hashtbl.add + WorkerTbl.replace workers - bee_name + ~name:bee_name (Worker {worker; launched = Time.System.now (); subdomains = domains; switch}) -- GitLab From 25159e0dc5c76c1e70c2aa6e3317f9b12fe6f5bf Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Fri, 21 Nov 2025 12:51:02 +0100 Subject: [PATCH 3/9] bees: launch workers on Event_loop main switch --- src/lib_base/unix/event_loop.mli | 4 ++- src/lib_bees/hive.ml | 39 +++++++++++++++++++++++++++ src/lib_bees/hive.mli | 4 +++ src/lib_bees/worker.ml | 45 ++++++++++++++++++-------------------- src/lib_bees/worker.mli | 11 ++++++-- 5 files changed, 81 insertions(+), 22 deletions(-) diff --git a/src/lib_base/unix/event_loop.mli b/src/lib_base/unix/event_loop.mli index fe63dbb7992e..6639a9f30906 100644 --- a/src/lib_base/unix/event_loop.mli +++ b/src/lib_base/unix/event_loop.mli @@ -26,7 +26,9 @@ val env_exn : unit -> Eio_unix.Stdenv.base (** Retrieve the main switch for the current [main_run] being executed. The returned switch must not escape the scope of this [main_run] - execution.
+ execution. Call only from the main domain; code running off-main should + schedule work via [Tezos_bees.Hive.run_on_main] or an equivalent helper to + avoid deadlocking while waiting for the switch. Ideally, an Eio-based function that needs to allocate resources locally should create its own switch to have better control over resource usage. diff --git a/src/lib_bees/hive.ml b/src/lib_bees/hive.ml index 9a0b373ef302..22cd0c549958 100644 --- a/src/lib_bees/hive.ml +++ b/src/lib_bees/hive.ml @@ -90,3 +90,53 @@ let get_error bee_name = to be called from within an existing worker loop. *) | None -> raise (Unknown_worker bee_name) | Some (Worker {switch; _}) -> Eio.Switch.get_error switch + +(* [main_job] encapsulates a closure to be executed on the main domain + and a resolver to return its result, or the exception it raised together + with its backtrace, to the caller. *) +type main_job = + | Job : { + run : unit -> 'a; + resolver : ('a, exn * Printexc.raw_backtrace) result Eio.Promise.u; + } + -> main_job + +(* A stream of jobs to be executed on the main domain. *) +let main_jobs = Eio.Stream.create max_int + +(* Fiber-local marker bound while the [run_main_jobs] daemon executes a job. + [run_on_main] checks it to run nested calls inline: queueing them would + deadlock, since the daemon is busy running the job that waits for them. *) +let in_main_job : unit Eio.Fiber.key = Eio.Fiber.create_key () + +(* [run_main_jobs] is a daemon that consumes jobs from [main_jobs] and + executes them. It is started on the main domain by [Event_loop.on_main_run].
*) +let () = + let rec run_main_jobs () : [`Stop_daemon] = + let (Job {run; resolver}) = Eio.Stream.take main_jobs in + let outcome = + match Eio.Fiber.with_binding in_main_job () run with + | value -> Ok value + | exception exn -> Error (exn, Printexc.get_raw_backtrace ()) + in + Eio.Promise.resolve resolver outcome ; + run_main_jobs () + in + Tezos_base_unix.Event_loop.on_main_run (fun _env switch -> + Eio.Fiber.fork_daemon ~sw:switch run_main_jobs) + +let run_on_main f = + match Tezos_base_unix.Event_loop.main_switch () with + | None -> + invalid_arg + "Tezos_bees.Hive.run_on_main called before Event_loop main_run \ + has started" + | Some _ when Option.is_some (Eio.Fiber.get in_main_job) -> + (* Nested call from a job already running on the daemon: run inline. *) + f () + | Some _ -> ( + let promise, resolver = Eio.Promise.create () in + Eio.Stream.add main_jobs (Job {run = f; resolver}) ; + match Eio.Promise.await promise with + | Ok value -> value + | Error (exn, bt) -> Printexc.raise_with_backtrace exn bt) diff --git a/src/lib_bees/hive.mli b/src/lib_bees/hive.mli index 82c33d8bbf9e..598f0b2a4de2 100644 --- a/src/lib_bees/hive.mli +++ b/src/lib_bees/hive.mli @@ -37,3 +37,11 @@ val get_error : string -> exn option consequently advised to handle exceptions directly in the closure and return a [result]. *) val async_lwt : (unit -> unit Lwt.t) -> unit + +(** Schedule [f] to run on the Event_loop main switch and return its result. + Callers can invoke this from any domain to ensure main-domain execution. + A nested call, made from within a job this function is already running, + runs [f] inline, since queueing it would deadlock. Exceptions raised by + [f] are re-raised with their original backtrace. + @raise Invalid_argument if the Event_loop main switch is not set.
*) +val run_on_main : (unit -> 'a) -> 'a diff --git a/src/lib_bees/worker.ml b/src/lib_bees/worker.ml index 096d74d1b579..bbda8e1b2799 100644 --- a/src/lib_bees/worker.ml +++ b/src/lib_bees/worker.ml @@ -761,27 +761,35 @@ struct (module Handlers : EIO_HANDLERS with type self = kind t and type launch_error = launch_error) = - let state = Handlers.on_launch worker name parameters in - if id_name = base_name then - Worker_events.(emit__dont_wait__use_with_care started) () - else - Worker_events.(emit__dont_wait__use_with_care started_for) - (Format.asprintf "%a" Name.pp name) ; - match state with - | Ok state -> - worker.state <- Some state ; - worker.status <- Worker_types.Running (Time.System.now ()) ; - Hive.launch_worker - worker - ~bee_name:(Format.asprintf "%a" Name.pp name) - ~domains - (worker_loop (module Handlers)) ; - Ok worker - | Error e -> Error e + try + let state = + Hive.run_on_main (fun () -> Handlers.on_launch worker name parameters) + in + if id_name = base_name then + Hive.async_lwt (fun () -> Worker_events.(emit started) ()) + else + Hive.async_lwt (fun () -> + Worker_events.(emit started_for) (Format.asprintf "%a" Name.pp name)) ; + match state with + | Ok state -> + worker.state <- Some state ; + worker.status <- Worker_types.Running (Time.System.now ()) ; + Hive.run_on_main (fun () -> + Hive.launch_worker + worker + ~bee_name:(Format.asprintf "%a" Name.pp name) + ~domains + (worker_loop (module Handlers))) ; + Ok worker + | Error e -> Error e + with exn -> + (* Shut down any partially started worker, keeping [exn]'s backtrace. *) + let bt = Printexc.get_raw_backtrace () in + (try shutdown_eio worker with _ -> ()) ; + Printexc.raise_with_backtrace exn bt (** [launch table name parameters handlers] creates a single worker - instance, using exclusively Lwt handlers. As such, it won't start multiple - domains. *) + instance, using exclusively Lwt handlers.
*) let launch (type kind launch_error) (table : kind table) ?timeout ?domains name parameters (module Lwt_handlers : HANDLERS diff --git a/src/lib_bees/worker.mli b/src/lib_bees/worker.mli index 847c5c297e24..e7e0ea9918b5 100644 --- a/src/lib_bees/worker.mli +++ b/src/lib_bees/worker.mli @@ -118,7 +118,11 @@ module type T = sig end (** [launch table name parameters handlers] creates an instance of the - worker, of the given queue kind. *) + worker, of the given queue kind. + Requires callers to run inside {!Tezos_base_unix.Event_loop.main_run} + (or after the Event_loop main switch has been initialised); otherwise + scheduling the worker initialisation on the main domain fails with + [Invalid_argument]. *) val launch : 'kind table -> ?timeout:Time.System.Span.t -> @@ -288,7 +292,11 @@ module type T = sig (** [launch table name parameters handlers] creates a swarm of bees (workers), each running on a separate domain. By default only a single - domain (then worker) is used to handle the requests. *) + domain (then worker) is used to handle the requests. + As with {!launch}, callers must ensure the Event_loop main switch has been + started (typically via {!Tezos_base_unix.Event_loop.main_run}) so that the + worker initialisation can be scheduled on the main domain; otherwise + [Invalid_argument] is raised.
*) val launch_eio : 'kind table -> ?timeout:Time.System.Span.t -> -- GitLab From 6a4b02c94c467c56f9b73e7e72a26255e3bdbb27 Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Sun, 5 Oct 2025 09:59:13 +0200 Subject: [PATCH 4/9] bees/task_worker: guard worker creation with mutex --- src/lib_bees/task_worker.ml | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/lib_bees/task_worker.ml b/src/lib_bees/task_worker.ml index 2b360ba3d94b..5bc679686e7d 100644 --- a/src/lib_bees/task_worker.ml +++ b/src/lib_bees/task_worker.ml @@ -76,12 +76,12 @@ type 'a message_error = 'a Worker.message_error = | Any of exn (* This is a conservative limit that aims to fit all machines, without - overloading it.*) + overloading it. *) let default_max_domains = max (min (Domain.recommended_domain_count () / 2) 8) 1 let number_of_domains = default_max_domains -let worker = +let make_worker () = let table = Worker.create_table Queue in Eio.Lazy.from_fun ~cancel:`Protect @@ fun () -> match @@ -95,9 +95,29 @@ let worker = | Ok w -> w | Error _ -> assert false +let worker_lock = Eio.Mutex.create () + +let worker = ref (make_worker ()) (* only accessed under [worker_lock] *) + +let with_worker_state f = + Eio.Mutex.lock worker_lock ; + Fun.protect ~finally:(fun () -> Eio.Mutex.unlock worker_lock) f + +(* [get_worker] returns a worker instance, creating it if it doesn't exist. + Access is protected by a mutex to prevent race conditions from multiple + domains. *) +let get_worker () = + with_worker_state (fun () -> + try Eio.Lazy.force !worker + with exn -> + (* If forcing fails, reset the lazy so that the next call will try + to recreate the worker.
*) + worker := make_worker () ; + raise exn) + let launch_task_and_wait name on_request ?on_completion param = let r = Request.Task {name; on_request; param; on_completion} in - Worker.Queue.push_request_and_wait_eio (Eio.Lazy.force worker) r + Worker.Queue.push_request_and_wait_eio (get_worker ()) r let launch_tasks_and_wait ?(max_fibers = max_int) name func ?on_completion args = @@ -109,4 +129,4 @@ let launch_tasks_and_wait ?(max_fibers = max_int) name func ?on_completion args let launch_task name on_request ?on_completion param = let r = Request.Task {name; on_request; param; on_completion} in - Worker.Queue.push_request_eio (Eio.Lazy.force worker) r + Worker.Queue.push_request_eio (get_worker ()) r -- GitLab From c6894bbad90b56ef9eb3a9a8b6dce770547538d0 Mon Sep 17 00:00:00 2001 From: mbourgoin Date: Mon, 1 Dec 2025 15:37:06 +0100 Subject: [PATCH 5/9] bees/task_worker: retry launch on resource errors --- src/lib_bees/task_worker.ml | 71 ++++++++++++++++++++++++----- src/lib_bees/task_worker_events.ml | 72 ++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 10 deletions(-) create mode 100644 src/lib_bees/task_worker_events.ml diff --git a/src/lib_bees/task_worker.ml b/src/lib_bees/task_worker.ml index 5bc679686e7d..3766b1d52b7a 100644 --- a/src/lib_bees/task_worker.ml +++ b/src/lib_bees/task_worker.ml @@ -43,6 +43,7 @@ module Types = struct end module Worker = Worker.MakeSingle (Name) (Request) (Types) +module Events = Task_worker_events type task_worker = Worker.infinite Worker.queue Worker.t @@ -81,19 +82,69 @@ let default_max_domains = max (min (Domain.recommended_domain_count () / 2) 8) 1 let number_of_domains = default_max_domains +let desired_domains = ref default_max_domains + +let current_domains = ref default_max_domains + +let double_resolution_logged = Atomic.make false + +let log_double_resolution_once msg = + if not (Atomic.get double_resolution_logged) then + if Atomic.compare_and_set double_resolution_logged false true then +
Hive.async_lwt @@ fun () -> + Events.( + emit + promise_double_resolution_detected + (msg, Printexc.raw_backtrace_to_string (Printexc.get_callstack 16))) + +let rec is_worker_launch_resource_error = function + | Unix.Unix_error (Unix.ENOMEM, _, _) -> true + | Invalid_argument msg + (* Eio domain launch can sporadically surface an [Invalid_argument] about + an already-resolved promise when domain bootstrapping races; treat that + as a transient resource-style failure that can succeed on retry. *) + when String.starts_with ~prefix:"Can't resolve already-resolved promise" msg + || String.starts_with ~prefix:"Double resolve of a promise" msg -> + log_double_resolution_once msg ; + true + | Eio.Exn.Multiple trace -> + List.exists (fun (exn, _) -> is_worker_launch_resource_error exn) trace + | _ -> false + +(* [launch_worker_with_domains] tries to launch with [domains] and, on transient + resource errors, retries with [domains/2] until it succeeds or reaches 1. *) +let rec launch_worker_with_domains table domains = + match + try + Ok + (Worker.launch_eio + ~domains + table + ~name:"shared task worker" + () + (module Handlers)) + with exn -> Error exn + with + | Ok (Ok w) -> + current_domains := domains ; + if domains <> !desired_domains then + Hive.async_lwt (fun () -> + Events.(emit worker_launch_degraded (!desired_domains, domains))) ; + w + | Ok (Error _) -> assert false + | Error exn -> + Hive.async_lwt (fun () -> + Events.(emit worker_launch_failed (Printexc.to_string exn, domains))) ; + if domains > 1 && is_worker_launch_resource_error exn then ( + let next_domains = max 1 (domains / 2) in + Hive.async_lwt (fun () -> Events.(emit worker_retrying next_domains)) ; + launch_worker_with_domains table next_domains) + else raise exn + let make_worker () = let table = Worker.create_table Queue in Eio.Lazy.from_fun ~cancel:`Protect @@ fun () -> - match - Worker.launch_eio - ~domains:number_of_domains - table - ~name:"shared task worker" - () - (module Handlers) - with - |
Ok w -> w - | Error _ -> assert false + launch_worker_with_domains table !desired_domains let worker_lock = Eio.Mutex.create () diff --git a/src/lib_bees/task_worker_events.ml b/src/lib_bees/task_worker_events.ml new file mode 100644 index 000000000000..57b17c9ab99b --- /dev/null +++ b/src/lib_bees/task_worker_events.ml @@ -0,0 +1,72 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* SPDX-FileCopyrightText: 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +include Internal_event.Simple + +let section = ["bees"; "task_worker"] + +let shutdown_enter = + declare_0 + ~section + ~name:"task_worker_shutdown_enter" + ~msg:"task worker shutdown: enter" + ~level:Debug + () + +let shutdown_exit = + declare_0 + ~section + ~name:"task_worker_shutdown_exit" + ~msg:"task worker shutdown: exit" + ~level:Debug + () + +let worker_launch_failed = + declare_2 + ~section + ~name:"task_worker_launch_failed" + ~msg:"worker launch failed with {error} (domains={domains})" + ~level:Error + ("error", Data_encoding.string) + ("domains", Data_encoding.int31) + +let worker_retrying = + declare_1 + ~section + ~name:"task_worker_retrying" + ~msg:"retrying with {domains} domains" + ~level:Info + ("domains", Data_encoding.int31) + +let worker_launch_degraded = + declare_2 + ~section + ~name:"task_worker_launch_degraded" + ~msg: + "worker launch succeeded with reduced capacity: requested {requested} \ + domains, running with {actual} domains" + ~level:Warning + ("requested", Data_encoding.int31) + ("actual", Data_encoding.int31) + +let task_launch_failed = + declare_1 + ~section + ~name:"task_worker_enqueue_failed" + ~msg:"failed to enqueue task: {error}" + ~level:Error + ("error", Data_encoding.string) + +let promise_double_resolution_detected = + declare_2 + ~section + ~name:"task_worker_promise_double_resolution" + ~msg: +
"promise double resolution while launching worker: {error}\n{backtrace}" + ~level:Error + ("error", Data_encoding.string) + ("backtrace", Data_encoding.string) -- GitLab From 4351eeba347aef26d2c1f60a307e44d840347e2b Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Sun, 5 Oct 2025 10:01:03 +0200 Subject: [PATCH 6/9] bees/task_worker: add shutdown helper for tests --- src/lib_bees/task_worker.ml | 21 +++++++++++++++++++++ src/lib_bees/task_worker.mli | 8 ++++++++ 2 files changed, 29 insertions(+) diff --git a/src/lib_bees/task_worker.ml b/src/lib_bees/task_worker.ml index 3766b1d52b7a..640a98527b0c 100644 --- a/src/lib_bees/task_worker.ml +++ b/src/lib_bees/task_worker.ml @@ -166,6 +166,27 @@ let get_worker () = worker := make_worker () ; raise exn) +let shutdown () = + with_worker_state (fun () -> + (* NOTE(review): [Eio.Lazy] does not seem to expose an "already forced" + check, so this starts a worker if none is running, only to stop it. *) + match Eio.Lazy.force !worker with + | exception (Eio.Cancel.Cancelled _ as exn) -> raise exn + | exception _ -> + (* Forcing failed: just reset the lazy. *) + worker := make_worker () + | w -> + Hive.async_lwt (fun () -> Events.(emit shutdown_enter ())) ; + Fun.protect + ~finally:(fun () -> + (* Always reset so the next submission builds a fresh worker. *) + worker := make_worker () ; + Hive.async_lwt (fun () -> Events.(emit shutdown_exit ()))) + (fun () -> + try Worker.shutdown_eio w with + | Eio.Cancel.Cancelled _ as exn -> raise exn + | _ -> (* Best-effort: failures while stopping are ignored. *) ()) + let launch_task_and_wait name on_request ?on_completion param = let r = Request.Task {name; on_request; param; on_completion} in Worker.Queue.push_request_and_wait_eio (get_worker ()) r diff --git a/src/lib_bees/task_worker.mli b/src/lib_bees/task_worker.mli index 41dec3df7c5d..f1800158dcac 100644 --- a/src/lib_bees/task_worker.mli +++ b/src/lib_bees/task_worker.mli @@ -53,3 +53,11 @@ val launch_tasks_and_wait : request is completed *) val launch_task : string -> ('a -> 'b) -> ?on_completion:('b -> unit) -> 'a -> bool + +(** [shutdown ()] shuts down the task worker and resets internal state; a new + worker is created lazily on the next task submission. Failures while + stopping the worker are ignored, but cancellation is propagated. It is + safe to call multiple times and is primarily intended for test cleanup + between iterations. Note that calling it while no worker is running + first launches one (the worker is created lazily) and then stops it.
*) +val shutdown : unit -> unit -- GitLab From 050aa55054772a76a3e5b93906c09b92733fb928 Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Thu, 20 Nov 2025 00:02:00 +0100 Subject: [PATCH 7/9] bees/test: add contention and launch-race regressions --- src/lib_bees/test/test_bees_unit_eio.ml | 125 +++++++++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/src/lib_bees/test/test_bees_unit_eio.ml b/src/lib_bees/test/test_bees_unit_eio.ml index 8697edd1c797..8f325579b8fe 100644 --- a/src/lib_bees/test/test_bees_unit_eio.ml +++ b/src/lib_bees/test/test_bees_unit_eio.ml @@ -326,9 +326,117 @@ let wrap_qcheck test () = let _ = QCheck_alcotest.to_alcotest test in Result_syntax.return_unit +let test_worker_contention () = + let open Result_syntax in + (* Scenario: hammer the shared task worker from many fibers at once to ensure + contention doesn't break task execution or worker liveness. *) + let parallel_fibers = + min 16 (max 4 (Tezos_bees.Task_worker.number_of_domains * 2)) + in + let run_job payload = + Tezos_bees.Task_worker.launch_task_and_wait + "contention" + (fun id -> + for _ = 1 to 200 do + ignore (Sys.opaque_identity ()) + done ; + id) + payload + |> Eio.Promise.await + in + let total_jobs = parallel_fibers * 4 in + let inputs = + let rec aux acc i = + if i = total_jobs then List.rev acc else aux (i :: acc) (i + 1) + in + aux [] 0 + in + Eio.Fiber.List.iter + ~max_fibers:parallel_fibers + (fun payload -> + match run_job payload with + | Ok _ -> () + | Error (Closed _) -> + Alcotest.fail "worker unexpectedly closed while handling contention" + | Error (Request_error _) -> + Alcotest.fail "worker request error under contention" + | Error (Any exn) -> raise exn) + inputs ; + return_unit + +let test_worker_launch_race () = + let open Result_syntax in + (* Scenario: spin up multiple domains racing to launch tasks at startup, + exercising worker creation path under simultaneous requests.
*) + let describe_message_error : _ Tezos_bees.Task_worker.message_error -> string + = function + | Closed _ -> "Closed" + | Request_error _ -> "Request_error" + | Any exn -> Format.asprintf "Any(%s)" (Printexc.to_string exn) + in + let fail_on_error err = + let msg = describe_message_error err in + Alcotest.failf "task worker failure: %s" msg + in + let env = Tezos_base_unix.Event_loop.env_exn () in + let concurrent_domains = + max 4 (Tezos_bees.Task_worker.number_of_domains * 4) + in + let requests_per_domain = 6 in + let iterations = 5 in + let run_job () = + match + Tezos_bees.Task_worker.launch_task_and_wait + "task-worker-race" + (fun () -> ()) + () + |> Eio.Promise.await + with + | Ok () -> () + | Error err -> fail_on_error err + in + let wait_for readiness target = + let rec loop () = + if Atomic.get readiness < target then ( + Eio.Time.sleep env#clock 0.00005 ; + loop ()) + in + loop () + in + let run_iteration iteration = + let readiness = Atomic.make 0 in + let start_wait, start_signal = Eio.Promise.create () in + Eio.Switch.run @@ fun sw -> (* scope racing fibers to this iteration *) + let domain_tasks = + Array.init concurrent_domains (fun _domain_id -> + Eio.Fiber.fork_promise ~sw (fun () -> + Eio.Domain_manager.run env#domain_mgr (fun () -> + ignore (Atomic.fetch_and_add readiness 1) ; + Eio.Promise.await start_wait ; + for _ = 1 to requests_per_domain do + run_job () + done))) + in + wait_for readiness concurrent_domains ; + Eio.Promise.resolve start_signal () ; + Array.iter + (fun promise -> + match Eio.Promise.await promise with + | Ok () -> () + | Error exn -> raise exn) + domain_tasks ; + Format.printf "[race] iteration %d/%d complete@\n%!"
iteration iterations + in + Fun.protect ~finally:Tezos_bees.Task_worker.shutdown (fun () -> + for iteration = 1 to iterations do + run_iteration iteration + done) ; + return_unit + (** Tests run in fresh processes via [Alcotezt_process] so we do not fork after Eio/domains have been initialised within the current Tezt worker. *) -let tztest label fn = Alcotezt_process.test_case label `Quick fn +let tztest ?timeout label fn = + Alcotezt_process.test_case ?timeout label `Quick fn let tests_history = ( "Queue history", @@ -351,8 +459,21 @@ let tests_status = let tests_buffer = ("Buffer handling", [tztest "Dropbox/Async (eio handlers)" test_async_dropbox]) +let tests_contention = + ( "Task worker contention", + [ + tztest + ~timeout:30. + "Worker creation under contention (eio handlers)" + test_worker_contention; + tztest + ~timeout:30. + "Worker launch race across domains (eio handlers)" + test_worker_launch_race; + ] ) + let () = Alcotezt_process.run ~__FILE__ "Bees_workers (eio handlers)" - [tests_history; tests_status; tests_buffer] + [tests_history; tests_status; tests_buffer; tests_contention] -- GitLab From ce2b62eb8cf31c10d1543d2b5dcc5523c4958f8e Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Fri, 21 Nov 2025 09:04:04 +0100 Subject: [PATCH 8/9] bees/test: enable task worker Alcotezt suite --- src/lib_bees/test/test_bees_task_worker.ml | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/lib_bees/test/test_bees_task_worker.ml b/src/lib_bees/test/test_bees_task_worker.ml index d35da14ee015..63a6840b32a6 100644 --- a/src/lib_bees/test/test_bees_task_worker.ml +++ b/src/lib_bees/test/test_bees_task_worker.ml @@ -81,10 +81,8 @@ let tests_on_completion_callback = in ("Run on_completion callback", [test]) -(* FIXME: https://gitlab.com/tezos/tezos/-/issues/7938 *) -(* let () = *) -(* Alcotest_lwt.run *) -(* ~__FILE__ *) -(* "Task worker" *) -(* [tests_fibonacci; tests_reuse; tests_on_completion_callback] *) -(* |> Lwt_main.run *)
+let () = + Alcotezt_process.run + ~__FILE__ + "Task worker" + [tests_fibonacci; tests_reuse; tests_on_completion_callback] -- GitLab From 66dff970aff6be9a0a875fa43b9a23536a949b31 Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Sun, 5 Oct 2025 10:01:33 +0200 Subject: [PATCH 9/9] changelog: add note on lib_bees thread safety --- CHANGES.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 89f75ac6dd9e..ec78f0f06a12 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -25,6 +25,13 @@ be documented here either. General ------- +- Hardened ``lib_bees`` worker lifecycle for OCaml 5.x Eio domains: switched + the worker registry to Saturn lock-free tables, serialized worker creation + with mutex protection, added retries under resource pressure, ensured worker + launch/initialization runs on the Event_loop main switch, and exposed clean + shutdown for tests. Improves reliability under domain contention and low + resources. (MR :gl:`!19990`) + Node ---- -- GitLab