From 323d2ec93432cc4527cf1ea8b8d2b5b080b59d2f Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 24 Sep 2025 08:44:04 +0200 Subject: [PATCH 1/4] Manifest/DAL: add lib_bees dep --- manifest/product_octez.ml | 1 + src/lib_dal_node/dune | 1 + 2 files changed, 2 insertions(+) diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index 00be6ea363a6..f4291516eab8 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -5399,6 +5399,7 @@ let octez_dal_node_lib = bls12_381_archive; octez_base |> open_ ~m:"TzPervasives"; octez_base_unix; + octez_bees; octez_dal_node_services |> open_; octez_dal_node_migrations; octez_protocol_updater |> open_; diff --git a/src/lib_dal_node/dune b/src/lib_dal_node/dune index 93280a3e91c8..e54e002919b6 100644 --- a/src/lib_dal_node/dune +++ b/src/lib_dal_node/dune @@ -9,6 +9,7 @@ bls12-381.archive octez-libs.base octez-libs.base.unix + octez-libs.tezos-bees tezos-dal-node-services dal_node_migrations octez-shell-libs.protocol-updater -- GitLab From 22ceb6bf8f24d75cab96d1477d2f61ad5e352b4a Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 15 Oct 2025 13:43:04 +0200 Subject: [PATCH 2/4] DAL/Amplificator: Eio-ify amplificator --- src/lib_dal_node/amplificator.ml | 326 +++++++++++----------------- src/lib_dal_node/cli.ml | 4 +- src/lib_dal_node/event.ml | 8 +- src/lib_dal_node/process_worker.ml | 98 ++------- src/lib_dal_node/process_worker.mli | 78 +++---- 5 files changed, 181 insertions(+), 333 deletions(-) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index 9847cc5914be..ab60ff00002e 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -1,7 +1,7 @@ (*****************************************************************************) (* *) (* SPDX-License-Identifier: MIT *) -(* SPDX-FileCopyrightText: 2024 Nomadic Labs *) +(* SPDX-FileCopyrightText: 2025 Nomadic Labs *) (* *) 
(*****************************************************************************) @@ -145,29 +145,23 @@ let proved_shards_encoding = (req "proofs" shards_proofs_encoding) (* This module contains the logic for a shard-proving process worker running in - background. It communicates with the main DAL process via pipe ipc. *) + background. It communicates with the main DAL process via Eio streams. + All the functions within this module are expected to be run within the + [Process_worker] runtime, that is, a dedicated Eio domain. Thus, Lwt code + needs to be properly wrapped with the adequate [Lwt_eio] wrapper. + *) module Reconstruction_process_worker : sig val reconstruct_process_worker : - Lwt_io.input Lwt_io.channel -> - Lwt_io.output Lwt_io.channel -> + Bytes.t Eio.Stream.t -> + Bytes.t Eio.Stream.t -> Cryptobox.t * Cryptobox.shards_proofs_precomputation -> - (unit, error trace) result Lwt.t + unit end = struct - let welcome_handshake ic = - let open Lwt_result_syntax in - let* () = - let* r = Process_worker.read_message ic in - match r with - | `End_of_file -> - fail - [ - Reconstruction_process_worker_error - "Invalid initialization message"; - ] - | `Message b when Bytes.equal b welcome -> return_unit - | `Message b -> fail_with_exn (Invalid_argument (String.of_bytes b)) - in - return_unit + let welcome_handshake input = + let r = Process_worker.read_message input in + match r with + | b when Bytes.equal b welcome -> () + | b -> raise (Invalid_argument (String.of_bytes b)) let reconstruct cryptobox precomputation shards = let open Result_syntax in @@ -186,217 +180,170 @@ end = struct return proved_shards_encoded (* Utility function to reply on successfully validated shards. 
*) - let reply_success ~oc ~query_id ~proved_shards_encoded = - let open Lwt_syntax in - (* Sends back the proved_shards_encoded to the main dal process *) - let* () = Event.emit_crypto_process_sending_reply ~query_id in - let* () = Lwt_io.write_int oc query_id in - let* _ = Process_worker.write_message oc (Bytes.of_string "OK") in - let* _ = Process_worker.write_message oc proved_shards_encoded in - return_unit + let reply_success ~output ~query_id ~proved_shards_encoded = + (* Sends back the proved_shards_encoded to the main DAL process. *) + let () = + Tezos_bees.Hive.async_lwt (fun () -> + Event.emit_crypto_process_sending_reply ~query_id) + in + let () = Eio.Stream.add output (Bytes.of_string (Int.to_string query_id)) in + let () = Process_worker.write_message output (Bytes.of_string "OK") in + let () = Process_worker.write_message output proved_shards_encoded in + () (* Utility function to reply on unsuccessful validated shard. *) - let reply_error_query ~oc ~query_id ~error = - let open Lwt_syntax in - (* Sends back the proved_shards_encoded to the main dal process *) - let* () = Event.emit_crypto_process_sending_reply_error ~query_id in - let* () = Lwt_io.write_int oc query_id in - let* _ = Process_worker.write_message oc (Bytes.of_string "ERR") in + let reply_error_query ~output ~query_id ~error = + (* Sends back the proved_shards_encoded to the main DAL process. *) + let () = + Tezos_bees.Hive.async_lwt (fun () -> + Event.emit_crypto_process_sending_reply_error ~query_id) + in + let () = Eio.Stream.add output (Bytes.of_string (Int.to_string query_id)) in + let () = Process_worker.write_message output (Bytes.of_string "ERR") in let bytes = Bytes.of_string error in - let* _ = Process_worker.write_message oc bytes in - return_unit + let () = Process_worker.write_message output bytes in + () (* Utility function to trigger a shard reconstruction. 
*) - let process_query query_id ic cryptobox shards_proofs_precomputation = - let open Lwt_result_syntax in + let process_query ~query_id ~input cryptobox shards_proofs_precomputation = + let open Result_syntax in (* Read query from main dal process *) - let* bytes_shards = - let* r = Process_worker.read_message ic in - match r with - | `End_of_file -> - fail - [ - Reconstruction_process_worker_error - "Incomplete message received. Terminating."; - ] - | `Message msg -> return msg - in + let bytes_shards = Process_worker.read_message input in let shards = Data_encoding.( Binary.of_bytes_exn (list Cryptobox.shard_encoding) bytes_shards) |> List.to_seq in - let*! () = Event.emit_crypto_process_received_query ~query_id in - (* Crypto computation *) + let () = + Tezos_bees.Hive.async_lwt (fun () -> + Event.emit_crypto_process_received_query ~query_id) + in + (* crypto computation *) let* proved_shards_encoded = reconstruct cryptobox shards_proofs_precomputation shards - |> Lwt.return |> Errors.to_tzresult in return proved_shards_encoded (* The main function that is run in the [Process_worker.t]. + [input] is the stream on which the amplificator will read requests from the + DAL node. [output] is the stream on which the amplificator will answer to + the DAL node. Initialization phase: receive init message: proto_parameters Running phase: loop - - receive shards - - calculate all shards and their proofs - - sent shards and proofs *) - let reconstruct_process_worker ic oc (cryptobox, shards_proofs_precomputation) + - receive shards, + - calculate all shards and their proofs, + - send shards and proofs. + *) + let reconstruct_process_worker (input : Bytes.t Eio.Stream.t) + (output : Bytes.t Eio.Stream.t) (cryptobox, shards_proofs_precomputation) = - let open Lwt_result_syntax in (* Read init message from parent with parameters required to initialize cryptobox *) - let* () = welcome_handshake ic in - let*! 
() = Event.emit_crypto_process_started ~pid:(Unix.getpid ()) in + let () = welcome_handshake input in + let () = + Tezos_bees.Hive.async_lwt (fun () -> Event.emit_crypto_process_started ()) + in let rec loop () = - Lwt.catch - (fun () -> - (* Note: this read_int is very unlikely to appear unless obvious issue *) - let*! query_id = Lwt_io.read_int ic in - let* () = - Lwt.catch - (fun () -> - let*! r = - process_query - query_id - ic - cryptobox - shards_proofs_precomputation - in - match r with - | Ok proved_shards_encoded -> - let*! () = - reply_success ~query_id ~proved_shards_encoded ~oc - in - loop () - | Error err -> - let err = - Format.asprintf "%a" Error_monad.pp_print_trace err - in - (* send a reply with the error, and continue *) - let*! () = reply_error_query ~oc ~query_id ~error:err in - loop ()) - (function - | End_of_file -> raise End_of_file - | exn -> - let error = Printexc.to_string exn in - let*! () = reply_error_query ~oc ~query_id ~error in - return_unit) - in - return_unit) - (function - | Lwt.Canceled - (* if terminated by direct signal (normal termination) *) - | End_of_file -> - (* Buffer was closed by the parent (normal termination) *) - let*! () = Event.emit_crypto_process_stopped () in - return_unit - | exn -> - (* Lwt_io.read_int could fail, in this case, there is an severe - issue: pipe issue between the processes, memory full ? - In this case, we give up *) - let msg = Printexc.to_string exn in - let*! 
() = Event.emit_crypto_process_error ~msg in - fail [error_of_exn exn]) + let query_id = + Eio.Stream.take input |> Bytes.to_string |> int_of_string + in + try + let r = + process_query ~query_id ~input cryptobox shards_proofs_precomputation + in + match r with + | Ok proved_shards_encoded -> + let () = reply_success ~query_id ~proved_shards_encoded ~output in + loop () + | Error err -> + let (`Other err) = err in + let error = Format.asprintf "%a" Error_monad.pp_print_trace err in + (* send a reply with the error, and continue *) + let () = reply_error_query ~output ~query_id ~error in + loop () + with exn -> + let error = Printexc.to_string exn in + let () = + Tezos_bees.Hive.async_lwt (fun () -> + Event.emit_crypto_process_error ~msg:error) + in + let () = reply_error_query ~output ~query_id ~error in + raise exn in loop () end (* Serialize queries to crypto worker process while the query queue is not - * empty. Wait until the query queue contains - * anything *) + empty. Wait until the query queue contains anything. This function aims to be + run by the main DAL node process. *) let query_sender_job {query_pipe; process; _} = let open Lwt_result_syntax in + let process_input = Process_worker.input_channel process in let rec loop () = let*! (Query_msg {query_id; shards; _}) = Lwt_pipe.Unbounded.pop query_pipe in - let oc = Process_worker.output_channel process in let*! () = Event.emit_main_process_sending_query ~query_id in (* Serialization: query_id, then shards *) - let*! () = Lwt_io.write_int oc query_id in - let* () = - let* r = Process_worker.write_message oc shards in - match r with - | `End_of_file -> - fail - [ - Reconstruction_process_worker_error - "Incomplete message. Terminating"; - ] - | `Write_ok -> return_unit + let*! () = + Lwt_eio.run_eio (fun () -> + Eio.Stream.add + process_input + (Bytes.of_string (Int.to_string query_id))) + in + let*! 
() = + Lwt_eio.run_eio (fun () -> + Process_worker.write_message process_input shards) in loop () in - Lwt.catch loop (function - (* Buffer was closed by the parent before proceeding an entire message - (sigterm, etc.) *) - | End_of_file -> return_unit - (* Unknown exception *) - | exn -> - let err = [error_of_exn exn] in - Lwt.return (Error err)) + Lwt.catch loop (function exn -> + (* Unknown exception *) + let err = [error_of_exn exn] in + Lwt.return (Error err)) +(* This job aims to read the responses (completed jobs) sent by the amplificator + process. This function aims to be run by the main DAL node process. *) let reply_receiver_job {process; query_store; _} node_context = let open Lwt_result_syntax in - let ic = Process_worker.input_channel process in + let process_output = Process_worker.output_channel process in let gs_worker = Node_context.get_gs_worker node_context in let node_store = Node_context.get_store node_context in let rec loop () = - let*! id = Lwt_io.read_int ic in + let*! query_id = + Lwt_eio.run_eio (fun () -> + Eio.Stream.take process_output |> Bytes.to_string |> int_of_string) + in let (Query {slot_id; commitment; proto_parameters; reconstruction_start_time}) = - Query_store.find query_store id + Query_store.find query_store query_id in (* Messages queue is unbounded *) - Query_store.remove query_store id ; + Query_store.remove query_store query_id ; let length = Query_store.length query_store in let () = Dal_metrics.update_amplification_queue_length length in (* The first message should be a OK | ERR *) - let* msg = - let* r = Process_worker.read_message ic in - match r with - | `End_of_file -> - fail - [ - Amplification_reply_receiver_job - "Incomplete message received. Terminating"; - ] - | `Message msg -> return msg + let*! 
msg = + Lwt_eio.run_eio (fun () -> Process_worker.read_message process_output) in match Bytes.to_string msg with | "ERR" -> - let* msg = - let* r = Process_worker.read_message ic in - match r with - | `End_of_file -> - fail - [ - Amplification_reply_receiver_job - "Incomplete message received. Terminating"; - ] - | `Message msg -> return msg + let*! msg = + Lwt_eio.run_eio (fun () -> Process_worker.read_message process_output) in (* The error message *) let msg = Bytes.to_string msg in let*! () = - Event.emit_main_process_received_reply_error ~query_id:id ~msg + Event.emit_main_process_received_reply_error ~query_id ~msg in loop () | "OK" -> - let* msg = - let* r = Process_worker.read_message ic in - match r with - | `End_of_file -> - fail - [ - Amplification_reply_receiver_job - "Incomplete message received. Terminating"; - ] - | `Message msg -> return msg + let*! msg = + Lwt_eio.run_eio (fun () -> Process_worker.read_message process_output) in - let*! () = Event.emit_main_process_received_reply ~query_id:id in + let*! () = Event.emit_main_process_received_reply ~query_id in let shards, shard_proofs = Data_encoding.Binary.of_bytes_exn proved_shards_encoding msg in @@ -438,14 +385,11 @@ let reply_receiver_job {process; query_store; _} node_context = let*! () = Event.emit_crypto_process_error ~msg:"Unexpected message" in fail [Amplification_reply_receiver_job "Unexpected message"] in - Lwt.catch loop (function - (* Buffer was closed before proceeding an entire message (sigterm, etc.) *) - | End_of_file -> return_unit - (* Unknown exception *) - | exn -> - let err = [error_of_exn exn] in - let*! () = Event.emit_crypto_process_fatal ~msg:"Unexpected error" in - Lwt.return (Error err)) + Lwt.catch loop (function exn -> + (* Unknown exception *) + let err = [error_of_exn exn] in + let*! 
() = Event.emit_crypto_process_fatal ~msg:"Unexpected error" in + Lwt.return (Error err)) let determine_amplification_delays node_ctxt = let open Result_syntax in @@ -483,11 +427,13 @@ let start_amplificator node_ctxt = | None -> fail [Errors.Amplificator_initialization_failed] | Some v -> return v in - (* Fork a process to offload cryptographic calculations *) - let* process = - Process_worker.run - Reconstruction_process_worker.reconstruct_process_worker - (cryptobox, shards_proofs_precomputation) + (* Offload cryptographic calculations into a dedicated Eio domain using the + [Process_worker]. *) + let*! process = + Lwt_eio.run_eio (fun () -> + Process_worker.run + Reconstruction_process_worker.reconstruct_process_worker + (cryptobox, shards_proofs_precomputation)) in let query_pipe = Lwt_pipe.Unbounded.create () in let query_store = Query_store.create 23 in @@ -504,27 +450,11 @@ let start_amplificator node_ctxt = amplification_random_delay_max; } in - let (_ : Lwt_exit.clean_up_callback_id) = - Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _exit_code -> - let pid = Process_worker.pid process in - Unix.kill pid Sys.sigterm ; - let _exit = Lwt_unix.waitpid [Lwt_unix.WNOHANG] pid in - Lwt.return_unit) - in return amplificator let welcome_handshake amplificator = - let open Lwt_result_syntax in - let oc = Process_worker.output_channel amplificator.process in - let* r = Process_worker.write_message oc welcome in - match r with - | `End_of_file -> - fail - [ - Reconstruction_process_worker_error - "Impossible to write init message. Terminating."; - ] - | `Write_ok -> return_unit + let ic = Process_worker.input_channel amplificator.process in + Lwt_eio.run_eio (fun () -> Process_worker.write_message ic welcome) let make node_ctxt = let open Lwt_result_syntax in @@ -553,7 +483,7 @@ let make node_ctxt = let () = Lwt.cancel amplificator_reply_receiver_job in Lwt.return_unit) in - let* () = welcome_handshake amplificator in + let*! 
() = welcome_handshake amplificator in return amplificator let enqueue_job_shards_proof amplificator commitment slot_id proto_parameters diff --git a/src/lib_dal_node/cli.ml b/src/lib_dal_node/cli.ml index a74ac6442829..0ad7cd8984d2 100644 --- a/src/lib_dal_node/cli.ml +++ b/src/lib_dal_node/cli.ml @@ -1000,8 +1000,8 @@ let run subcommand cli_options = let main_run subcommand cli_options = Lwt.Exception_filter.(set handle_all_except_runtime) ; - Tezos_base_unix.Event_loop.main_run ~process_name:"dal node" @@ fun () -> - wrap_with_error @@ run subcommand cli_options + Tezos_base_unix.Event_loop.main_run ~eio:true ~process_name:"dal node" + @@ fun () -> wrap_with_error @@ run subcommand cli_options let commands = let run subcommand data_dir config_file rpc_addr expected_pow listen_addr diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index 3ed0128bf97a..d117e07aae5c 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -770,13 +770,13 @@ open struct () let crypto_process_started = - declare_1 + declare_0 ~section:(section @ ["crypto"]) ~prefix_name_with_section:true ~name:"crypto_process_started" - ~msg:"cryptographic child process started (pid: {pid})" + ~msg:"cryptographic child process started" ~level:Notice - ("pid", Data_encoding.int31) + () let crypto_process_stopped = declare_0 @@ -1490,7 +1490,7 @@ let emit_store_upgraded ~old_version ~new_version = let emit_store_upgrade_error () = emit store_upgrade_error () -let emit_crypto_process_started ~pid = emit crypto_process_started pid +let emit_crypto_process_started () = emit crypto_process_started () let emit_crypto_process_stopped () = emit crypto_process_stopped () diff --git a/src/lib_dal_node/process_worker.ml b/src/lib_dal_node/process_worker.ml index 12151750c609..42eb6b6bdc4d 100644 --- a/src/lib_dal_node/process_worker.ml +++ b/src/lib_dal_node/process_worker.ml @@ -1,93 +1,31 @@ (*****************************************************************************) (* *) 
(* SPDX-License-Identifier: MIT *) -(* SPDX-FileCopyrightText: 2024 Nomadic Labs *) +(* SPDX-FileCopyrightText: 2025 Nomadic Labs *) (* *) (*****************************************************************************) -type t = {pid : int; ic : Lwt_io.input_channel; oc : Lwt_io.output_channel} +type t = {input : Bytes.t Eio.Stream.t; output : Bytes.t Eio.Stream.t} -let input_channel t = t.ic +let input_channel t = t.input -let output_channel t = t.oc +let output_channel t = t.output -let pid t = t.pid +let read_message ic = Eio.Stream.take ic -(** Communication error between the main and forked process *) -type error += Process_worker_ipc_error of string +let write_message oc (msg : Bytes.t) = Eio.Stream.add oc msg -let () = - register_error_kind - `Permanent - ~id:"dal.node.process_worker.ipc_error" - ~title:"IPC error between main process and worker process" - ~description:"IPC error between main process and worker process" - ~pp:(fun ppf msg -> - Format.fprintf - ppf - "IPC error between main process and worker process: %s" - msg) - Data_encoding.(obj1 (req "error" string)) - (function Process_worker_ipc_error str -> Some str | _ -> None) - (fun str -> Process_worker_ipc_error str) - -let read_message ic = - let open Lwt_result_syntax in - Lwt.catch - (fun () -> - let*! len = Lwt_io.read_int ic in - let msg = Bytes.create len in - let*! () = Lwt_io.read_into_exactly ic msg 0 len in - return (`Message msg)) - (function - (* On incomplete message, we let the caller decide how to handle the error, - either retrying, ignoring, etc.*) - | End_of_file -> return `End_of_file - (* Other errors are fatal *) - | exn -> - fail (Process_worker_ipc_error "read_message" :: [error_of_exn exn])) - -let write_message oc msg = - let open Lwt_result_syntax in - Lwt.catch - (fun () -> - let len = Bytes.length msg in - let*! () = Lwt_io.write_int oc len in - let*! 
() = Lwt_io.write_from_exactly oc msg 0 len in - return `Write_ok) - (function - (* On incomplete message, we let the caller decide how to handle the error, - either retrying, ignoring, etc.*) - | End_of_file -> return `End_of_file - (* Other errors are fatal *) - | exn -> - fail (Process_worker_ipc_error "write_message" :: [error_of_exn exn])) +(* This value allows to limit the number of pending requests. If the limit is + reached, sending messages to the process will be blocking. *) +let process_stream_limit = 1024 let run f args = - let open Lwt_result_syntax in - let*! () = Lwt_io.flush_all () in - let ic_parent, oc_child = Lwt_io.pipe ~cloexec:true () in - let ic_child, oc_parent = Lwt_io.pipe ~cloexec:true () in - match Lwt_unix.fork () with - | 0 -> - (* Child *) - - (* Close useless ends *) - let*! () = Lwt_io.close ic_parent in - let*! () = Lwt_io.close oc_parent in - - let run = f ic_child oc_child args in - - (* Adds signal handlers in this child process *) - let (_ : Lwt_exit.clean_up_callback_id) = - Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> - let () = Lwt.cancel run in - Lwt.return_unit) - in - let* () = run in - exit 0 - | pid -> - (* Parent *) - let*! () = Lwt_io.close ic_child in - let*! 
() = Lwt_io.close oc_child in - return {pid; ic = ic_parent; oc = oc_parent} + let in_child = Eio.Stream.create process_stream_limit in + let out_child = Eio.Stream.create process_stream_limit in + let (_promise : 'a Eio.Promise.t) = + let mgr = Eio.Stdenv.domain_mgr (Tezos_base_unix.Event_loop.env_exn ()) in + let sw = Tezos_base_unix.Event_loop.main_switch_exn () in + Eio.Fiber.fork_promise ~sw (fun () -> + Eio.Domain_manager.run mgr (fun () -> f in_child out_child args)) + in + {input = in_child; output = out_child} diff --git a/src/lib_dal_node/process_worker.mli b/src/lib_dal_node/process_worker.mli index 67c007cf5b6f..23c633dce746 100644 --- a/src/lib_dal_node/process_worker.mli +++ b/src/lib_dal_node/process_worker.mli @@ -1,59 +1,39 @@ (*****************************************************************************) (* *) (* SPDX-License-Identifier: MIT *) -(* SPDX-FileCopyrightText: 2024 Nomadic Labs *) +(* SPDX-FileCopyrightText: 2025 Nomadic Labs *) (* *) (*****************************************************************************) -(** A unix worker process, that includes a tiny ipc protocol using unix pipes. - A message in this protocol involves sending the size, followed by the - payload. *) +(** A worker process running on a dedicated domain that communicates using Eio + streams of [Bytes.t]. *) type t -(** Error in inter process communication (IPC) between the main and forked - process. *) -type error += Process_worker_ipc_error of string - -(** [pid pw] returns the pid of the process worker [pw] *) -val pid : t -> int - -(** [output_channel pw] accesses the output channel of the process_worker [pw] *) -val output_channel : t -> Lwt_io.output_channel - -(** [input_channel pw] accesses the input channel of the process_worker [pw] *) -val input_channel : t -> Lwt_io.input_channel - -(** [read_message inchan] reads the message [msg] from the Lwt_io.input_channel - [inchan]. 
The promise is fulfilled when the entire message, as sent by the - [write_message] is received. It returns `End_of_file if the channel was - closed, or Error if an unexpected error is raised. *) -val read_message : - Lwt_io.input_channel -> - ([`Message of bytes | `End_of_file], error trace) result Lwt.t - -(** [write_message outchan msg] writes the message [msg] to the - Lwt_io.output_channel - The size of the message [msg] is implicitly sent, ensuring the whole message - to be sent. The promise is fulfilled and returns `Write_ok is the whole message - was correctly written on the output channel, or `End_of_file if the channel - was closed. Returns Error for unexpected errors *) -val write_message : - Lwt_io.output_channel -> - bytes -> - ([`Write_ok | `End_of_file], error trace) result Lwt.t - -(** [run f arg] will run the function [f] : input_channel output_channel arg, - in an forked process. This [run] function forks, open some pipes ends - between the parent and the child. The [f] function is expected to read and - write on the provided unix pipes. The returned value is a process manager - containing the other pipe ends, on which read_message and write_message can - be used. - As of now, only one reader and one writer are supported. There is not yet - a locking mechanism and lwt channels are publicly accessible. - On parent termination, a sigterm signal is sent to the child process, - triggering a Lwt.cancel of the [f] function. +(** [output_channel pw] accesses the output channel of the process_worker [pw]. + This is useful to get data from the process worker. *) +val output_channel : t -> Bytes.t Eio.Stream.t + +(** [input_channel pw] accesses the input channel of the process_worker [pw]. + This is useful to send data to the process worker. *) +val input_channel : t -> Bytes.t Eio.Stream.t + +(** [read_message input_stream] reads the message [msg] from the Eio.Stream + [input_stream]. 
*) +val read_message : Bytes.t Eio.Stream.t -> bytes + +(** [write_message output_stream msg] writes the message [msg] to the + Eio.Stream [output_stream]. *) +val write_message : Bytes.t Eio.Stream.t -> bytes -> unit + +(** [run f arg] will run the function [f input_stream output_stream arg], in a + dedicated domain. The communication with the process is done through + Eio.Streams. The [f] function is expected to read and write on the provided + streams. The returned value is a process manager containing the streams to + be used to communicate with the process. + As of now, only one reader and one writer are supported. There is not yet a + locking mechanism and Eio streams are publicly accessible. On parent + termination or cancellation, the signal is propagated to the process worker + associated domain. *) val run : - (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a -> unit tzresult Lwt.t) -> - 'a -> - t tzresult Lwt.t + (Bytes.t Eio.Stream.t -> Bytes.t Eio.Stream.t -> 'a -> unit) -> 'a -> t -- GitLab From c68663c60fcc35ef3c08efb6964e6bc2f5196e83 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 15 Oct 2025 16:44:05 +0200 Subject: [PATCH 3/4] DAL/Amplificator: improve amplificator shutdown logs --- src/lib_dal_node/amplificator.ml | 11 ++++++++++- src/lib_dal_node/event.ml | 1 + 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index ab60ff00002e..29800a2a90c9 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -388,7 +388,6 @@ let reply_receiver_job {process; query_store; _} node_context = Lwt.catch loop (function exn -> (* Unknown exception *) let err = [error_of_exn exn] in - let*! () = Event.emit_crypto_process_fatal ~msg:"Unexpected error" in Lwt.return (Error err)) let determine_amplification_delays node_ctxt = @@ -464,7 +463,12 @@ let make node_ctxt = let*! 
r = query_sender_job amplificator in match r with | Ok () -> return_unit + | Error [Exn Lwt.Canceled] -> (* Graceful cancellation *) return_unit | Error e -> + let msg = + Format.asprintf "Unexpected error: %a" Error_monad.pp_print_trace e + in + let*! () = Event.emit_crypto_process_fatal ~msg in Lwt_result_syntax.fail (Amplification_query_sender_job "Error running query sender job" :: e) in @@ -473,7 +477,12 @@ let make node_ctxt = let*! r = reply_receiver_job amplificator node_ctxt in match r with | Ok () -> return_unit + | Error [Exn Lwt.Canceled] -> (* Graceful cancellation *) return_unit | Error e -> + let msg = + Format.asprintf "Unexpected error: %a" Error_monad.pp_print_trace e + in + let*! () = Event.emit_crypto_process_fatal ~msg in Lwt_result_syntax.fail (Amplification_reply_receiver_job "Error in reply receiver job" :: e) in diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index d117e07aae5c..e7e5a53cd3e9 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -795,6 +795,7 @@ open struct ~msg:"cryptographic child process terminated unexpectedly: #{error}." 
~level:Error ("error", Data_encoding.string) + ~pp1:Format.pp_print_string let crypto_process_error = declare_1 -- GitLab From 0ac0d72fbe4d48c629e999d8d2a96b8870aafbc2 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 15 Oct 2025 17:58:10 +0200 Subject: [PATCH 4/4] DAL/Amplificator: restore stopped event --- src/lib_dal_node/amplificator.ml | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index 29800a2a90c9..191923cfe3f2 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -263,14 +263,18 @@ end = struct (* send a reply with the error, and continue *) let () = reply_error_query ~output ~query_id ~error in loop () - with exn -> - let error = Printexc.to_string exn in - let () = + with + | Eio.Cancel.Cancelled _ -> Tezos_bees.Hive.async_lwt (fun () -> - Event.emit_crypto_process_error ~msg:error) - in - let () = reply_error_query ~output ~query_id ~error in - raise exn + Event.emit_crypto_process_stopped ()) + | exn -> + let error = Printexc.to_string exn in + let () = + Tezos_bees.Hive.async_lwt (fun () -> + Event.emit_crypto_process_error ~msg:error) + in + let () = reply_error_query ~output ~query_id ~error in + raise exn in loop () end -- GitLab