diff --git a/src/bin_dal_node/amplificator.ml b/src/bin_dal_node/amplificator.ml index 90d2f7afcdf8686b04b769dc45a8c06a16493cfb..f148bcd27631f2b7dd290a7eb27013bb5bba7e96 100644 --- a/src/bin_dal_node/amplificator.ml +++ b/src/bin_dal_node/amplificator.ml @@ -5,6 +5,8 @@ (* *) (*****************************************************************************) +let welcome = Bytes.of_string "0 Ready" + (* A Lwt worker maintening a queue of calculation jobs to send to the crypto process *) type query = | Query of { @@ -19,6 +21,7 @@ type query_msg = Query_msg of {query_id : int; shards : bytes} module Query_store = Stdlib.Hashtbl type t = { + node_ctxt : Node_context.t; process : Process_worker.t; query_pipe : query_msg Lwt_pipe.Unbounded.t; mutable query_id : int; @@ -134,41 +137,9 @@ let proved_shards_encoding = (* This module contains the logic for a shard proving process worker running in background. It communicates with the main dal process via pipe ipc. *) module Reconstruction_process_worker = struct - let reconstruct_process_init_cryptobox - (proto_parameters : Dal_plugin.proto_parameters) = - let open Lwt_result_syntax in - let* () = - let find_srs_files () = Tezos_base.Dal_srs.find_trusted_setup_files () in - Cryptobox.init_prover_dal ~find_srs_files () - in - match Cryptobox.make proto_parameters.cryptobox_parameters with - | Ok cryptobox -> ( - match Cryptobox.precompute_shards_proofs cryptobox with - | Ok precomputation -> return (cryptobox, precomputation) - | Error (`Invalid_degree_strictly_less_than_expected {given; expected}) - -> - fail - [ - Errors.Cryptobox_initialisation_failed - (Printf.sprintf - "Cryptobox.precompute_shards_proofs: SRS size (= %d) \ - smaller than expected (= %d)" - given - expected); - ]) - | Error (`Fail msg) -> - fail - [ - Errors.Cryptobox_initialisation_failed - (Format.asprintf - "Error initializing cryptobox in crypto process \ - (Cryptobox.make): %s" - msg); - ] - let read_init_message_from_parent ic = let open 
Lwt_result_syntax in - let* bytes_proto_parameters = + let* () = let* r = Process_worker.read_message ic in match r with | `End_of_file -> @@ -177,14 +148,10 @@ module Reconstruction_process_worker = struct Reconstruction_process_worker_error "Invalid initialization message"; ] - | `Message msg -> return msg + | `Message b when Bytes.equal b welcome -> return_unit + | `Message b -> fail [Reconstruction_process_worker_error (Format.asprintf "Unexpected initialization message: %S" (Bytes.to_string b))] in - let proto_parameters = - Data_encoding.Binary.of_bytes_exn - Dal_plugin.proto_parameters_encoding - bytes_proto_parameters - in - return proto_parameters + return_unit let reconstruct cryptobox precomputation shards = let open Lwt_result_syntax in @@ -212,14 +179,12 @@ module Reconstruction_process_worker = struct - receive shards - calculate all shards and their proofs - sent shards and proofs *) - let reconstruct_process_worker ic oc () = + let reconstruct_process_worker ic oc (cryptobox, shards_proofs_precomputation) + = let open Lwt_result_syntax in (* Read init message from parent with parameters required to initialize cryptobox *) - let* proto_parameters = read_init_message_from_parent ic in - let* cryptobox, shards_proofs_precomputation = - reconstruct_process_init_cryptobox proto_parameters - in + let* () = read_init_message_from_parent ic in let*! () = Event.(emit crypto_process_started (Unix.getpid ())) in let rec loop () = let*!
query_id = Lwt_io.read_int ic in @@ -354,18 +319,28 @@ let reply_receiver_job {process; query_store; _} node_context = let err = [error_of_exn exn] in Lwt.return (Error err)) -let make () = +let start_amplificator node_ctxt = let open Lwt_result_syntax in + let cryptobox = Node_context.get_cryptobox node_ctxt in + let shards_proofs_precomputation = + Node_context.get_shards_proofs_precomputation node_ctxt + in + let* shards_proofs_precomputation = + match shards_proofs_precomputation with + | None -> fail [Errors.Amplificator_initialization_failed] + | Some v -> return v + in (* Fork a process to offload cryptographic calculations *) let* process = Process_worker.run Reconstruction_process_worker.reconstruct_process_worker - () + (cryptobox, shards_proofs_precomputation) in let query_pipe = Lwt_pipe.Unbounded.create () in let query_store = Query_store.create 23 in - let amplificator = {process; query_pipe; query_store; query_id = 0} in - + let amplificator = + {node_ctxt; process; query_pipe; query_store; query_id = 0} + in let (_ : Lwt_exit.clean_up_callback_id) = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _exit_code -> let pid = Process_worker.pid process in @@ -375,10 +350,9 @@ let make () = in return amplificator -let init amplificator node_context = +let make node_ctxt = let open Lwt_result_syntax in - let proto_parameters = Node_context.get_proto_parameters node_context in - + let* amplificator = start_amplificator node_ctxt in (* Run a job enqueueing all shards calculation tasks *) let amplificator_query_sender_job = let*! r = query_sender_job amplificator in @@ -388,32 +362,24 @@ let init amplificator node_context = Lwt_result_syntax.fail (Amplification_query_sender_job "Error running query sender job" :: e) in - (* Run a job retrieving all shards and their proof and publish them *) let amplificator_reply_receiver_job = - let*! r = reply_receiver_job amplificator node_context in + let*! 
r = reply_receiver_job amplificator node_ctxt in match r with | Ok () -> return_unit | Error e -> Lwt_result_syntax.fail (Amplification_reply_receiver_job "Error in reply receiver job" :: e) in - let (_ : Lwt_exit.clean_up_callback_id) = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _exit_code -> let () = Lwt.cancel amplificator_query_sender_job in let () = Lwt.cancel amplificator_reply_receiver_job in Lwt.return_unit) in - - let bytes_proto_parameters = - Data_encoding.Binary.to_bytes_exn - Dal_plugin.proto_parameters_encoding - proto_parameters - in let* () = let oc = Process_worker.output_channel amplificator.process in - let* r = Process_worker.write_message oc bytes_proto_parameters in + let* r = Process_worker.write_message oc welcome in match r with | `End_of_file -> fail @@ -423,7 +389,7 @@ let init amplificator node_context = ] | `Write_ok -> return_unit in - return_unit + return amplificator let enqueue_job_shards_proof amplificator commitment slot_id proto_parameters shards = @@ -432,12 +398,10 @@ let enqueue_job_shards_proof amplificator commitment slot_id proto_parameters let shards = Data_encoding.(Binary.to_bytes_exn (list Cryptobox.shard_encoding)) shards in - (* Should be atomic incr in the context of a lwt concurrency * Could overflow some day on 32 bits machines, more difficult on 64 *) let query_id = amplificator.query_id in amplificator.query_id <- amplificator.query_id + 1 ; - (* Store some arguments in a query table, because they are necessary to publish, but not to calculate, so avoid the cost of serializing them *) let () = @@ -458,131 +422,111 @@ let amplify node_store commitment (slot_id : Types.slot_id) ~number_of_already_stored_shards ~number_of_shards ~number_of_needed_shards proto_parameters amplificator = let open Lwt_result_syntax in - match amplificator with - | None -> - let*!
() = Event.(emit amplificator_uninitialized ()) in - return_unit - | Some amplificator -> - Dal_metrics.reconstruction_started () ; - let*! () = - Event.( - emit - reconstruct_started - ( slot_id.slot_level, - slot_id.slot_index, - number_of_already_stored_shards, - number_of_shards )) - in - let shards = - Store.(Shards.read_all node_store.shards slot_id ~number_of_shards) - |> Seq_s.filter_map (function - | _, index, Ok share -> Some Cryptobox.{index; share} - | _ -> None) - in - let*? shards = - Seq_s.take - ~when_negative_length:[error_of_exn (Invalid_argument "Seq_s.take")] - number_of_needed_shards - shards - in - (* Enqueue this reconstruction in a query_pipe to be sent to a reconstruction process *) - let* () = - enqueue_job_shards_proof - amplificator - commitment - slot_id - proto_parameters - shards - in - return_unit + Dal_metrics.reconstruction_started () ; + let*! () = + Event.( + emit + reconstruct_started + ( slot_id.slot_level, + slot_id.slot_index, + number_of_already_stored_shards, + number_of_shards )) + in + let shards = + Store.(Shards.read_all node_store.shards slot_id ~number_of_shards) + |> Seq_s.filter_map (function + | _, index, Ok share -> Some Cryptobox.{index; share} + | _ -> None) + in + let*? shards = + Seq_s.take + ~when_negative_length:[error_of_exn (Invalid_argument "Seq_s.take")] + number_of_needed_shards + shards + in + (* Enqueue this reconstruction in a query_pipe to be sent to a reconstruction process *) + let* () = + enqueue_job_shards_proof + amplificator + commitment + slot_id + proto_parameters + shards + in + return_unit -let try_amplification node_ctxt commitment (slot_id : Types.slot_id) - amplificator = +let try_amplification commitment (slot_id : Types.slot_id) amplificator = let open Lwt_result_syntax in - match Node_context.get_shards_proofs_precomputation node_ctxt with - | None -> - (* The prover SRS is not loaded so we cannot reconstruct slots - yet. 
*) + let node_ctxt = amplificator.node_ctxt in + let node_store = Node_context.get_store node_ctxt in + let proto_parameters = Node_context.get_proto_parameters node_ctxt in + let number_of_shards = + proto_parameters.cryptobox_parameters.number_of_shards + in + let redundancy_factor = + proto_parameters.cryptobox_parameters.redundancy_factor + in + let number_of_needed_shards = number_of_shards / redundancy_factor in + let* number_of_already_stored_shards = + Store.Shards.count_values node_store.shards slot_id + in + (* There are two situations where we don't want to reconstruct: + if we don't have enough shards or if we already have all the + shards. *) + if + number_of_already_stored_shards < number_of_needed_shards + || number_of_already_stored_shards = number_of_shards + then return_unit + else + (* We have enough shards to reconstruct the whole slot. *) + with_amplification_lock + node_ctxt + slot_id + ~on_error: + Event.( + fun err -> + emit reconstruct_error (slot_id.slot_level, slot_id.slot_index, err)) + @@ fun () -> + (* Wait a random delay between 1 and 2 seconds before starting + the reconstruction; this is to give some slack to receive + all the shards so that the reconstruction is not needed, and + also avoids having multiple nodes reconstruct at once. *) + let random_delay = + Constants.( + amplification_random_delay_min + +. Random.float + (amplification_random_delay_max -. amplification_random_delay_min)) + in + let*! () = + Event.( + emit + reconstruct_starting_in + (slot_id.slot_level, slot_id.slot_index, random_delay)) + in + let*! () = Lwt_unix.sleep random_delay in + (* Count again the stored shards because we may have received + more shards during the random delay. 
*) + let* number_of_already_stored_shards = + Store.Shards.count_values node_store.shards slot_id + in + (* If we have received all the shards while waiting the random + delay, there is no point in reconstructing anymore *) + if number_of_already_stored_shards = number_of_shards then ( let*! () = Event.( emit - reconstruct_missing_prover_srs + reconstruct_no_missing_shard (slot_id.slot_level, slot_id.slot_index)) in - return_unit - | Some _ -> - let node_store = Node_context.get_store node_ctxt in - let proto_parameters = Node_context.get_proto_parameters node_ctxt in - let number_of_shards = - proto_parameters.cryptobox_parameters.number_of_shards - in - let redundancy_factor = - proto_parameters.cryptobox_parameters.redundancy_factor - in - let number_of_needed_shards = number_of_shards / redundancy_factor in - let* number_of_already_stored_shards = - Store.Shards.count_values node_store.shards slot_id - in - (* There are two situations where we don't want to reconstruct: - if we don't have enough shards or if we already have all the - shards. *) - if - number_of_already_stored_shards < number_of_needed_shards - || number_of_already_stored_shards = number_of_shards - then return_unit - else - (* We have enough shards to reconstruct the whole slot. *) - with_amplification_lock - node_ctxt - slot_id - ~on_error: - Event.( - fun err -> - emit - reconstruct_error - (slot_id.slot_level, slot_id.slot_index, err)) - @@ fun () -> - (* Wait a random delay between 1 and 2 seconds before starting - the reconstruction; this is to give some slack to receive - all the shards so that the reconstruction is not needed, and - also avoids having multiple nodes reconstruct at once. *) - let random_delay = - Constants.( - amplification_random_delay_min - +. Random.float - (amplification_random_delay_max - -. amplification_random_delay_min)) - in - let*! () = - Event.( - emit - reconstruct_starting_in - (slot_id.slot_level, slot_id.slot_index, random_delay)) - in - let*! 
() = Lwt_unix.sleep random_delay in - (* Count again the stored shards because we may have received - more shards during the random delay. *) - let* number_of_already_stored_shards = - Store.Shards.count_values node_store.shards slot_id - in - (* If we have received all the shards while waiting the random - delay, there is no point in reconstructing anymore *) - if number_of_already_stored_shards = number_of_shards then ( - let*! () = - Event.( - emit - reconstruct_no_missing_shard - (slot_id.slot_level, slot_id.slot_index)) - in - Dal_metrics.reconstruction_aborted () ; - return_unit) - else - amplify - node_store - commitment - slot_id - ~number_of_already_stored_shards - ~number_of_shards - ~number_of_needed_shards - proto_parameters - amplificator + Dal_metrics.reconstruction_aborted () ; + return_unit) + else + amplify + node_store + commitment + slot_id + ~number_of_already_stored_shards + ~number_of_shards + ~number_of_needed_shards + proto_parameters + amplificator diff --git a/src/bin_dal_node/amplificator.mli b/src/bin_dal_node/amplificator.mli index bcbb6cdb9e1b245c575700f77992a228f51b1ff4..e779ad6c16b67e607d301bd4c45fcc73e7d10163 100644 --- a/src/bin_dal_node/amplificator.mli +++ b/src/bin_dal_node/amplificator.mli @@ -37,15 +37,7 @@ type t main process of the DAL node but by the process provided in the [amplificator] argument. *) val try_amplification : - Node_context.t -> - Cryptobox.Commitment.t -> - Types.slot_id -> - t option -> - unit tzresult Lwt.t + Cryptobox.Commitment.t -> Types.slot_id -> t -> unit tzresult Lwt.t (** Creates a new amplificator process *) -val make : unit -> t tzresult Lwt.t - -(** [init amplificator node_ctxt params] Initializes the amplificator [t] with - the current context. 
*) -val init : t -> Node_context.t -> unit tzresult Lwt.t +val make : Node_context.t -> t tzresult Lwt.t diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 7b442f45bc55faa68c4693478140ea85fb9514dc..435698b382b64dd9ef0d543a5ec70ac6a57522d7 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -636,12 +636,13 @@ let connect_gossipsub_with_p2p gs_worker transport_layer node_store node_ctxt Profile_manager.get_profiles @@ Node_context.get_profile_ctxt node_ctxt with | Operator profile - when Operator_profile.is_observed_slot slot_index profile -> - Amplificator.try_amplification - node_ctxt - commitment - slot_id - amplificator + when Operator_profile.is_observed_slot slot_index profile -> ( + match amplificator with + | None -> + let*! () = Event.(emit amplificator_uninitialized ()) in + return_unit + | Some amplificator -> + Amplificator.try_amplification commitment slot_id amplificator) | _ -> return_unit in Lwt.dont_wait @@ -1083,28 +1084,12 @@ let run ~data_dir ~configuration_override = head_level first_seen_level in - (* Initialize the crypto process *) (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5743 Instead of recomputing these parameters, they could be stored (for a given cryptobox). *) let* cryptobox, shards_proofs_precomputation = init_cryptobox config proto_parameters in - let* amplificator = - (* TODO: https://gitlab.com/tezos/tezos/-/issues/7452 - The amplificator also initializes the crypto. This could be avoided. - Also, is this the right moment to start the amplificator? *) - if Profile_manager.is_prover_profile profile_ctxt then - let* amplificator = Amplificator.make () in - return_some amplificator - else return_none - in - (* This must be done after the amplificator starts. *) - let*! metrics_server = Metrics.launch config.metrics_addr in - (* Set value size hooks. 
*) - Value_size_hooks.set_share_size - (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; - Value_size_hooks.set_number_of_slots proto_parameters.number_of_slots ; let ctxt = Node_context.init config @@ -1117,8 +1102,24 @@ let run ~data_dir ~configuration_override = gs_worker transport_layer cctxt - metrics_server in + let is_prover_profile = Profile_manager.is_prover_profile profile_ctxt in + (* Initialize amplificator if in prover profile. + This forks a process and should be kept early to avoid copying opened file + descriptors. *) + let* amplificator = + if not is_prover_profile then return_none + else + let* amplificator = Amplificator.make ctxt in + return_some amplificator + in + (* Starts the metrics *after* the amplificator fork, to avoid forked opened + sockets *) + let*! _metrics_server = Metrics.launch config.metrics_addr in + (* Set value size hooks. *) + Value_size_hooks.set_share_size + (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; + Value_size_hooks.set_number_of_slots proto_parameters.number_of_slots ; (* Start RPC server. We do that before the waiting for the L1 node to be bootstrapped so that queries can already be issued. Note that that the node will thus already respond to the baker about shards status if queried. *) @@ -1167,11 +1168,6 @@ let run ~data_dir ~configuration_override = in return crawler in - let* () = - match amplificator with - | None -> return_unit - | Some amplificator -> Amplificator.init amplificator ctxt - in (* Activate the p2p instance. *) connect_gossipsub_with_p2p gs_worker transport_layer store ctxt amplificator ; let*! 
() = diff --git a/src/bin_dal_node/errors.ml b/src/bin_dal_node/errors.ml index 5c03fb05f3b711ba486b29c06c9b8925513ba506..7827e8dbe62c7025505984016d6b786cc4700968 100644 --- a/src/bin_dal_node/errors.ml +++ b/src/bin_dal_node/errors.ml @@ -23,7 +23,7 @@ (* *) (*****************************************************************************) -(** Extention of the open type [error] with the errors that could be raised by +(** Extension of the open type [error] with the errors that could be raised by the DAL node. *) type error += | Decoding_failed of Types.Store.kind @@ -32,6 +32,7 @@ type error += | Cryptobox_initialisation_failed of string | Not_enough_history of {stored_levels : int; minimal_levels : int} | Not_enough_l1_history of {stored_cycles : int; minimal_cycles : int} + | Amplificator_initialization_failed (* TODO: https://gitlab.com/tezos/tezos/-/issues/4622 @@ -136,7 +137,15 @@ let () = Some (stored_cycles, minimal_cycles) | _ -> None) (fun (stored_cycles, minimal_cycles) -> - Not_enough_l1_history {stored_cycles; minimal_cycles}) + Not_enough_l1_history {stored_cycles; minimal_cycles}) ; + register_error_kind + `Permanent + ~id:"dal.node.amplificator_initialization_failed" + ~title:"Amplificator initialization failed" + ~description:"Amplificator initialization failed" + Data_encoding.empty + (function Amplificator_initialization_failed -> Some () | _ -> None) + (fun () -> Amplificator_initialization_failed) (** This part defines and handles more elaborate errors for the DAL node. 
*) diff --git a/src/bin_dal_node/errors.mli b/src/bin_dal_node/errors.mli index fc5e70cfb55f872051a6f0d2b1176f91b85fd8b1..82a54694b39244e512dbb06e80024aa476c948d1 100644 --- a/src/bin_dal_node/errors.mli +++ b/src/bin_dal_node/errors.mli @@ -32,6 +32,7 @@ type error += | Cryptobox_initialisation_failed of string | Not_enough_history of {stored_levels : int; minimal_levels : int} | Not_enough_l1_history of {stored_cycles : int; minimal_cycles : int} + | Amplificator_initialization_failed (** The errors below are used to extend tzresult/tztrace monad/errors with Some specific errors on which we'd like to match in the DAL node's code. *) diff --git a/src/bin_dal_node/event.ml b/src/bin_dal_node/event.ml index a58aaf659a4e74e02b9901bcf35ca6e8c589647f..f648c560607d4f25b79fd85d99342fb7e4a4e8e8 100644 --- a/src/bin_dal_node/event.ml +++ b/src/bin_dal_node/event.ml @@ -399,19 +399,6 @@ let saving_profiles_failed = ~level:Error ("error", Error_monad.trace_encoding) -let reconstruct_missing_prover_srs = - declare_2 - ~section - ~name:"reconstruct_missing_prover_srs" - ~msg: - "Missing prover SRS, reconstruction for the level {level} and slot index \ - {slot_index} was skipped." 
- ~level:Warning - ~pp1:(fun fmt -> Format.fprintf fmt "%ld") - ("level", Data_encoding.int32) - ~pp2:Format.pp_print_int - ("slot_index", Data_encoding.int31) - let reconstruct_starting_in = declare_3 ~section diff --git a/src/bin_dal_node/node_context.ml b/src/bin_dal_node/node_context.ml index 156f845da5037323b2b979fe7a241f1d1f5e6507..4a831ffa3a0c39cb8c2af7b6fefd5a50df26b5a5 100644 --- a/src/bin_dal_node/node_context.ml +++ b/src/bin_dal_node/node_context.ml @@ -40,12 +40,10 @@ type t = { gs_worker : Gossipsub.Worker.t; transport_layer : Gossipsub.Transport_layer.t; mutable profile_ctxt : Profile_manager.t; - metrics_server : Metrics.t; } let init config profile_ctxt cryptobox shards_proofs_precomputation - proto_parameters proto_plugins store gs_worker transport_layer cctxt - metrics_server = + proto_parameters proto_plugins store gs_worker transport_layer cctxt = let neighbors_cctxts = List.map (fun Configuration_file.{addr; port} -> @@ -71,7 +69,6 @@ let init config profile_ctxt cryptobox shards_proofs_precomputation gs_worker; transport_layer; profile_ctxt; - metrics_server; } let may_reconstruct ~reconstruct slot_id t = diff --git a/src/bin_dal_node/node_context.mli b/src/bin_dal_node/node_context.mli index 011ef32b5741791d221f98cc1708a4adea472285..83c051fd8744525c899ef38560b0b181f9cbefb7 100644 --- a/src/bin_dal_node/node_context.mli +++ b/src/bin_dal_node/node_context.mli @@ -40,7 +40,6 @@ val init : Gossipsub.Worker.t -> Gossipsub.Transport_layer.t -> Tezos_rpc.Context.generic -> - Metrics.t -> t (** Returns all the registered plugins *)