From 0825c7131c5e81b2f832604b589cd09769f16ae0 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Thu, 18 Jul 2024 16:31:04 +0200 Subject: [PATCH 1/3] DAL/Node: do not do work in Node_context.set_ready --- src/bin_dal_node/daemon.ml | 27 +++++++++++++++++++++++++-- src/bin_dal_node/node_context.ml | 26 ++------------------------ src/bin_dal_node/node_context.mli | 11 ++++------- 3 files changed, 31 insertions(+), 33 deletions(-) diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index a0d52bf4586f..004d4195305b 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -315,14 +315,37 @@ module Handler = struct let level = match last_notified_level with None -> level | Some level -> level in + let profile_ctxt = Node_context.get_profile_ctxt ctxt in + let*? () = + Profile_manager.validate_slot_indexes + profile_ctxt + ~number_of_slots:proto_parameters.number_of_slots + in + let* proto_plugins = + (* We resolve the plugins for all levels starting with [first_level]. It + is currently not necessary to go as far in the past, because only the + protocol parameters for these past levels are needed, and these do + not change for now (and are not retrieved for these past + levels). However, if/when they do change, it will be necessary to + retrieve them, using the right plugins. *) + let relevant_period = + Profile_manager.get_attested_data_default_store_period + profile_ctxt + proto_parameters + in + let first_level = + Int32.max 1l (Int32.sub level (Int32.of_int relevant_period)) + in + Proto_plugins.initial_plugins cctxt ~first_level ~last_level:level + in + let* () = Node_context.set_ready ctxt - cctxt cryptobox shards_proofs_precomputation proto_parameters - ~level + proto_plugins in let*! 
() = Event.(emit node_is_ready ()) in stopper () ; diff --git a/src/bin_dal_node/node_context.ml b/src/bin_dal_node/node_context.ml index 4475f61a9bbd..4e41b2c8800c 100644 --- a/src/bin_dal_node/node_context.ml +++ b/src/bin_dal_node/node_context.ml @@ -114,33 +114,11 @@ let wait_for_ready_state ctxt = | Ready _ -> return_unit | Starting s -> s.started_promise -let set_ready ctxt cctxt cryptobox shards_proofs_precomputation proto_parameters - ~level = +let set_ready ctxt cryptobox shards_proofs_precomputation proto_parameters + proto_plugins = let open Lwt_result_syntax in match ctxt.status with | Starting starting_status -> - let*? () = - Profile_manager.validate_slot_indexes - ctxt.profile_ctxt - ~number_of_slots:proto_parameters.Dal_plugin.number_of_slots - in - let* proto_plugins = - (* We resolve the plugins for all levels starting with [first_level]. It - is currently not necessary to go as far in the past, because only the - protocol parameters for these past levels are needed, and these do - not change for now (and are not retrieved for these past - levels). However, if/when they do change, it will be necessary to - retrieve them, using the right plugins. 
*) - let relevant_period = - Profile_manager.get_attested_data_default_store_period - ctxt.profile_ctxt - proto_parameters - in - let first_level = - Int32.max 1l (Int32.sub level (Int32.of_int relevant_period)) - in - Proto_plugins.initial_plugins cctxt ~first_level ~last_level:level - in ctxt.status <- Ready { diff --git a/src/bin_dal_node/node_context.mli b/src/bin_dal_node/node_context.mli index 68dc77f69a48..9fc2daf55ef0 100644 --- a/src/bin_dal_node/node_context.mli +++ b/src/bin_dal_node/node_context.mli @@ -76,20 +76,17 @@ val init : (** Raised by [set_ready] when the status is already [Ready _] *) exception Status_already_ready -(** [set_ready ctxt rpc_ctxt cryptobox - shards_proofs_precomputation proto_parameters ~level] updates in place the - status value to [Ready], and initializes the inner [ready_ctxt] value with - the given parameters, except [level] which should represent the current - level of the L1 node and is used to determine the initial protocol plugins. +(** [set_ready ctxt cryptobox shards_proofs_precomputation proto_parameters + proto_plugins] updates in place the status value to [Ready], and initializes + the inner [ready_ctxt] value with the given parameters. 
@raise Status_already_ready when the status is already [Ready _] *) val set_ready : t -> - Rpc_context.t -> Cryptobox.t -> Cryptobox.shards_proofs_precomputation option -> Dal_plugin.proto_parameters -> - level:Int32.t -> + Proto_plugins.t -> unit tzresult Lwt.t (** Returns all the registered plugins *) -- GitLab From 47ebaf9bcc8ff9655c637fbc88110b4faaec0679 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Fri, 19 Jul 2024 11:27:36 +0200 Subject: [PATCH 2/3] DAL/Node: move ready context into Node_context.t --- src/bin_dal_node/RPC_server.ml | 63 +++--- src/bin_dal_node/amplificator.ml | 51 +++-- src/bin_dal_node/amplificator.mli | 10 +- src/bin_dal_node/daemon.ml | 325 +++++++++++------------------- src/bin_dal_node/node_context.ml | 123 ++++------- src/bin_dal_node/node_context.mli | 88 ++------ src/bin_dal_node/slot_manager.ml | 21 +- src/bin_dal_node/slot_manager.mli | 15 +- 8 files changed, 250 insertions(+), 446 deletions(-) diff --git a/src/bin_dal_node/RPC_server.ml b/src/bin_dal_node/RPC_server.ml index d1cadde5fdb3..42151bff3a18 100644 --- a/src/bin_dal_node/RPC_server.ml +++ b/src/bin_dal_node/RPC_server.ml @@ -27,14 +27,7 @@ open Tezos_rpc_http open Tezos_rpc_http_server -let call_handler1 ctxt handler = - handler (Node_context.get_store ctxt) |> Errors.to_option_tzresult - -let call_handler2 ctxt handler = - let open Lwt_result_syntax in - let*? 
ready_ctxt = Node_context.get_ready ctxt in - let store = Node_context.get_store ctxt in - handler store ready_ctxt |> Errors.to_option_tzresult +let call_handler1 handler = handler () |> Errors.to_option_tzresult type error += | Cryptobox_error of string * string @@ -84,14 +77,9 @@ let () = module Slots_handlers = struct let get_slot_content ctxt slot_level slot_index () () = - call_handler2 ctxt (fun store ({cryptobox; _} as ctxt) -> + call_handler1 (fun () -> let slot_id : Types.slot_id = {slot_level; slot_index} in - Slot_manager.get_slot_content - ~reconstruct_if_missing:true - ctxt - store - cryptobox - slot_id) + Slot_manager.get_slot_content ~reconstruct_if_missing:true ctxt slot_id) let commitment_proof_from_polynomial cryptobox polynomial = let open Result_syntax in @@ -113,18 +101,17 @@ module Slots_handlers = struct | Ok proof -> return proof let get_slot_page_proof ctxt slot_level slot_index page_index () () = - call_handler2 ctxt (fun store ({cryptobox; _} as ctxt) -> + call_handler1 (fun () -> let open Lwt_result_syntax in let slot_id : Types.slot_id = {slot_level; slot_index} in let* content = Slot_manager.get_slot_content ~reconstruct_if_missing:true ctxt - store - cryptobox slot_id in let*! proof = + let cryptobox = Node_context.get_cryptobox ctxt in let*? polynomial = Cryptobox.polynomial_from_slot cryptobox content in let*? 
proof = Cryptobox.prove_page cryptobox polynomial page_index in return proof @@ -144,11 +131,11 @@ module Slots_handlers = struct fail (Errors.other [Cryptobox_error ("get_slot_page_proof", msg)])) let post_slot ctxt query slot = - call_handler2 - ctxt - (fun store {cryptobox; shards_proofs_precomputation; proto_parameters; _} - -> + call_handler1 (fun () -> let open Lwt_result_syntax in + let store = Node_context.get_store ctxt in + let cryptobox = Node_context.get_cryptobox ctxt in + let proto_parameters = Node_context.get_proto_parameters ctxt in if not (Profile_manager.is_prover_profile @@ -177,6 +164,9 @@ module Slots_handlers = struct let*? commitment_proof = commitment_proof_from_polynomial cryptobox polynomial in + let shards_proofs_precomputation = + Node_context.get_shards_proofs_precomputation ctxt + in let* () = Slot_manager.add_commitment_shards ~shards_proofs_precomputation @@ -189,17 +179,16 @@ module Slots_handlers = struct return (commitment, commitment_proof)) let get_slot_commitment ctxt slot_level slot_index () () = - call_handler2 ctxt (fun store ({cryptobox; _} as ctxt) -> + call_handler1 (fun () -> let open Lwt_result_syntax in let slot_id : Types.slot_id = {slot_level; slot_index} in let* content = Slot_manager.get_slot_content ~reconstruct_if_missing:true ctxt - store - cryptobox slot_id in + let cryptobox = Node_context.get_cryptobox ctxt in let*? 
polynomial = Slot_manager.polynomial_from_slot cryptobox content in @@ -207,31 +196,29 @@ module Slots_handlers = struct return commitment) let get_slot_status ctxt slot_level slot_index () () = - call_handler1 ctxt (fun store -> + call_handler1 (fun () -> + let store = Node_context.get_store ctxt in let slot_id : Types.slot_id = {slot_level; slot_index} in Slot_manager.get_slot_status ~slot_id store) let get_slot_shard ctxt slot_level slot_index shard_index () () = - call_handler1 ctxt (fun node_store -> + call_handler1 (fun () -> + let store = Node_context.get_store ctxt in let slot_id : Types.slot_id = {slot_level; slot_index} in - Slot_manager.get_slot_shard node_store slot_id shard_index) + Slot_manager.get_slot_shard store slot_id shard_index) let get_slot_pages ctxt slot_level slot_index () () = - call_handler2 ctxt (fun node_store ({cryptobox; _} as ctxt) -> + call_handler1 (fun () -> let slot_id : Types.slot_id = {slot_level; slot_index} in - Slot_manager.get_slot_pages - ~reconstruct_if_missing:true - cryptobox - ctxt - node_store - slot_id) + Slot_manager.get_slot_pages ~reconstruct_if_missing:true ctxt slot_id) end module Profile_handlers = struct let patch_profiles ctxt () operator_profiles = let open Lwt_result_syntax in let gs_worker = Node_context.get_gs_worker ctxt in - call_handler2 ctxt (fun _store {proto_parameters; _} -> + call_handler1 (fun () -> + let proto_parameters = Node_context.get_proto_parameters ctxt in match Profile_manager.add_operator_profiles (Node_context.get_profile_ctxt ctxt) @@ -283,7 +270,8 @@ module Profile_handlers = struct let* flags = List.map_es are_shards_stored all_slot_indexes in return (Types.Attestable_slots {slots = flags; published_level}) in - call_handler2 ctxt (fun store {proto_parameters; _} -> + call_handler1 (fun () -> + let store = Node_context.get_store ctxt in (* For retrieving the assigned shard indexes, we consider the committee at [attested_level - 1], because the (DAL) attestations in the blocks at 
level [attested_level] refer to the predecessor level. *) @@ -296,6 +284,7 @@ module Profile_handlers = struct ~level:attestation_level |> Errors.other_lwt_result in + let proto_parameters = Node_context.get_proto_parameters ctxt in get_attestable_slots ~shard_indices store diff --git a/src/bin_dal_node/amplificator.ml b/src/bin_dal_node/amplificator.ml index 28e776cd01e1..90d2f7afcdf8 100644 --- a/src/bin_dal_node/amplificator.ml +++ b/src/bin_dal_node/amplificator.ml @@ -86,19 +86,24 @@ let () = If the promise raises an exception, it is turned into an error using [Error_monad.fail_with_exn] and passed to the [on_error] handler after the release of the lock. *) -let with_amplification_lock (ready_ctxt : Node_context.ready_ctxt) slot_id - ~on_error (f : unit -> unit tzresult Lwt.t) = +let with_amplification_lock (ctxt : Node_context.t) slot_id ~on_error + (f : unit -> unit tzresult Lwt.t) = let open Lwt_result_syntax in let open Types.Slot_id.Set in let acquire_lock () = - ready_ctxt.ongoing_amplifications <- - add slot_id ready_ctxt.ongoing_amplifications + let ongoing_amplifications = Node_context.get_ongoing_amplifications ctxt in + Node_context.set_ongoing_amplifications + ctxt + (add slot_id ongoing_amplifications) in let release_lock () = - ready_ctxt.ongoing_amplifications <- - remove slot_id ready_ctxt.ongoing_amplifications + let ongoing_amplifications = Node_context.get_ongoing_amplifications ctxt in + Node_context.set_ongoing_amplifications + ctxt + (remove slot_id ongoing_amplifications) in - if mem slot_id ready_ctxt.ongoing_amplifications then return_unit + let ongoing_amplifications = Node_context.get_ongoing_amplifications ctxt in + if mem slot_id ongoing_amplifications then return_unit else let () = acquire_lock () in let () = @@ -370,8 +375,10 @@ let make () = in return amplificator -let init amplificator node_context proto_parameters = +let init amplificator node_context = let open Lwt_result_syntax in + let proto_parameters = 
Node_context.get_proto_parameters node_context in + (* Run a job enqueueing all shards calculation tasks *) let amplificator_query_sender_job = let*! r = query_sender_job amplificator in @@ -489,16 +496,11 @@ let amplify node_store commitment (slot_id : Types.slot_id) in return_unit -let try_amplification (node_store : Store.t) commitment - (slot_id : Types.slot_id) node_ctxt amplificator = +let try_amplification node_ctxt commitment (slot_id : Types.slot_id) + amplificator = let open Lwt_result_syntax in - let*! () = Node_context.wait_for_ready_state node_ctxt in - match Node_context.get_status node_ctxt with - | Starting _ -> - (* The node is supposed to be ready thanks to [wait_for_ready_state] - above. *) - assert false - | Ready {shards_proofs_precomputation = None; _} -> + match Node_context.get_shards_proofs_precomputation node_ctxt with + | None -> (* The prover SRS is not loaded so we cannot reconstruct slots yet. *) let*! () = @@ -508,10 +510,15 @@ let try_amplification (node_store : Store.t) commitment (slot_id.slot_level, slot_id.slot_index)) in return_unit - | Ready ({cryptobox; proto_parameters; _} as ready_ctxt) -> - let dal_parameters = Cryptobox.parameters cryptobox in - let number_of_shards = dal_parameters.number_of_shards in - let redundancy_factor = dal_parameters.redundancy_factor in + | Some _ -> + let node_store = Node_context.get_store node_ctxt in + let proto_parameters = Node_context.get_proto_parameters node_ctxt in + let number_of_shards = + proto_parameters.cryptobox_parameters.number_of_shards + in + let redundancy_factor = + proto_parameters.cryptobox_parameters.redundancy_factor + in let number_of_needed_shards = number_of_shards / redundancy_factor in let* number_of_already_stored_shards = Store.Shards.count_values node_store.shards slot_id @@ -526,7 +533,7 @@ let try_amplification (node_store : Store.t) commitment else (* We have enough shards to reconstruct the whole slot. 
*) with_amplification_lock - ready_ctxt + node_ctxt slot_id ~on_error: Event.( diff --git a/src/bin_dal_node/amplificator.mli b/src/bin_dal_node/amplificator.mli index e7d21181cbaf..bcbb6cdb9e1b 100644 --- a/src/bin_dal_node/amplificator.mli +++ b/src/bin_dal_node/amplificator.mli @@ -17,7 +17,7 @@ (** An amplificator process. *) type t -(** [try_amplification node_store commitment slot_id node_ctxt amplificator] +(** [try_amplification node_ctxt commitment slot_id amplificator] triggers an amplification, ie the reconstruction and publication of a partial set of shards. It is called each time a new shard is received by an observer node, after being added to the shard store @@ -37,10 +37,9 @@ type t main process of the DAL node but by the process provided in the [amplificator] argument. *) val try_amplification : - Store.t -> + Node_context.t -> Cryptobox.Commitment.t -> Types.slot_id -> - Node_context.t -> t option -> unit tzresult Lwt.t @@ -48,6 +47,5 @@ val try_amplification : val make : unit -> t tzresult Lwt.t (** [init amplificator node_ctxt params] Initializes the amplificator [t] with - the current context and proto parameters *) -val init : - t -> Node_context.t -> Dal_plugin.proto_parameters -> unit tzresult Lwt.t + the current context. *) +val init : t -> Node_context.t -> unit tzresult Lwt.t diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 004d4195305b..b47ed59e81cf 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -61,32 +61,6 @@ let init_cryptobox config (proto_parameters : Dal_plugin.proto_parameters) = | Error (`Fail msg) -> fail [Errors.Cryptobox_initialisation_failed msg] module Handler = struct - (** [make_stream_daemon handler streamed_call] calls [handler] on each newly - received value from [streamed_call]. - - It returns a couple [(p, stopper)] where [p] is a promise resolving when the - stream closes and [stopper] a function closing the stream. 
- *) - let make_stream_daemon handle streamed_call = - let open Lwt_result_syntax in - let* stream, stopper = streamed_call in - let rec go () = - let*! tok = Lwt_stream.get stream in - match tok with - | None -> return_unit - | Some element -> - let*! r = handle stopper element in - let*! () = - match r with - | Ok () -> Lwt.return_unit - | Error trace -> - let*! () = Event.(emit daemon_error) trace in - Lwt.return_unit - in - go () - in - return (go (), stopper) - (* [gossipsub_app_message_payload_validation cryptobox message message_id] allows checking whether the given [message] identified by [message_id] is valid with the current [cryptobox] parameters. The validity check is done @@ -265,101 +239,62 @@ module Handler = struct let*! () = Event.(emit loading_profiles_failed err) in return config.Configuration_file.profile - (* Set the profile context once we have the protocol plugin. This is supposed + (* Registers the attester profile context once we have the protocol plugin. This is supposed to be called only once. *) - let set_profile_context ctxt config proto_parameters = + let update_and_register_profiles ctxt = let open Lwt_result_syntax in - let* profile = build_profile_context config in - let pctxt_opt = + let profile_ctxt = Node_context.get_profile_ctxt ctxt in + let gs_worker = Node_context.get_gs_worker ctxt in + let proto_parameters = Node_context.get_proto_parameters ctxt in + let profile_ctxt_opt = Profile_manager.add_profiles Profile_manager.empty proto_parameters - (Node_context.get_gs_worker ctxt) - profile + gs_worker + profile_ctxt in - match pctxt_opt with + match profile_ctxt_opt with | None -> fail Errors.[Profile_incompatibility] - | Some pctxt -> - let*! () = Node_context.set_profile_ctxt ctxt pctxt in + | Some profile_ctxt -> + let*! 
() = Node_context.set_profile_ctxt ctxt profile_ctxt in return_unit - let resolve_plugin_and_set_ready config ctxt cctxt ?last_notified_level - amplificator () = - (* Monitor heads and try resolve the DAL protocol plugin corresponding to - the protocol of the targeted node. *) + let set_ready cctxt config profile_ctxt last_processed_level + (head_level, (module Plugin : Dal_plugin.T), proto_parameters) = let open Lwt_result_syntax in - let handler stopper (block_hash, block_header) = - let block = `Hash (block_hash, 0) in - let level = block_header.Block_header.shell.level in - let* (module Dal_plugin : Dal_plugin.T) = - Proto_plugins.resolve_plugin_for_level cctxt ~level - in - let* proto_parameters = Dal_plugin.get_constants `Main block cctxt in - (* Initialize the crypto process *) - let* () = - match amplificator with - | None -> return_unit - | Some amplificator -> - Amplificator.init amplificator ctxt proto_parameters - in - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5743 - Instead of recomputing these parameters, they could be stored - (for a given cryptobox). *) - let* cryptobox, shards_proofs_precomputation = - init_cryptobox config proto_parameters - in - Value_size_hooks.set_share_size - (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; - Value_size_hooks.set_number_of_slots proto_parameters.number_of_slots ; - let* () = set_profile_context ctxt config proto_parameters in + (* Initialize the crypto process *) + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5743 + Instead of recomputing these parameters, they could be stored + (for a given cryptobox). *) + let* cryptobox, shards_proofs_precomputation = + init_cryptobox config proto_parameters + in + Value_size_hooks.set_share_size + (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; + Value_size_hooks.set_number_of_slots proto_parameters.number_of_slots ; + let* proto_plugins = + (* We resolve the plugins for all levels starting with [first_level]. 
It + is currently not necessary to go as far in the past, because only the + protocol parameters for these past levels are needed, and these do + not change for now (and are not retrieved for these past + levels). However, if/when they do change, it will be necessary to + retrieve them, using the right plugins. *) let level = - match last_notified_level with None -> level | Some level -> level + match last_processed_level with + | None -> head_level + | Some level -> level in - let profile_ctxt = Node_context.get_profile_ctxt ctxt in - let*? () = - Profile_manager.validate_slot_indexes + let relevant_period = + Profile_manager.get_attested_data_default_store_period profile_ctxt - ~number_of_slots:proto_parameters.number_of_slots - in - let* proto_plugins = - (* We resolve the plugins for all levels starting with [first_level]. It - is currently not necessary to go as far in the past, because only the - protocol parameters for these past levels are needed, and these do - not change for now (and are not retrieved for these past - levels). However, if/when they do change, it will be necessary to - retrieve them, using the right plugins. *) - let relevant_period = - Profile_manager.get_attested_data_default_store_period - profile_ctxt - proto_parameters - in - let first_level = - Int32.max 1l (Int32.sub level (Int32.of_int relevant_period)) - in - Proto_plugins.initial_plugins cctxt ~first_level ~last_level:level - in - - let* () = - Node_context.set_ready - ctxt - cryptobox - shards_proofs_precomputation proto_parameters - proto_plugins in - let*! () = Event.(emit node_is_ready ()) in - stopper () ; - return_unit - in - let handler stopper el = - match Node_context.get_status ctxt with - | Starting _ -> handler stopper el - | Ready _ -> return_unit + let first_level = + Int32.max 1l (Int32.sub level (Int32.of_int relevant_period)) + in + Proto_plugins.initial_plugins cctxt ~first_level ~last_level:level in - let*! 
() = Event.(emit layer1_node_tracking_started_for_plugin ()) in - make_stream_daemon - handler - (Tezos_shell_services.Monitor_services.heads cctxt `Main) + return (cryptobox, shards_proofs_precomputation, proto_plugins) let supports_refutations ctxt = let profile = Node_context.get_profile_ctxt ctxt in @@ -655,81 +590,63 @@ module Handler = struct let new_finalized_head ctxt cctxt crawler = let open Lwt_result_syntax in let stream = Crawler.finalized_heads_stream crawler in - let*! () = Node_context.wait_for_ready_state ctxt in let rec loop () = - match Node_context.get_status ctxt with - | Starting _ -> - (* The node is supposed to be ready thanks to [wait_for_ready_state] - above. *) - assert false - | Ready ready_ctxt -> ( - let Node_context. - { - proto_plugins = _; - proto_parameters; - cryptobox; - shards_proofs_precomputation = _; - ongoing_amplifications = _; - slots_under_reconstruction = _; - } = - ready_ctxt - in - let*! next_final_head = Lwt_stream.get stream in - match next_final_head with - | None -> Lwt.fail_with "L1 crawler lib shut down" - | Some (_finalized_hash, finalized_shell_header) -> - let level = finalized_shell_header.level in - (* At each potential published_level [level], we prefetch the - committee for its corresponding attestation_level (that is: - level + attestation_lag - 1). This is in particular used by GS - messages ids validation that cannot depend on Lwt. *) - let* () = - if not proto_parameters.feature_enable then return_unit - else - let attestation_level = - Int32.( - pred - @@ add - level - (of_int proto_parameters.Dal_plugin.attestation_lag)) - in - let* _committee = - Node_context.fetch_committee ctxt ~level:attestation_level - in - return_unit + let proto_parameters = Node_context.get_proto_parameters ctxt in + let cryptobox = Node_context.get_cryptobox ctxt in + let*! 
next_final_head = Lwt_stream.get stream in + match next_final_head with + | None -> Lwt.fail_with "L1 crawler lib shut down" + | Some (_finalized_hash, finalized_shell_header) -> + let level = finalized_shell_header.level in + (* At each potential published_level [level], we prefetch the + committee for its corresponding attestation_level (that is: + level + attestation_lag - 1). This is in particular used by GS + messages ids validation that cannot depend on Lwt. *) + let* () = + if not proto_parameters.feature_enable then return_unit + else + let attestation_level = + Int32.( + pred + @@ add + level + (of_int proto_parameters.Dal_plugin.attestation_lag)) in - let* () = - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7291 - We should use the head level instead. *) - Node_context.may_add_plugin - ctxt - cctxt - ~proto_level:finalized_shell_header.proto_level - ~block_level:level - in - Gossipsub.Worker.Validate_message_hook.set - (gossipsub_app_messages_validation - ctxt - cryptobox - level - proto_parameters) ; - let*! () = - remove_old_level_stored_data proto_parameters ctxt level - in - let* () = - if level = 1l then - (* We do not process the block at level 1, as it will not - contain DAL information, and it has no round. *) - return_unit - else - try_process_block - ~retries:Constants.crawler_retries_on_disconnection - ctxt - cctxt - proto_parameters - finalized_shell_header + let* _committee = + Node_context.fetch_committee ctxt ~level:attestation_level in - loop ()) + return_unit + in + let* () = + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7291 + We should use the head level instead. *) + Node_context.may_add_plugin + ctxt + cctxt + ~proto_level:finalized_shell_header.proto_level + ~block_level:level + in + Gossipsub.Worker.Validate_message_hook.set + (gossipsub_app_messages_validation + ctxt + cryptobox + level + proto_parameters) ; + let*! 
() = remove_old_level_stored_data proto_parameters ctxt level in + let* () = + if level = 1l then + (* We do not process the block at level 1, as it will not + contain DAL information, and it has no round. *) + return_unit + else + try_process_block + ~retries:Constants.crawler_retries_on_disconnection + ctxt + cctxt + proto_parameters + finalized_shell_header + in + loop () in let*! () = Event.(emit layer1_node_tracking_started ()) in loop () @@ -769,10 +686,9 @@ let connect_gossipsub_with_p2p gs_worker transport_layer node_store node_ctxt | Operator profile when Operator_profile.is_observed_slot slot_index profile -> Amplificator.try_amplification - node_store + node_ctxt commitment slot_id - node_ctxt amplificator | _ -> return_unit in @@ -928,13 +844,6 @@ let run ~data_dir ~configuration_override = in let*! () = Event.(emit configuration_loaded) () in - let* amplificator = - if Profile_manager.is_prover_profile config.Configuration_file.profile then - let* amplificator = Amplificator.make () in - return_some amplificator - else return_none - in - let cctxt = Rpc_context.make endpoint in let* dal_config = fetch_dal_config cctxt in (* Resolve: @@ -1008,16 +917,24 @@ let run ~data_dir ~configuration_override = let* last_processed_level_store = Last_processed_level.init ~root_dir:(Configuration_file.store_path config) in - let* last_notified_level = + let* last_processed_level = Last_processed_level.load_last_processed_level last_processed_level_store in (* First wait for the L1 node to be bootstrapped. *) let* () = wait_for_l1_bootstrapped cctxt in (* Check the DAL node's and L1 node's history mode. *) - let* ((_, _, proto_parameters) as _plugin_info) = get_head_plugin cctxt in + let* ((_, _, proto_parameters) as plugin_info) = get_head_plugin cctxt in let* profile_ctxt = Handler.build_profile_context config in + let*? 
() = + Profile_manager.validate_slot_indexes + profile_ctxt + ~number_of_slots:proto_parameters.number_of_slots + in let* () = check_history_mode config profile_ctxt proto_parameters in let* () = check_l1_history_mode profile_ctxt cctxt proto_parameters in + let* cryptobox, shards_proofs_precomputation, proto_plugins = + Handler.set_ready cctxt config profile_ctxt last_processed_level plugin_info + in let*! crawler = let open Constants in Crawler.start @@ -1025,12 +942,16 @@ let run ~data_dir ~configuration_override = ~chain:`Main ~reconnection_delay:initial_l1_crawler_reconnection_delay ~l1_blocks_cache_size:crawler_l1_blocks_cache_size - ?last_notified_level + ?last_notified_level:last_processed_level cctxt in let ctxt = Node_context.init config + cryptobox + shards_proofs_precomputation + proto_parameters + proto_plugins store gs_worker transport_layer @@ -1039,6 +960,18 @@ let run ~data_dir ~configuration_override = crawler last_processed_level_store in + let* amplificator = + if Profile_manager.is_prover_profile config.Configuration_file.profile then + let* amplificator = Amplificator.make () in + return_some amplificator + else return_none + in + let* () = + match amplificator with + | None -> return_unit + | Some amplificator -> Amplificator.init amplificator ctxt + in + let*! () = Event.(emit node_is_ready ()) in (* Start RPC server. *) let* rpc_server = RPC_server.(start config ctxt) in let _ = RPC_server.install_finalizer rpc_server in @@ -1049,21 +982,9 @@ let run ~data_dir ~configuration_override = Gossipsub.Transport_layer.activate ~additional_points:points transport_layer in let*! () = Event.(emit p2p_server_is_ready listen_addr) in + let* () = Handler.update_and_register_profiles ctxt in (* Start collecting stats related to the Gossipsub worker. 
*) Dal_metrics.collect_gossipsub_metrics gs_worker ; - (* Start daemon to resolve current protocol plugin *) - let* () = - daemonize - [ - Handler.resolve_plugin_and_set_ready - config - ctxt - cctxt - ?last_notified_level - amplificator - (); - ] - in (* Start never-ending monitoring daemons *) let* () = daemonize [Handler.new_finalized_head ctxt cctxt crawler] in return_unit diff --git a/src/bin_dal_node/node_context.ml b/src/bin_dal_node/node_context.ml index 4e41b2c8800c..4df1de2458dd 100644 --- a/src/bin_dal_node/node_context.ml +++ b/src/bin_dal_node/node_context.ml @@ -24,28 +24,15 @@ (* *) (*****************************************************************************) -exception Status_already_ready - -type ready_ctxt = { +type t = { + config : Configuration_file.t; cryptobox : Cryptobox.t; - proto_parameters : Dal_plugin.proto_parameters; - proto_plugins : Proto_plugins.t; shards_proofs_precomputation : Cryptobox.shards_proofs_precomputation option; + proto_parameters : Dal_plugin.proto_parameters; + mutable proto_plugins : Proto_plugins.t; mutable ongoing_amplifications : Types.Slot_id.Set.t; mutable slots_under_reconstruction : (bytes, Errors.other) result Lwt.t Types.Slot_id.Map.t; -} - -type starting_status = { - started_promise : unit Lwt.t; - started_resolver : unit Lwt.u; -} - -type status = Ready of ready_ctxt | Starting of starting_status - -type t = { - mutable status : status; - config : Configuration_file.t; store : Store.t; tezos_node_cctxt : Tezos_rpc.Context.generic; neighbors_cctxts : Dal_node_client.cctxt list; @@ -58,7 +45,8 @@ type t = { last_processed_level_store : Last_processed_level.t; } -let init config store gs_worker transport_layer cctxt metrics_server crawler +let init config cryptobox shards_proofs_precomputation proto_parameters + proto_plugins store gs_worker transport_layer cctxt metrics_server crawler last_processed_level_store = let neighbors_cctxts = List.map @@ -69,10 +57,14 @@ let init config store gs_worker 
transport_layer cctxt metrics_server crawler Dal_node_client.make_unix_cctxt endpoint) config.Configuration_file.neighbors in - let started_promise, started_resolver = Lwt.task () in { - status = Starting {started_promise; started_resolver}; config; + cryptobox; + shards_proofs_precomputation; + proto_parameters; + proto_plugins; + ongoing_amplifications = Types.Slot_id.Set.empty; + slots_under_reconstruction = Types.Slot_id.Map.empty; store; tezos_node_cctxt = cctxt; neighbors_cctxts; @@ -86,53 +78,6 @@ let init config store gs_worker transport_layer cctxt metrics_server crawler last_processed_level_store; } -type error += Node_not_ready - -let () = - register_error_kind - `Permanent - ~id:"dal.node.not.ready" - ~title:"DAL Node not ready" - ~description:"DAL node is starting. It's not ready to respond to RPCs." - ~pp:(fun ppf () -> - Format.fprintf - ppf - "DAL node is starting. It's not ready to respond to RPCs.") - Data_encoding.(unit) - (function Node_not_ready -> Some () | _ -> None) - (fun () -> Node_not_ready) - -let get_ready ctxt = - let open Result_syntax in - match ctxt.status with - | Ready ctxt -> Ok ctxt - | Starting _ -> fail [Node_not_ready] - -let wait_for_ready_state ctxt = - let open Lwt_syntax in - match ctxt.status with - | Ready _ -> return_unit - | Starting s -> s.started_promise - -let set_ready ctxt cryptobox shards_proofs_precomputation proto_parameters - proto_plugins = - let open Lwt_result_syntax in - match ctxt.status with - | Starting starting_status -> - ctxt.status <- - Ready - { - proto_plugins; - cryptobox; - proto_parameters; - shards_proofs_precomputation; - ongoing_amplifications = Types.Slot_id.Set.empty; - slots_under_reconstruction = Types.Slot_id.Map.empty; - } ; - Lwt.wakeup starting_status.started_resolver () ; - return_unit - | Ready _ -> raise Status_already_ready - let may_reconstruct ~reconstruct slot_id t = let open Lwt_result_syntax in let p = @@ -153,28 +98,20 @@ let may_reconstruct ~reconstruct slot_id t = let 
may_add_plugin ctxt cctxt ~block_level ~proto_level = let open Lwt_result_syntax in - match ctxt.status with - | Starting _ -> return_unit - | Ready ready_ctxt -> - let* proto_plugins = - Proto_plugins.may_add - cctxt - ready_ctxt.proto_plugins - ~first_level:block_level - ~proto_level - in - ctxt.status <- Ready {ready_ctxt with proto_plugins} ; - return_unit + let* proto_plugins = + Proto_plugins.may_add + cctxt + ctxt.proto_plugins + ~first_level:block_level + ~proto_level + in + ctxt.proto_plugins <- proto_plugins ; + return_unit let get_plugin_for_level ctxt ~level = - let open Result_syntax in - let* ready_ctxt = get_ready ctxt in - Proto_plugins.get_plugin_for_level ready_ctxt.proto_plugins ~level + Proto_plugins.get_plugin_for_level ctxt.proto_plugins ~level -let get_all_plugins ctxt = - match ctxt.status with - | Starting _ -> [] - | Ready {proto_plugins; _} -> Proto_plugins.to_list proto_plugins +let get_all_plugins ctxt = Proto_plugins.to_list ctxt.proto_plugins let storage_period ctxt proto_parameters = match ctxt.config.history_mode with @@ -220,7 +157,11 @@ let set_profile_ctxt ctxt ?(save = true) pctxt = let get_config ctxt = ctxt.config -let get_status ctxt = ctxt.status +let get_cryptobox ctxt = ctxt.cryptobox + +let get_proto_parameters ctxt = ctxt.proto_parameters + +let get_shards_proofs_precomputation ctxt = ctxt.shards_proofs_precomputation let get_last_processed_level_store ctxt = ctxt.last_processed_level_store @@ -232,15 +173,19 @@ let get_tezos_node_cctxt ctxt = ctxt.tezos_node_cctxt let get_neighbors_cctxts ctxt = ctxt.neighbors_cctxts +let get_ongoing_amplifications ctxt = ctxt.ongoing_amplifications + +let set_ongoing_amplifications ctxt ongoing_amplifications = + ctxt.ongoing_amplifications <- ongoing_amplifications + let fetch_committee ctxt ~level = let open Lwt_result_syntax in let {tezos_node_cctxt = cctxt; committee_cache = cache; _} = ctxt in match Committee_cache.find cache ~level with | Some committee -> return committee | None 
-> - let*? ready_ctxt = get_ready ctxt in let*? (module Plugin) = - Proto_plugins.get_plugin_for_level ready_ctxt.proto_plugins ~level + Proto_plugins.get_plugin_for_level ctxt.proto_plugins ~level in let+ committee = Plugin.get_committee cctxt ~level in Committee_cache.add cache ~level ~committee ; diff --git a/src/bin_dal_node/node_context.mli b/src/bin_dal_node/node_context.mli index 9fc2daf55ef0..a6872d555eef 100644 --- a/src/bin_dal_node/node_context.mli +++ b/src/bin_dal_node/node_context.mli @@ -24,46 +24,17 @@ (* *) (*****************************************************************************) -(** A [ready_ctx] value contains globally needed informations for a running dal - node. It is available when both cryptobox is initialized and the plugin - for dal has been loaded. - - A [ready_ctx] also has a field [shards_proofs_precomputation] that contains - the (costly) precomputation needed to get shard proofs. -*) -type ready_ctxt = { - cryptobox : Cryptobox.t; - proto_parameters : Dal_plugin.proto_parameters; - proto_plugins : Proto_plugins.t; - shards_proofs_precomputation : Cryptobox.shards_proofs_precomputation option; - mutable ongoing_amplifications : Types.Slot_id.Set.t; - (** The slot identifiers of the commitments currently being - amplified. This set is used to prevent concurrent amplifications - of the same slot. *) - mutable slots_under_reconstruction : - (bytes, Errors.other) result Lwt.t Types.Slot_id.Map.t; - (** Promises kept in a map to avoid parallel reconstructions in - the [Slot_manager] module. *) -} - -(** A promise that is fullfilled only when the status of the node evolves from - [Starting] to [Ready] state. *) -type starting_status - -(** The status of the dal node *) -type status = Ready of ready_ctxt | Starting of starting_status - -(** A [t] value contains both the status and the dal node configuration. It's - field are available through accessors *) +(** A [t] value contains the needed informations for a running a DAL node. 
Its + fields are available through accessors. *) type t -(** [init config store gs_worker transport_layer cctx crawler - last_processed_level_store] creates a [t] with a status set to [Starting] - using the given dal node configuration [config], node store [store], - gossipsub worker instance [gs_worker], transport layer instance - [transport_layer], and tezos node client context [cctx]. *) +(** [init] creates a [t] value based on the given arguments. *) val init : Configuration_file.t -> + Cryptobox.t -> + Cryptobox.shards_proofs_precomputation option -> + Dal_plugin.proto_parameters -> + Proto_plugins.t -> Store.t -> Gossipsub.Worker.t -> Gossipsub.Transport_layer.t -> @@ -73,22 +44,6 @@ val init : Last_processed_level.t -> t -(** Raised by [set_ready] when the status is already [Ready _] *) -exception Status_already_ready - -(** [set_ready ctxt cryptobox shards_proofs_precomputation proto_parameters - proto_plugins] updates in place the status value to [Ready], and initializes - the inner [ready_ctxt] value with the given parameters. - - @raise Status_already_ready when the status is already [Ready _] *) -val set_ready : - t -> - Cryptobox.t -> - Cryptobox.shards_proofs_precomputation option -> - Dal_plugin.proto_parameters -> - Proto_plugins.t -> - unit tzresult Lwt.t - (** Returns all the registered plugins *) val get_all_plugins : t -> (module Dal_plugin.T) list @@ -122,21 +77,9 @@ val may_add_plugin : val may_reconstruct : reconstruct:(Types.slot_id -> (bytes, Errors.other) result Lwt.t) -> Types.slot_id -> - ready_ctxt -> + t -> (bytes, Errors.other) result Lwt.t -type error += Node_not_ready - -(** [get_ready ctxt] extracts the [ready_ctxt] value from a context [t]. It - propagates [Node_not_ready] if status is not ready yet. If called multiple - times, it replaces current values for [ready_ctxt] with new ones *) -val get_ready : t -> ready_ctxt tzresult - -(** returns a promise that resolves when the ready state evolves from [Starting] - to [Ready]. 
The returned promise is already resolved if the node is already - in [Ready] state. *) -val wait_for_ready_state : t -> unit Lwt.t - (** [get_profile_ctxt ctxt] returns the profile context. *) val get_profile_ctxt : t -> Profile_manager.t @@ -151,8 +94,15 @@ val set_profile_ctxt : t -> ?save:bool -> Profile_manager.t -> unit Lwt.t (** [get_config ctxt] returns the dal node configuration *) val get_config : t -> Configuration_file.t -(** [get_status ctxt] returns the dal node status *) -val get_status : t -> status +(** [get_cryptobox ctxt] returns the DAL node's cryptobox *) +val get_cryptobox : t -> Cryptobox.t + +(** [get_proto_parameters ctxt] returns the DAL node's current protocol parameters. *) +val get_proto_parameters : t -> Dal_plugin.proto_parameters + +(** [get_shards_proofs_precomputation ctxt] returns the shards proof's precomputation. *) +val get_shards_proofs_precomputation : + t -> Cryptobox.shards_proofs_precomputation option (** [get_store ctxt] returns the dal node store. *) val get_store : t -> Store.t @@ -170,6 +120,10 @@ val get_tezos_node_cctxt : t -> Tezos_rpc.Context.generic (** [get_neighbors_cctxts ctxt] returns the dal node neighbors client contexts *) val get_neighbors_cctxts : t -> Dal_node_client.cctxt list +val get_ongoing_amplifications : t -> Types.Slot_id.Set.t + +val set_ongoing_amplifications : t -> Types.Slot_id.Set.t -> unit + (** [storage_period ctxt proto_parameters] returns for how many levels should the node store data about attested slots. This depends on the node's profile and its history mode. *) diff --git a/src/bin_dal_node/slot_manager.ml b/src/bin_dal_node/slot_manager.ml index e22b50d95096..b5ab1709e1f7 100644 --- a/src/bin_dal_node/slot_manager.ml +++ b/src/bin_dal_node/slot_manager.ml @@ -145,9 +145,11 @@ let get_slot_content_from_shards cryptobox store slot_id = let*! 
() = Event.(emit fetched_slot (Bytes.length slot, Seq.length shards)) in return slot -let get_slot_content ~reconstruct_if_missing ctxt store cryptobox slot_id = +let get_slot_content ~reconstruct_if_missing ctxt slot_id = let open Lwt_result_syntax in (* First attempt to get the slot from the slot store. *) + let store = Node_context.get_store ctxt in + let cryptobox = Node_context.get_cryptobox ctxt in let Cryptobox.{slot_size; _} = Cryptobox.parameters cryptobox in let*! res_slot_store = Store.Slots.find_slot store.Store.slots ~slot_size slot_id @@ -347,24 +349,17 @@ let get_slot_status ~slot_id node_store = let get_slot_shard (store : Store.t) (slot_id : Types.slot_id) shard_index = Store.Shards.read store.shards slot_id shard_index -let get_slot_pages ~reconstruct_if_missing cryptobox store node_context slot_id - = +let get_slot_pages ~reconstruct_if_missing node_context slot_id = let open Lwt_result_syntax in - let dal_parameters = Cryptobox.parameters cryptobox in - let* slot = - get_slot_content - ~reconstruct_if_missing - store - node_context - cryptobox - slot_id - in + let proto_parameters = Node_context.get_proto_parameters node_context in + let page_size = proto_parameters.cryptobox_parameters.page_size in + let* slot = get_slot_content ~reconstruct_if_missing node_context slot_id in (* The slot size `Bytes.length slot` should be an exact multiple of `page_size`. If this is not the case, we throw an `Illformed_pages` error. *) let*? pages = Bytes.chunk_bytes - dal_parameters.page_size + page_size slot ~error_on_partial_chunk:(Errors.other @@ TzTrace.make Illformed_pages) in diff --git a/src/bin_dal_node/slot_manager.mli b/src/bin_dal_node/slot_manager.mli index 66af4e5915f9..bf6dae168fc4 100644 --- a/src/bin_dal_node/slot_manager.mli +++ b/src/bin_dal_node/slot_manager.mli @@ -53,9 +53,7 @@ type slot = bytes the page-size specified in the [Cryptobox.parameters] argument. 
*) val get_slot_pages : reconstruct_if_missing:bool -> - Cryptobox.t -> - Node_context.ready_ctxt -> - Store.t -> + Node_context.t -> Types.slot_id -> (bytes list, [> Errors.not_found | Errors.other]) result Lwt.t @@ -87,23 +85,20 @@ val commit : Cryptobox.polynomial -> (Cryptobox.commitment, [> Errors.other]) result -(** [get_slot_content ~reconstruct_if_missing node_ctxt node_store - cryptobox slot_id] returns the slot content associated with the - given [slot_id] in [node_store]. +(** [get_slot_content ~reconstruct_if_missing node_ctxt slot_id] returns the + slot content associated with the given [slot_id] in the node's store. If the slot is not found in the store and [reconstruct_if_missing] is true, the slot is reconstructed from the stored shards. In addition to decoding errors, the function returns [`Not_found] - if there is no slot content for [slot_id] in [node_store] or if + if there is no slot content for [slot_id] in the node's store or if [reconstruct_if_missing] is true and not enough shards are stored to reconstruct the slot. *) val get_slot_content : reconstruct_if_missing:bool -> - Node_context.ready_ctxt -> - Store.t -> - Cryptobox.t -> + Node_context.t -> Types.slot_id -> (slot, [> Errors.other | Errors.not_found]) result Lwt.t -- GitLab From f938de9fd4cb2cf4b4ba1916f8b5d3efecc627cf Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Fri, 19 Jul 2024 11:42:52 +0200 Subject: [PATCH 3/3] DAL/Node: refactor Handler.set_ready --- src/bin_dal_node/daemon.ml | 174 +++++++++++++++--------------- src/bin_dal_node/event.ml | 8 -- src/bin_dal_node/node_context.ml | 10 +- src/bin_dal_node/node_context.mli | 11 +- tezt/tests/dal.ml | 6 -- 5 files changed, 97 insertions(+), 112 deletions(-) diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index b47ed59e81cf..6b752433955e 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -223,79 +223,6 @@ module Handler = struct (* 4. In the case the message id is not Valid. 
*) other - let build_profile_context config = - let open Lwt_result_syntax in - let base_dir = Configuration_file.store_path config in - let*! res = Profile_manager.load_profile_ctxt ~base_dir in - match res with - | Ok loaded_profile -> - (* The profiles from the loaded context are prioritized over the - profiles provided in the config file. *) - Profile_manager.merge_profiles - ~lower_prio:config.Configuration_file.profile - ~higher_prio:loaded_profile - |> return - | Error err -> - let*! () = Event.(emit loading_profiles_failed err) in - return config.Configuration_file.profile - - (* Registers the attester profile context once we have the protocol plugin. This is supposed - to be called only once. *) - let update_and_register_profiles ctxt = - let open Lwt_result_syntax in - let profile_ctxt = Node_context.get_profile_ctxt ctxt in - let gs_worker = Node_context.get_gs_worker ctxt in - let proto_parameters = Node_context.get_proto_parameters ctxt in - let profile_ctxt_opt = - Profile_manager.add_profiles - Profile_manager.empty - proto_parameters - gs_worker - profile_ctxt - in - match profile_ctxt_opt with - | None -> fail Errors.[Profile_incompatibility] - | Some profile_ctxt -> - let*! () = Node_context.set_profile_ctxt ctxt profile_ctxt in - return_unit - - let set_ready cctxt config profile_ctxt last_processed_level - (head_level, (module Plugin : Dal_plugin.T), proto_parameters) = - let open Lwt_result_syntax in - (* Initialize the crypto process *) - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5743 - Instead of recomputing these parameters, they could be stored - (for a given cryptobox). *) - let* cryptobox, shards_proofs_precomputation = - init_cryptobox config proto_parameters - in - Value_size_hooks.set_share_size - (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; - Value_size_hooks.set_number_of_slots proto_parameters.number_of_slots ; - let* proto_plugins = - (* We resolve the plugins for all levels starting with [first_level]. 
It - is currently not necessary to go as far in the past, because only the - protocol parameters for these past levels are needed, and these do - not change for now (and are not retrieved for these past - levels). However, if/when they do change, it will be necessary to - retrieve them, using the right plugins. *) - let level = - match last_processed_level with - | None -> head_level - | Some level -> level - in - let relevant_period = - Profile_manager.get_attested_data_default_store_period - profile_ctxt - proto_parameters - in - let first_level = - Int32.max 1l (Int32.sub level (Int32.of_int relevant_period)) - in - Proto_plugins.initial_plugins cctxt ~first_level ~last_level:level - in - return (cryptobox, shards_proofs_precomputation, proto_plugins) - let supports_refutations ctxt = let profile = Node_context.get_profile_ctxt ctxt in Profile_manager.supports_refutations profile @@ -803,6 +730,63 @@ let check_l1_history_mode profile_ctxt cctxt proto_parameters = in handle ~dal_blocks:b ~l1_cycles:c +let build_profile_context config = + let open Lwt_result_syntax in + let base_dir = Configuration_file.store_path config in + let*! res = Profile_manager.load_profile_ctxt ~base_dir in + match res with + | Ok loaded_profile -> + (* The profiles from the loaded context are prioritized over the + profiles provided in the config file. *) + Profile_manager.merge_profiles + ~lower_prio:config.Configuration_file.profile + ~higher_prio:loaded_profile + |> return + | Error err -> + let*! () = Event.(emit loading_profiles_failed err) in + return config.Configuration_file.profile + +(* Registers the attester profile context once we have the protocol plugin. This is supposed + to be called only once. 
*) +let update_and_register_profiles ctxt = + let open Lwt_result_syntax in + let profile_ctxt = Node_context.get_profile_ctxt ctxt in + let gs_worker = Node_context.get_gs_worker ctxt in + let proto_parameters = Node_context.get_proto_parameters ctxt in + let profile_ctxt_opt = + Profile_manager.add_profiles + Profile_manager.empty + proto_parameters + gs_worker + profile_ctxt + in + match profile_ctxt_opt with + | None -> fail Errors.[Profile_incompatibility] + | Some profile_ctxt -> + let*! () = Node_context.set_profile_ctxt ctxt profile_ctxt in + return_unit + +let get_proto_plugins cctxt profile_ctxt last_processed_level + (head_level, (module Plugin : Dal_plugin.T), proto_parameters) = + (* We resolve the plugins for all levels starting with [first_level]. It + is currently not necessary to go as far in the past, because only the + protocol parameters for these past levels are needed, and these do + not change for now (and are not retrieved for these past + levels). However, if/when they do change, it will be necessary to + retrieve them, using the right plugins. *) + let level = + match last_processed_level with None -> head_level | Some level -> level + in + let relevant_period = + Profile_manager.get_attested_data_default_store_period + profile_ctxt + proto_parameters + in + let first_level = + Int32.max 1l (Int32.sub level (Int32.of_int relevant_period)) + in + Proto_plugins.initial_plugins cctxt ~first_level ~last_level:level + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/3605 Improve general architecture, handle L1 disconnection etc *) @@ -924,7 +908,7 @@ let run ~data_dir ~configuration_override = let* () = wait_for_l1_bootstrapped cctxt in (* Check the DAL node's and L1 node's history mode. *) let* ((_, _, proto_parameters) as plugin_info) = get_head_plugin cctxt in - let* profile_ctxt = Handler.build_profile_context config in + let* profile_ctxt = build_profile_context config in let*? 
() = Profile_manager.validate_slot_indexes profile_ctxt @@ -932,22 +916,24 @@ let run ~data_dir ~configuration_override = in let* () = check_history_mode config profile_ctxt proto_parameters in let* () = check_l1_history_mode profile_ctxt cctxt proto_parameters in - let* cryptobox, shards_proofs_precomputation, proto_plugins = - Handler.set_ready cctxt config profile_ctxt last_processed_level plugin_info + (* Initialize the crypto process *) + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5743 + Instead of recomputing these parameters, they could be stored + (for a given cryptobox). *) + let* cryptobox, shards_proofs_precomputation = + init_cryptobox config proto_parameters in - let*! crawler = - let open Constants in - Crawler.start - ~name:"dal_node_crawler" - ~chain:`Main - ~reconnection_delay:initial_l1_crawler_reconnection_delay - ~l1_blocks_cache_size:crawler_l1_blocks_cache_size - ?last_notified_level:last_processed_level - cctxt + (* Set value size hooks. *) + Value_size_hooks.set_share_size + (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; + Value_size_hooks.set_number_of_slots proto_parameters.number_of_slots ; + let* proto_plugins = + get_proto_plugins cctxt profile_ctxt last_processed_level plugin_info in let ctxt = Node_context.init config + profile_ctxt cryptobox shards_proofs_precomputation proto_parameters @@ -957,11 +943,20 @@ let run ~data_dir ~configuration_override = transport_layer cctxt metrics_server - crawler last_processed_level_store in + let*! 
crawler = + let open Constants in + Crawler.start + ~name:"dal_node_crawler" + ~chain:`Main + ~reconnection_delay:initial_l1_crawler_reconnection_delay + ~l1_blocks_cache_size:crawler_l1_blocks_cache_size + ?last_notified_level:last_processed_level + cctxt + in let* amplificator = - if Profile_manager.is_prover_profile config.Configuration_file.profile then + if Profile_manager.is_prover_profile profile_ctxt then let* amplificator = Amplificator.make () in return_some amplificator else return_none @@ -971,7 +966,6 @@ let run ~data_dir ~configuration_override = | None -> return_unit | Some amplificator -> Amplificator.init amplificator ctxt in - let*! () = Event.(emit node_is_ready ()) in (* Start RPC server. *) let* rpc_server = RPC_server.(start config ctxt) in let _ = RPC_server.install_finalizer rpc_server in @@ -982,9 +976,11 @@ let run ~data_dir ~configuration_override = Gossipsub.Transport_layer.activate ~additional_points:points transport_layer in let*! () = Event.(emit p2p_server_is_ready listen_addr) in - let* () = Handler.update_and_register_profiles ctxt in (* Start collecting stats related to the Gossipsub worker. *) Dal_metrics.collect_gossipsub_metrics gs_worker ; + (* Register topics with gossipsub worker. *) + let* () = update_and_register_profiles ctxt in (* Start never-ending monitoring daemons *) + let*! 
() = Event.(emit node_is_ready ()) in let* () = daemonize [Handler.new_finalized_head ctxt cctxt crawler] in return_unit diff --git a/src/bin_dal_node/event.ml b/src/bin_dal_node/event.ml index ea57adb71fd4..92893b9d2eb1 100644 --- a/src/bin_dal_node/event.ml +++ b/src/bin_dal_node/event.ml @@ -131,14 +131,6 @@ let layer1_node_tracking_started = ~level:Notice () -let layer1_node_tracking_started_for_plugin = - declare_0 - ~section - ~name:"dal_node_layer_1_start_tracking_for_plugin" - ~msg:"started tracking layer 1's node to determine plugin" - ~level:Notice - () - let protocol_plugin_resolved = declare_1 ~section diff --git a/src/bin_dal_node/node_context.ml b/src/bin_dal_node/node_context.ml index 4df1de2458dd..acbc5b599bb5 100644 --- a/src/bin_dal_node/node_context.ml +++ b/src/bin_dal_node/node_context.ml @@ -41,13 +41,12 @@ type t = { transport_layer : Gossipsub.Transport_layer.t; mutable profile_ctxt : Profile_manager.t; metrics_server : Metrics.t; - crawler : Crawler.t; last_processed_level_store : Last_processed_level.t; } -let init config cryptobox shards_proofs_precomputation proto_parameters - proto_plugins store gs_worker transport_layer cctxt metrics_server crawler - last_processed_level_store = +let init config profile_ctxt cryptobox shards_proofs_precomputation + proto_parameters proto_plugins store gs_worker transport_layer cctxt + metrics_server last_processed_level_store = let neighbors_cctxts = List.map (fun Configuration_file.{addr; port} -> @@ -72,9 +71,8 @@ let init config cryptobox shards_proofs_precomputation proto_parameters Committee_cache.create ~max_size:Constants.committee_cache_size; gs_worker; transport_layer; - profile_ctxt = Profile_manager.empty; + profile_ctxt; metrics_server; - crawler; last_processed_level_store; } diff --git a/src/bin_dal_node/node_context.mli b/src/bin_dal_node/node_context.mli index a6872d555eef..d51b8b7b5f77 100644 --- a/src/bin_dal_node/node_context.mli +++ b/src/bin_dal_node/node_context.mli @@ -31,6 +31,7 
@@ type t (** [init] creates a [t] value based on the given arguments. *) val init : Configuration_file.t -> + Profile_manager.t -> Cryptobox.t -> Cryptobox.shards_proofs_precomputation option -> Dal_plugin.proto_parameters -> @@ -40,7 +41,6 @@ val init : Gossipsub.Transport_layer.t -> Tezos_rpc.Context.generic -> Metrics.t -> - Crawler.t -> Last_processed_level.t -> t @@ -114,14 +114,19 @@ val get_last_processed_level_store : t -> Last_processed_level.t (** [get_gs_worker ctxt] returns the Gossipsub worker state. *) val get_gs_worker : t -> Gossipsub.Worker.t -(** [get_tezos_node_cctxt ctxt] returns the Tezos node's client context *) +(** [get_tezos_node_cctxt ctxt] returns the Tezos node's client context. *) val get_tezos_node_cctxt : t -> Tezos_rpc.Context.generic -(** [get_neighbors_cctxts ctxt] returns the dal node neighbors client contexts *) +(** [get_neighbors_cctxts ctxt] returns the client contexts of the DAL node's + neighbors. *) val get_neighbors_cctxts : t -> Dal_node_client.cctxt list +(** [get_ongoing_amplifications ctxt] returns the slot ids for which there are + ongoing amplifications. *) val get_ongoing_amplifications : t -> Types.Slot_id.Set.t +(** [set_ongoing_amplifications ctxt ongoing_amplifications] sets the slot ids for + which there are ongoing amplifications.
*) val set_ongoing_amplifications : t -> Types.Slot_id.Set.t -> unit (** [storage_period ctxt proto_parameters] returns for how many levels should diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index f50a4b58417d..ad19b08c1c65 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -2023,12 +2023,6 @@ let test_dal_node_startup = let dal_node = Dal_node.create ~node () in let* () = Dal_node.init_config dal_node in let* () = run_dal dal_node in - let* () = - Dal_node.wait_for - dal_node - "dal_node_layer_1_start_tracking_for_plugin.v0" - (fun _ -> Some ()) - in assert (Dal_node.is_running_not_ready dal_node) ; let* () = Dal_node.terminate dal_node in let* () = Node.terminate node in -- GitLab