From 1b3ab87481391efb042535ac56da61df177eac71 Mon Sep 17 00:00:00 2001
From: Victor Allombert
Date: Thu, 25 Sep 2025 14:40:50 +0200
Subject: [PATCH 1/2] DAL: process shard batches in parallel

---

Notes:
    Review: [round_robin_seq] clamps its bucket count to at least 1 so that
    it terminates even when [n <= 0], and forces each [seq] node exactly
    once. The [index] blob ids and commit ids in this series predate that
    change and are only meaningful for 3-way fallback.

 src/lib_dal_node/message_validation.ml | 61 ++++++++++++++++++++++----
 1 file changed, 53 insertions(+), 8 deletions(-)

diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml
index 3a2d14df099f..b6bf574bf634 100644
--- a/src/lib_dal_node/message_validation.ml
+++ b/src/lib_dal_node/message_validation.ml
@@ -299,14 +299,17 @@ let batches_stats_tbl = Batches_stats.create 5
 
 let last_stat_level = ref None
 
-let update_batches_stats number_of_slots cryptobox batch_size head_level batch_l
-    duration =
+let update_batches_stats number_of_slots cryptobox batch_size head_level
+    batch_ll duration =
   let {Cryptobox.number_of_shards; _} = Cryptobox.parameters cryptobox in
   let shard_distribution = Array.init number_of_slots (fun _ -> 0) in
   List.iter
-    (fun ({slot_index; _}, elems) ->
-      shard_distribution.(slot_index) <- List.length elems)
-    batch_l ;
+    (fun l ->
+      List.iter
+        (fun ({slot_index; _}, elems) ->
+          shard_distribution.(slot_index) <- List.length elems)
+        l)
+    batch_ll ;
   let shard_percentage =
     float_of_int (Array.fold_left ( + ) 0 shard_distribution)
     /. (float_of_int number_of_shards *. float_of_int number_of_slots)
@@ -371,6 +374,22 @@ let may_finalize_batch_stats last_stat_level head_level =
     last_stat_level := Some head_level
   else ()
 
+(* Splits the given [seq] into a list of [max 1 n] lists in a round robin
+   fashion, all sub-lists having roughly the same number of elements. Clamping
+   [n] to at least 1 guarantees termination even when [n <= 0]. *)
+let round_robin_seq n seq =
+  let rec loop acc_full acc seq =
+    match acc with
+    | [] -> loop [] acc_full seq
+    | acc_hd :: acc_tl -> (
+        match seq () with
+        | Seq.Nil -> acc @ acc_full
+        | Seq.Cons (hd, tl) -> loop ((hd :: acc_hd) :: acc_full) acc_tl tl)
+  in
+  loop [] (Stdlib.List.init (max 1 n) (fun _ -> [])) seq
+
+exception Bee_task_worker_error of string
+
 let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch
     =
   if Node_context.is_bootstrap_node ctxt then
@@ -499,16 +518,42 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch
                   :: ({level; slot_index}, second_half)
                   :: remaining_to_treat))
     in
-    let batch_l = Batch_tbl.to_seq to_check_in_batch |> List.of_seq in
+    let domains = Tezos_bees.Task_worker.number_of_domains in
     let s = Unix.gettimeofday () in
