From ccdb0c0d1e23c612d4eab98e09b62d39eb7e2a97 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Sep 2024 15:38:55 +0200 Subject: [PATCH 1/6] Dal/Node move metrics outside context --- src/bin_dal_node/daemon.ml | 6 +++--- src/bin_dal_node/node_context.ml | 5 +---- src/bin_dal_node/node_context.mli | 1 - 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 7b442f45bc55..9b6470bce93a 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -1099,8 +1099,9 @@ let run ~data_dir ~configuration_override = return_some amplificator else return_none in - (* This must be done after the amplificator starts. *) - let*! metrics_server = Metrics.launch config.metrics_addr in + (* Starts the metrics *after* the amplificator fork, to avoid forked opened + sockets *) + let*! _metrics_server = Metrics.launch config.metrics_addr in (* Set value size hooks. *) Value_size_hooks.set_share_size (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; @@ -1117,7 +1118,6 @@ let run ~data_dir ~configuration_override = gs_worker transport_layer cctxt - metrics_server in (* Start RPC server. We do that before the waiting for the L1 node to be bootstrapped so that queries can already be issued. 
Note that that the node diff --git a/src/bin_dal_node/node_context.ml b/src/bin_dal_node/node_context.ml index 156f845da503..4a831ffa3a0c 100644 --- a/src/bin_dal_node/node_context.ml +++ b/src/bin_dal_node/node_context.ml @@ -40,12 +40,10 @@ type t = { gs_worker : Gossipsub.Worker.t; transport_layer : Gossipsub.Transport_layer.t; mutable profile_ctxt : Profile_manager.t; - metrics_server : Metrics.t; } let init config profile_ctxt cryptobox shards_proofs_precomputation - proto_parameters proto_plugins store gs_worker transport_layer cctxt - metrics_server = + proto_parameters proto_plugins store gs_worker transport_layer cctxt = let neighbors_cctxts = List.map (fun Configuration_file.{addr; port} -> @@ -71,7 +69,6 @@ let init config profile_ctxt cryptobox shards_proofs_precomputation gs_worker; transport_layer; profile_ctxt; - metrics_server; } let may_reconstruct ~reconstruct slot_id t = diff --git a/src/bin_dal_node/node_context.mli b/src/bin_dal_node/node_context.mli index 011ef32b5741..83c051fd8744 100644 --- a/src/bin_dal_node/node_context.mli +++ b/src/bin_dal_node/node_context.mli @@ -40,7 +40,6 @@ val init : Gossipsub.Worker.t -> Gossipsub.Transport_layer.t -> Tezos_rpc.Context.generic -> - Metrics.t -> t (** Returns all the registered plugins *) -- GitLab From d606d2cda51485d7d753132e86dbaeed3c7272ce Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Sep 2024 17:59:29 +0200 Subject: [PATCH 2/6] Dal/Node: refactor: move context in amplificator --- src/bin_dal_node/amplificator.ml | 87 ++++++++++++++----------------- src/bin_dal_node/amplificator.mli | 8 +-- src/bin_dal_node/daemon.ml | 47 +++++++++-------- 3 files changed, 67 insertions(+), 75 deletions(-) diff --git a/src/bin_dal_node/amplificator.ml b/src/bin_dal_node/amplificator.ml index 90d2f7afcdf8..b4d0f6a32afc 100644 --- a/src/bin_dal_node/amplificator.ml +++ b/src/bin_dal_node/amplificator.ml @@ -19,6 +19,7 @@ type query_msg = Query_msg of {query_id : int; shards : bytes} module 
Query_store = Stdlib.Hashtbl type t = { + node_ctxt : Node_context.t; process : Process_worker.t; query_pipe : query_msg Lwt_pipe.Unbounded.t; mutable query_id : int; @@ -354,7 +355,7 @@ let reply_receiver_job {process; query_store; _} node_context = let err = [error_of_exn exn] in Lwt.return (Error err)) -let make () = +let make node_ctxt = let open Lwt_result_syntax in (* Fork a process to offload cryptographic calculations *) let* process = @@ -364,8 +365,9 @@ let make () = in let query_pipe = Lwt_pipe.Unbounded.create () in let query_store = Query_store.create 23 in - let amplificator = {process; query_pipe; query_store; query_id = 0} in - + let amplificator = + {node_ctxt; process; query_pipe; query_store; query_id = 0} + in let (_ : Lwt_exit.clean_up_callback_id) = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _exit_code -> let pid = Process_worker.pid process in @@ -378,7 +380,6 @@ let make () = let init amplificator node_context = let open Lwt_result_syntax in let proto_parameters = Node_context.get_proto_parameters node_context in - (* Run a job enqueueing all shards calculation tasks *) let amplificator_query_sender_job = let*! 
r = query_sender_job amplificator in @@ -432,12 +433,10 @@ let enqueue_job_shards_proof amplificator commitment slot_id proto_parameters let shards = Data_encoding.(Binary.to_bytes_exn (list Cryptobox.shard_encoding)) shards in - (* Should be atomic incr in the context of a lwt concurrency * Could overflow some day on 32 bits machines, more difficult on 64 *) let query_id = amplificator.query_id in amplificator.query_id <- amplificator.query_id + 1 ; - (* Store some arguments in a query table, because they are necessary to publish, but not to calculate, so avoid the cost of serializing them *) let () = @@ -458,51 +457,45 @@ let amplify node_store commitment (slot_id : Types.slot_id) ~number_of_already_stored_shards ~number_of_shards ~number_of_needed_shards proto_parameters amplificator = let open Lwt_result_syntax in - match amplificator with - | None -> - let*! () = Event.(emit amplificator_uninitialized ()) in - return_unit - | Some amplificator -> - Dal_metrics.reconstruction_started () ; - let*! () = - Event.( - emit - reconstruct_started - ( slot_id.slot_level, - slot_id.slot_index, - number_of_already_stored_shards, - number_of_shards )) - in - let shards = - Store.(Shards.read_all node_store.shards slot_id ~number_of_shards) - |> Seq_s.filter_map (function - | _, index, Ok share -> Some Cryptobox.{index; share} - | _ -> None) - in - let*? shards = - Seq_s.take - ~when_negative_length:[error_of_exn (Invalid_argument "Seq_s.take")] - number_of_needed_shards - shards - in - (* Enqueue this reconstruction in a query_pipe to be sent to a reconstruction process *) - let* () = - enqueue_job_shards_proof - amplificator - commitment - slot_id - proto_parameters - shards - in - return_unit + Dal_metrics.reconstruction_started () ; + let*! 
() = + Event.( + emit + reconstruct_started + ( slot_id.slot_level, + slot_id.slot_index, + number_of_already_stored_shards, + number_of_shards )) + in + let shards = + Store.(Shards.read_all node_store.shards slot_id ~number_of_shards) + |> Seq_s.filter_map (function + | _, index, Ok share -> Some Cryptobox.{index; share} + | _ -> None) + in + let*? shards = + Seq_s.take + ~when_negative_length:[error_of_exn (Invalid_argument "Seq_s.take")] + number_of_needed_shards + shards + in + (* Enqueue this reconstruction in a query_pipe to be sent to a reconstruction process *) + let* () = + enqueue_job_shards_proof + amplificator + commitment + slot_id + proto_parameters + shards + in + return_unit -let try_amplification node_ctxt commitment (slot_id : Types.slot_id) - amplificator = +let try_amplification commitment (slot_id : Types.slot_id) amplificator = let open Lwt_result_syntax in + let node_ctxt = amplificator.node_ctxt in match Node_context.get_shards_proofs_precomputation node_ctxt with | None -> - (* The prover SRS is not loaded so we cannot reconstruct slots - yet. *) + (* The prover SRS is not loaded so we cannot reconstruct slots yet. *) let*! () = Event.( emit diff --git a/src/bin_dal_node/amplificator.mli b/src/bin_dal_node/amplificator.mli index bcbb6cdb9e1b..d11dad4e369b 100644 --- a/src/bin_dal_node/amplificator.mli +++ b/src/bin_dal_node/amplificator.mli @@ -37,14 +37,10 @@ type t main process of the DAL node but by the process provided in the [amplificator] argument. *) val try_amplification : - Node_context.t -> - Cryptobox.Commitment.t -> - Types.slot_id -> - t option -> - unit tzresult Lwt.t + Cryptobox.Commitment.t -> Types.slot_id -> t -> unit tzresult Lwt.t (** Creates a new amplificator process *) -val make : unit -> t tzresult Lwt.t +val make : Node_context.t -> t tzresult Lwt.t (** [init amplificator node_ctxt params] Initializes the amplificator [t] with the current context. 
*) diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 9b6470bce93a..676ad93fb670 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -636,12 +636,13 @@ let connect_gossipsub_with_p2p gs_worker transport_layer node_store node_ctxt Profile_manager.get_profiles @@ Node_context.get_profile_ctxt node_ctxt with | Operator profile - when Operator_profile.is_observed_slot slot_index profile -> - Amplificator.try_amplification - node_ctxt - commitment - slot_id - amplificator + when Operator_profile.is_observed_slot slot_index profile -> ( + match amplificator with + | None -> + let*! () = Event.(emit amplificator_uninitialized ()) in + return_unit + | Some amplificator -> + Amplificator.try_amplification commitment slot_id amplificator) | _ -> return_unit in Lwt.dont_wait @@ -1090,22 +1091,6 @@ let run ~data_dir ~configuration_override = let* cryptobox, shards_proofs_precomputation = init_cryptobox config proto_parameters in - let* amplificator = - (* TODO: https://gitlab.com/tezos/tezos/-/issues/7452 - The amplificator also initializes the crypto. This could be avoided. - Also, is this the right moment to start the amplificator? *) - if Profile_manager.is_prover_profile profile_ctxt then - let* amplificator = Amplificator.make () in - return_some amplificator - else return_none - in - (* Starts the metrics *after* the amplificator fork, to avoid forked opened - sockets *) - let*! _metrics_server = Metrics.launch config.metrics_addr in - (* Set value size hooks. *) - Value_size_hooks.set_share_size - (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; - Value_size_hooks.set_number_of_slots proto_parameters.number_of_slots ; let ctxt = Node_context.init config @@ -1119,6 +1104,24 @@ let run ~data_dir ~configuration_override = transport_layer cctxt in + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7452 + The amplificator also initializes the crypto. This could be avoided. 
Also, + is this the right moment to start the amplificator? *) + let profile_manager = Node_context.get_profile_ctxt ctxt in + let is_prover_profile = Profile_manager.is_prover_profile profile_manager in + let* amplificator = + if not is_prover_profile then return_none + else + let* amplificator = Amplificator.make ctxt in + return_some amplificator + in + (* Starts the metrics *after* the amplificator fork, to avoid forked opened + sockets *) + let*! _metrics_server = Metrics.launch config.metrics_addr in + (* Set value size hooks. *) + Value_size_hooks.set_share_size + (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; + Value_size_hooks.set_number_of_slots proto_parameters.number_of_slots ; (* Start RPC server. We do that before the waiting for the L1 node to be bootstrapped so that queries can already be issued. Note that that the node will thus already respond to the baker about shards status if queried. *) -- GitLab From df758311a521aec8825326d03d83c1bf87b27e5e Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Sep 2024 16:28:20 +0200 Subject: [PATCH 3/6] Dal/Node: use same cryptobox between processes Fixes https://gitlab.com/tezos/tezos/-/issues/7452 --- src/bin_dal_node/amplificator.ml | 203 +++++++++++++------------------ src/bin_dal_node/daemon.ml | 3 - src/bin_dal_node/errors.ml | 13 +- src/bin_dal_node/errors.mli | 1 + src/bin_dal_node/event.ml | 11 +- 5 files changed, 99 insertions(+), 132 deletions(-) diff --git a/src/bin_dal_node/amplificator.ml b/src/bin_dal_node/amplificator.ml index b4d0f6a32afc..e9ae9390feff 100644 --- a/src/bin_dal_node/amplificator.ml +++ b/src/bin_dal_node/amplificator.ml @@ -135,38 +135,6 @@ let proved_shards_encoding = (* This module contains the logic for a shard proving process worker running in background. It communicates with the main dal process via pipe ipc. 
*) module Reconstruction_process_worker = struct - let reconstruct_process_init_cryptobox - (proto_parameters : Dal_plugin.proto_parameters) = - let open Lwt_result_syntax in - let* () = - let find_srs_files () = Tezos_base.Dal_srs.find_trusted_setup_files () in - Cryptobox.init_prover_dal ~find_srs_files () - in - match Cryptobox.make proto_parameters.cryptobox_parameters with - | Ok cryptobox -> ( - match Cryptobox.precompute_shards_proofs cryptobox with - | Ok precomputation -> return (cryptobox, precomputation) - | Error (`Invalid_degree_strictly_less_than_expected {given; expected}) - -> - fail - [ - Errors.Cryptobox_initialisation_failed - (Printf.sprintf - "Cryptobox.precompute_shards_proofs: SRS size (= %d) \ - smaller than expected (= %d)" - given - expected); - ]) - | Error (`Fail msg) -> - fail - [ - Errors.Cryptobox_initialisation_failed - (Format.asprintf - "Error initializing cryptobox in crypto process \ - (Cryptobox.make): %s" - msg); - ] - let read_init_message_from_parent ic = let open Lwt_result_syntax in let* bytes_proto_parameters = @@ -213,14 +181,13 @@ module Reconstruction_process_worker = struct - receive shards - calculate all shards and their proofs - sent shards and proofs *) - let reconstruct_process_worker ic oc () = + let reconstruct_process_worker ic oc (cryptobox, shards_proofs_precomputation) + = let open Lwt_result_syntax in (* Read init message from parent with parameters required to initialize cryptobox *) - let* proto_parameters = read_init_message_from_parent ic in - let* cryptobox, shards_proofs_precomputation = - reconstruct_process_init_cryptobox proto_parameters - in + (* FIXME: it is not necessary anymore to initialize the cryptobox *) + let* _proto_parameters = read_init_message_from_parent ic in let*! () = Event.(emit crypto_process_started (Unix.getpid ())) in let rec loop () = let*! 
query_id = Lwt_io.read_int ic in @@ -357,17 +324,29 @@ let reply_receiver_job {process; query_store; _} node_context = let make node_ctxt = let open Lwt_result_syntax in + let cryptobox = Node_context.get_cryptobox node_ctxt in + let shards_proofs_precomputation = + Node_context.get_shards_proofs_precomputation node_ctxt + in + let* shards_proofs_precomputation = + match shards_proofs_precomputation with + | None -> + let*! () = Event.(emit reconstruct_missing_prover_srs ()) in + fail [Errors.Amplificator_initialization_failed] + | Some v -> return v + in (* Fork a process to offload cryptographic calculations *) let* process = Process_worker.run Reconstruction_process_worker.reconstruct_process_worker - () + (cryptobox, shards_proofs_precomputation) in let query_pipe = Lwt_pipe.Unbounded.create () in let query_store = Query_store.create 23 in let amplificator = {node_ctxt; process; query_pipe; query_store; query_id = 0} in + let (_ : Lwt_exit.clean_up_callback_id) = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _exit_code -> let pid = Process_worker.pid process in @@ -493,89 +472,75 @@ let amplify node_store commitment (slot_id : Types.slot_id) let try_amplification commitment (slot_id : Types.slot_id) amplificator = let open Lwt_result_syntax in let node_ctxt = amplificator.node_ctxt in - match Node_context.get_shards_proofs_precomputation node_ctxt with - | None -> - (* The prover SRS is not loaded so we cannot reconstruct slots yet. 
*) + let node_store = Node_context.get_store node_ctxt in + let proto_parameters = Node_context.get_proto_parameters node_ctxt in + let number_of_shards = + proto_parameters.cryptobox_parameters.number_of_shards + in + let redundancy_factor = + proto_parameters.cryptobox_parameters.redundancy_factor + in + let number_of_needed_shards = number_of_shards / redundancy_factor in + let* number_of_already_stored_shards = + Store.Shards.count_values node_store.shards slot_id + in + (* There are two situations where we don't want to reconstruct: + if we don't have enough shards or if we already have all the + shards. *) + if + number_of_already_stored_shards < number_of_needed_shards + || number_of_already_stored_shards = number_of_shards + then return_unit + else + (* We have enough shards to reconstruct the whole slot. *) + with_amplification_lock + node_ctxt + slot_id + ~on_error: + Event.( + fun err -> + emit reconstruct_error (slot_id.slot_level, slot_id.slot_index, err)) + @@ fun () -> + (* Wait a random delay between 1 and 2 seconds before starting + the reconstruction; this is to give some slack to receive + all the shards so that the reconstruction is not needed, and + also avoids having multiple nodes reconstruct at once. *) + let random_delay = + Constants.( + amplification_random_delay_min + +. Random.float + (amplification_random_delay_max -. amplification_random_delay_min)) + in + let*! () = + Event.( + emit + reconstruct_starting_in + (slot_id.slot_level, slot_id.slot_index, random_delay)) + in + let*! () = Lwt_unix.sleep random_delay in + (* Count again the stored shards because we may have received + more shards during the random delay. *) + let* number_of_already_stored_shards = + Store.Shards.count_values node_store.shards slot_id + in + (* If we have received all the shards while waiting the random + delay, there is no point in reconstructing anymore *) + if number_of_already_stored_shards = number_of_shards then ( let*! 
() = Event.( emit - reconstruct_missing_prover_srs + reconstruct_no_missing_shard (slot_id.slot_level, slot_id.slot_index)) in - return_unit - | Some _ -> - let node_store = Node_context.get_store node_ctxt in - let proto_parameters = Node_context.get_proto_parameters node_ctxt in - let number_of_shards = - proto_parameters.cryptobox_parameters.number_of_shards - in - let redundancy_factor = - proto_parameters.cryptobox_parameters.redundancy_factor - in - let number_of_needed_shards = number_of_shards / redundancy_factor in - let* number_of_already_stored_shards = - Store.Shards.count_values node_store.shards slot_id - in - (* There are two situations where we don't want to reconstruct: - if we don't have enough shards or if we already have all the - shards. *) - if - number_of_already_stored_shards < number_of_needed_shards - || number_of_already_stored_shards = number_of_shards - then return_unit - else - (* We have enough shards to reconstruct the whole slot. *) - with_amplification_lock - node_ctxt - slot_id - ~on_error: - Event.( - fun err -> - emit - reconstruct_error - (slot_id.slot_level, slot_id.slot_index, err)) - @@ fun () -> - (* Wait a random delay between 1 and 2 seconds before starting - the reconstruction; this is to give some slack to receive - all the shards so that the reconstruction is not needed, and - also avoids having multiple nodes reconstruct at once. *) - let random_delay = - Constants.( - amplification_random_delay_min - +. Random.float - (amplification_random_delay_max - -. amplification_random_delay_min)) - in - let*! () = - Event.( - emit - reconstruct_starting_in - (slot_id.slot_level, slot_id.slot_index, random_delay)) - in - let*! () = Lwt_unix.sleep random_delay in - (* Count again the stored shards because we may have received - more shards during the random delay. 
*) - let* number_of_already_stored_shards = - Store.Shards.count_values node_store.shards slot_id - in - (* If we have received all the shards while waiting the random - delay, there is no point in reconstructing anymore *) - if number_of_already_stored_shards = number_of_shards then ( - let*! () = - Event.( - emit - reconstruct_no_missing_shard - (slot_id.slot_level, slot_id.slot_index)) - in - Dal_metrics.reconstruction_aborted () ; - return_unit) - else - amplify - node_store - commitment - slot_id - ~number_of_already_stored_shards - ~number_of_shards - ~number_of_needed_shards - proto_parameters - amplificator + Dal_metrics.reconstruction_aborted () ; + return_unit) + else + amplify + node_store + commitment + slot_id + ~number_of_already_stored_shards + ~number_of_shards + ~number_of_needed_shards + proto_parameters + amplificator diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 676ad93fb670..442358cab8bf 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -1104,9 +1104,6 @@ let run ~data_dir ~configuration_override = transport_layer cctxt in - (* TODO: https://gitlab.com/tezos/tezos/-/issues/7452 - The amplificator also initializes the crypto. This could be avoided. Also, - is this the right moment to start the amplificator? *) let profile_manager = Node_context.get_profile_ctxt ctxt in let is_prover_profile = Profile_manager.is_prover_profile profile_manager in let* amplificator = diff --git a/src/bin_dal_node/errors.ml b/src/bin_dal_node/errors.ml index 5c03fb05f3b7..7827e8dbe62c 100644 --- a/src/bin_dal_node/errors.ml +++ b/src/bin_dal_node/errors.ml @@ -23,7 +23,7 @@ (* *) (*****************************************************************************) -(** Extention of the open type [error] with the errors that could be raised by +(** Extension of the open type [error] with the errors that could be raised by the DAL node. 
*) type error += | Decoding_failed of Types.Store.kind @@ -32,6 +32,7 @@ type error += | Cryptobox_initialisation_failed of string | Not_enough_history of {stored_levels : int; minimal_levels : int} | Not_enough_l1_history of {stored_cycles : int; minimal_cycles : int} + | Amplificator_initialization_failed (* TODO: https://gitlab.com/tezos/tezos/-/issues/4622 @@ -136,7 +137,15 @@ let () = Some (stored_cycles, minimal_cycles) | _ -> None) (fun (stored_cycles, minimal_cycles) -> - Not_enough_l1_history {stored_cycles; minimal_cycles}) + Not_enough_l1_history {stored_cycles; minimal_cycles}) ; + register_error_kind + `Permanent + ~id:"dal.node.amplificator_initialization_failed" + ~title:"Amplificator initialization failed" + ~description:"Amplificator initialization failed" + Data_encoding.empty + (function Amplificator_initialization_failed -> Some () | _ -> None) + (fun () -> Amplificator_initialization_failed) (** This part defines and handles more elaborate errors for the DAL node. *) diff --git a/src/bin_dal_node/errors.mli b/src/bin_dal_node/errors.mli index fc5e70cfb55f..82a54694b392 100644 --- a/src/bin_dal_node/errors.mli +++ b/src/bin_dal_node/errors.mli @@ -32,6 +32,7 @@ type error += | Cryptobox_initialisation_failed of string | Not_enough_history of {stored_levels : int; minimal_levels : int} | Not_enough_l1_history of {stored_cycles : int; minimal_cycles : int} + | Amplificator_initialization_failed (** The errors below are used to extend tzresult/tztrace monad/errors with Some specific errors on which we'd like to match in the DAL node's code. 
*) diff --git a/src/bin_dal_node/event.ml b/src/bin_dal_node/event.ml index a58aaf659a4e..f6e00fc8aee2 100644 --- a/src/bin_dal_node/event.ml +++ b/src/bin_dal_node/event.ml @@ -400,17 +400,12 @@ let saving_profiles_failed = ("error", Error_monad.trace_encoding) let reconstruct_missing_prover_srs = - declare_2 + declare_0 ~section ~name:"reconstruct_missing_prover_srs" - ~msg: - "Missing prover SRS, reconstruction for the level {level} and slot index \ - {slot_index} was skipped." + ~msg:"Missing prover SRS, reconstruction failed" ~level:Warning - ~pp1:(fun fmt -> Format.fprintf fmt "%ld") - ("level", Data_encoding.int32) - ~pp2:Format.pp_print_int - ("slot_index", Data_encoding.int31) + () let reconstruct_starting_in = declare_3 -- GitLab From 863269dd323a5d5207f52a4982b019c068c61a76 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Sep 2024 16:53:18 +0200 Subject: [PATCH 4/6] Dal/Node amplificator cleaning --- src/bin_dal_node/amplificator.ml | 15 ++++++--------- src/bin_dal_node/amplificator.mli | 4 ---- src/bin_dal_node/daemon.ml | 12 ++++-------- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/src/bin_dal_node/amplificator.ml b/src/bin_dal_node/amplificator.ml index e9ae9390feff..661cdcede4a2 100644 --- a/src/bin_dal_node/amplificator.ml +++ b/src/bin_dal_node/amplificator.ml @@ -322,7 +322,7 @@ let reply_receiver_job {process; query_store; _} node_context = let err = [error_of_exn exn] in Lwt.return (Error err)) -let make node_ctxt = +let start_amplificator node_ctxt = let open Lwt_result_syntax in let cryptobox = Node_context.get_cryptobox node_ctxt in let shards_proofs_precomputation = @@ -346,7 +346,6 @@ let make node_ctxt = let amplificator = {node_ctxt; process; query_pipe; query_store; query_id = 0} in - let (_ : Lwt_exit.clean_up_callback_id) = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _exit_code -> let pid = Process_worker.pid process in @@ -356,9 +355,10 @@ let make node_ctxt = in return amplificator -let init 
amplificator node_context = +let make node_ctxt = let open Lwt_result_syntax in - let proto_parameters = Node_context.get_proto_parameters node_context in + let* amplificator = start_amplificator node_ctxt in + let proto_parameters = Node_context.get_proto_parameters node_ctxt in (* Run a job enqueueing all shards calculation tasks *) let amplificator_query_sender_job = let*! r = query_sender_job amplificator in @@ -368,24 +368,21 @@ let init amplificator node_context = Lwt_result_syntax.fail (Amplification_query_sender_job "Error running query sender job" :: e) in - (* Run a job retrieving all shards and their proof and publish them *) let amplificator_reply_receiver_job = - let*! r = reply_receiver_job amplificator node_context in + let*! r = reply_receiver_job amplificator node_ctxt in match r with | Ok () -> return_unit | Error e -> Lwt_result_syntax.fail (Amplification_reply_receiver_job "Error in reply receiver job" :: e) in - let (_ : Lwt_exit.clean_up_callback_id) = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _exit_code -> let () = Lwt.cancel amplificator_query_sender_job in let () = Lwt.cancel amplificator_reply_receiver_job in Lwt.return_unit) in - let bytes_proto_parameters = Data_encoding.Binary.to_bytes_exn Dal_plugin.proto_parameters_encoding @@ -403,7 +400,7 @@ let init amplificator node_context = ] | `Write_ok -> return_unit in - return_unit + return amplificator let enqueue_job_shards_proof amplificator commitment slot_id proto_parameters shards = diff --git a/src/bin_dal_node/amplificator.mli b/src/bin_dal_node/amplificator.mli index d11dad4e369b..e779ad6c16b6 100644 --- a/src/bin_dal_node/amplificator.mli +++ b/src/bin_dal_node/amplificator.mli @@ -41,7 +41,3 @@ val try_amplification : (** Creates a new amplificator process *) val make : Node_context.t -> t tzresult Lwt.t - -(** [init amplificator node_ctxt params] Initializes the amplificator [t] with - the current context. 
*) -val init : t -> Node_context.t -> unit tzresult Lwt.t diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 442358cab8bf..435698b382b6 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -1084,7 +1084,6 @@ let run ~data_dir ~configuration_override = head_level first_seen_level in - (* Initialize the crypto process *) (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5743 Instead of recomputing these parameters, they could be stored (for a given cryptobox). *) @@ -1104,8 +1103,10 @@ let run ~data_dir ~configuration_override = transport_layer cctxt in - let profile_manager = Node_context.get_profile_ctxt ctxt in - let is_prover_profile = Profile_manager.is_prover_profile profile_manager in + let is_prover_profile = Profile_manager.is_prover_profile profile_ctxt in + (* Initialize amplificator if in prover profile. + This forks a process and should be kept early to avoid copying opened file + descriptors. *) let* amplificator = if not is_prover_profile then return_none else @@ -1167,11 +1168,6 @@ let run ~data_dir ~configuration_override = in return crawler in - let* () = - match amplificator with - | None -> return_unit - | Some amplificator -> Amplificator.init amplificator ctxt - in (* Activate the p2p instance. *) connect_gossipsub_with_p2p gs_worker transport_layer store ctxt amplificator ; let*! 
() = -- GitLab From 55eed42593c8f747fd8c1b997bd16db2fe63b4d2 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Sep 2024 17:00:54 +0200 Subject: [PATCH 5/6] Dal/Node remove now useless event --- src/bin_dal_node/amplificator.ml | 4 +--- src/bin_dal_node/event.ml | 8 -------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/bin_dal_node/amplificator.ml b/src/bin_dal_node/amplificator.ml index 661cdcede4a2..22a5ec2368b6 100644 --- a/src/bin_dal_node/amplificator.ml +++ b/src/bin_dal_node/amplificator.ml @@ -330,9 +330,7 @@ let start_amplificator node_ctxt = in let* shards_proofs_precomputation = match shards_proofs_precomputation with - | None -> - let*! () = Event.(emit reconstruct_missing_prover_srs ()) in - fail [Errors.Amplificator_initialization_failed] + | None -> fail [Errors.Amplificator_initialization_failed] | Some v -> return v in (* Fork a process to offload cryptographic calculations *) diff --git a/src/bin_dal_node/event.ml b/src/bin_dal_node/event.ml index f6e00fc8aee2..f648c560607d 100644 --- a/src/bin_dal_node/event.ml +++ b/src/bin_dal_node/event.ml @@ -399,14 +399,6 @@ let saving_profiles_failed = ~level:Error ("error", Error_monad.trace_encoding) -let reconstruct_missing_prover_srs = - declare_0 - ~section - ~name:"reconstruct_missing_prover_srs" - ~msg:"Missing prover SRS, reconstruction failed" - ~level:Warning - () - let reconstruct_starting_in = declare_3 ~section -- GitLab From 7e3537b785d1dd4859d3012c64d97d6cb5e416e7 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Sep 2024 19:12:24 +0200 Subject: [PATCH 6/6] Dal/Amplificator: change ipc message We do not need to send parameters to child, but we need the parent to wait for the process to be ready. Hence a welcome message. 
--- src/bin_dal_node/amplificator.ml | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/src/bin_dal_node/amplificator.ml b/src/bin_dal_node/amplificator.ml index 22a5ec2368b6..f148bcd27631 100644 --- a/src/bin_dal_node/amplificator.ml +++ b/src/bin_dal_node/amplificator.ml @@ -5,6 +5,8 @@ (* *) (*****************************************************************************) +let welcome = Bytes.of_string "0 Ready" + (* A Lwt worker maintening a queue of calculation jobs to send to the crypto process *) type query = | Query of { @@ -137,7 +139,7 @@ let proved_shards_encoding = module Reconstruction_process_worker = struct let read_init_message_from_parent ic = let open Lwt_result_syntax in - let* bytes_proto_parameters = + let* () = let* r = Process_worker.read_message ic in match r with | `End_of_file -> @@ -146,14 +148,10 @@ module Reconstruction_process_worker = struct Reconstruction_process_worker_error "Invalid initialization message"; ] - | `Message msg -> return msg - in - let proto_parameters = - Data_encoding.Binary.of_bytes_exn - Dal_plugin.proto_parameters_encoding - bytes_proto_parameters + | `Message b when Bytes.equal b welcome -> return_unit + | `Message b -> fail_with_exn (Invalid_argument (String.of_bytes b)) in - return proto_parameters + return_unit let reconstruct cryptobox precomputation shards = let open Lwt_result_syntax in @@ -186,8 +184,7 @@ module Reconstruction_process_worker = struct let open Lwt_result_syntax in (* Read init message from parent with parameters required to initialize cryptobox *) - (* FIXME: it is not necessary anymore to initialize the cryptobox *) - let* _proto_parameters = read_init_message_from_parent ic in + let* () = read_init_message_from_parent ic in let*! () = Event.(emit crypto_process_started (Unix.getpid ())) in let rec loop () = let*! 
query_id = Lwt_io.read_int ic in @@ -356,7 +353,6 @@ let start_amplificator node_ctxt = let make node_ctxt = let open Lwt_result_syntax in let* amplificator = start_amplificator node_ctxt in - let proto_parameters = Node_context.get_proto_parameters node_ctxt in (* Run a job enqueueing all shards calculation tasks *) let amplificator_query_sender_job = let*! r = query_sender_job amplificator in @@ -381,14 +377,9 @@ let make node_ctxt = let () = Lwt.cancel amplificator_reply_receiver_job in Lwt.return_unit) in - let bytes_proto_parameters = - Data_encoding.Binary.to_bytes_exn - Dal_plugin.proto_parameters_encoding - proto_parameters - in let* () = let oc = Process_worker.output_channel amplificator.process in - let* r = Process_worker.write_message oc bytes_proto_parameters in + let* r = Process_worker.write_message oc welcome in match r with | `End_of_file -> fail -- GitLab