-    treat_batch batch_l ;
+    (* Split the batches into [domains] sub-batches to feed each bee worker.
+       The load's distribution is likely to be slightly unbalanced. *)
+    let sub_batches =
+      round_robin_seq domains (Batch_tbl.to_seq to_check_in_batch)
+    in
+    let results =
+      Tezos_bees.Task_worker.launch_tasks_and_wait
+        "batch"
+        treat_batch
+        sub_batches
+    in
+    (* Ensure that all tasks are successful. *)
+    let () =
+      List.iter
+        (function
+          | Ok () -> ()
+          | Error (Tezos_bees.Task_worker.Closed (Some err)) ->
+              raise
+                (Bee_task_worker_error
+                   (Format.asprintf "%a" Error_monad.pp_print_trace err))
+          | Error (Tezos_bees.Task_worker.Closed None)
+          | Error (Tezos_bees.Task_worker.Request_error _) ->
+              raise (Bee_task_worker_error "Unknown task_worker error")
+          | Error (Tezos_bees.Task_worker.Any exn) ->
+              raise (Bee_task_worker_error (Printexc.to_string exn)))
+        results
+    in
     let duration = Unix.gettimeofday () -. s in
     update_batches_stats
       proto_parameters.number_of_slots
       cryptobox
       batch_size
       head_level
-      batch_l
+      sub_batches
       duration ;
     let () = may_finalize_batch_stats last_stat_level head_level in
     Array.to_list result
-- 
GitLab


From c084464b739a60b7e395167508e7c0d75bfb5930 Mon Sep 17 00:00:00 2001
From: Victor Allombert
Date: Tue, 25 Nov 2025 14:56:29 +0100
Subject: [PATCH 2/2] DAL/node: properly wrap Lwt calls within Eio's scope

---
 src/lib_dal_node/event.ml              | 14 ++++----------
 src/lib_dal_node/message_validation.ml | 23 ++++++++++++++---------
 src/lib_dal_node/slot_manager.ml       | 18 ++++++++++--------
 3 files changed, 28 insertions(+), 27 deletions(-)

diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml
index 043fd74f4569..80e54021c0a7 100644
--- a/src/lib_dal_node/event.ml
+++ b/src/lib_dal_node/event.ml
@@ -1443,11 +1443,8 @@ let emit_dont_wait__message_validation_error ~message_id ~validation_error =
     message_validation_error
     (message_id, validation_error)
 
-let emit_dont_wait__batch_validation_error ~level ~slot_index ~validation_error
-    =
-  emit__dont_wait__use_with_care
-    batch_validation_error
-    (level, slot_index, validation_error)
+let emit_batch_validation_error ~level ~slot_index ~validation_error =
+  emit batch_validation_error (level, slot_index, validation_error)
 
 let emit_dont_wait__batch_validation_stats ~batch_id ~head_level
     ~number_of_shards ~shard_percentage ~duration =
@@ -1605,11 +1602,8 @@ let emit_cannot_attest_slot_because_of_trap ~pkh ~published_level ~slot_index
     cannot_attest_slot_because_of_trap
     (pkh, published_level, slot_index, shard_index)
 
-let emit_dont_wait__register_trap ~delegate ~published_level ~slot_index
-    ~shard_index =
-  emit__dont_wait__use_with_care
-    register_trap
-    (delegate, published_level, slot_index, shard_index)
+let emit_register_trap ~delegate ~published_level ~slot_index ~shard_index =
+  emit register_trap (delegate, published_level, slot_index, shard_index)
 
 let emit_start_catchup ~start_level ~end_level ~levels_to_clean_up =
   emit start_catchup (start_level, end_level, levels_to_clean_up)
diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml
index b6bf574bf634..d9be35cf119a 100644
--- a/src/lib_dal_node/message_validation.ml
+++ b/src/lib_dal_node/message_validation.ml
@@ -412,7 +412,10 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch
     (* [treat_batch] does not invalidate a whole slot when a single shard is
        invalid, otherwise it would be possible for a byzantine actor to craft
        a wrong message with the id of the published slot to prevent the validation
-       of all the valid shards associated to this slot. *)
+       of all the valid shards associated to this slot.
+       Important: this function is in the scope of Eio runtime and prohibits the
+       usage of Lwt. Lwt calls must be wrapped in a proper way. See
+       https://gitlab.com/tezos/tezos/-/issues/8140. *)
     let rec treat_batch = function
       | [] -> ()
       | ({level; slot_index}, batch_list) :: remaining_to_treat -> (
@@ -475,10 +478,11 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch
               treat_batch remaining_to_treat
           | Error err ->
               let validation_error = string_of_validation_error err in
-              Event.emit_dont_wait__batch_validation_error
-                ~level
-                ~slot_index
-                ~validation_error ;
+              Tezos_bees.Hive.async_lwt (fun () ->
+                  Event.emit_batch_validation_error
+                    ~level
+                    ~slot_index
+                    ~validation_error) ;
               let batch_size = List.length batch_list in
               if batch_size = 1 then
                 List.iter
@@ -500,10 +504,11 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch
           | exception exn ->
               (* Don't crash if crypto raised an exception. *)
               let validation_error = Printexc.to_string exn in
-              Event.emit_dont_wait__batch_validation_error
-                ~level
-                ~slot_index
-                ~validation_error ;
+              Tezos_bees.Hive.async_lwt (fun () ->
+                  Event.emit_batch_validation_error
+                    ~level
+                    ~slot_index
+                    ~validation_error) ;
               let batch_size = List.length batch_list in
               if batch_size = 1 then
                 List.iter
diff --git a/src/lib_dal_node/slot_manager.ml b/src/lib_dal_node/slot_manager.ml
index a4eee6368236..66f914dee25a 100644
--- a/src/lib_dal_node/slot_manager.ml
+++ b/src/lib_dal_node/slot_manager.ml
@@ -557,6 +557,9 @@ let get_slot_content ~reconstruct_if_missing ctxt slot_id =
 
 (* Main functions *)
 
+(* Important: this function is in the scope of Eio runtime and prohibits the
+   usage of Lwt. Lwt calls must be wrapped in a proper way. See
+   https://gitlab.com/tezos/tezos/-/issues/8140. *)
 let maybe_register_trap traps_store ~traps_fraction message_id message =
   let delegate = message_id.Types.Message_id.pkh in
   let Types.Message.{share; shard_proof} = message in
@@ -566,11 +569,12 @@ let maybe_register_trap traps_store ~traps_fraction message_id message =
   | Ok true ->
       let slot_id = Types.Slot_id.{slot_index; slot_level = level} in
       let () =
-        Event.emit_dont_wait__register_trap
-          ~delegate
-          ~published_level:slot_id.slot_level
-          ~slot_index:slot_id.slot_index
-          ~shard_index
+        Tezos_bees.Hive.async_lwt (fun () ->
+            Event.emit_register_trap
+              ~delegate
+              ~published_level:slot_id.slot_level
+              ~slot_index:slot_id.slot_index
+              ~shard_index)
       in
       Store.Traps.add
         traps_store
@@ -581,14 +585,12 @@ let maybe_register_trap traps_store ~traps_fraction message_id message =
         ~shard_proof
   | Ok false -> ()
   | Error _ ->
-      Lwt.dont_wait
-        (fun () ->
+      Tezos_bees.Hive.async_lwt (fun () ->
           Event.emit_trap_check_failure
             ~delegate
             ~published_level:level
             ~slot_index
             ~shard_index)
-        (fun exc -> raise exc)
 
 let add_commitment_shards ~shards_proofs_precomputation node_store cryptobox
     commitment slot polynomial =
-- 
GitLab