From 16db269685ce91e9e1edc636290d4a2354d54f04 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 7 Oct 2025 14:40:11 +0100 Subject: [PATCH 01/19] Alpha/Baker: Node_rpc: Change dal_attestable_slots signature --- src/proto_alpha/lib_delegate/baking_actions.ml | 4 ++-- src/proto_alpha/lib_delegate/baking_scheduling.ml | 4 ++-- src/proto_alpha/lib_delegate/baking_state.ml | 6 ++++++ src/proto_alpha/lib_delegate/baking_state.mli | 3 +++ src/proto_alpha/lib_delegate/node_rpc.ml | 9 ++------- src/proto_alpha/lib_delegate/node_rpc.mli | 6 +++--- 6 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index 0ee00d77bcd1..884f9cc48273 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -1136,13 +1136,13 @@ let update_to_level state level_update = Node_rpc.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:new_level - (Delegate_infos.own_delegates delegate_infos) + (Delegate_infos.own_delegate_ids delegate_infos) in let next_level_dal_attestable_slots = Node_rpc.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:(Int32.succ new_level) - (Delegate_infos.own_delegates next_level_delegate_infos) + (Delegate_infos.own_delegate_ids next_level_delegate_infos) in Lwt.return (dal_attestable_slots, next_level_dal_attestable_slots)) in diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index b01c51a65f7e..4c78ede3d313 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -701,7 +701,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain Node_rpc.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:current_level - (Delegate_infos.own_delegates delegate_infos)) + (Delegate_infos.own_delegate_ids delegate_infos)) dal_node_rpc_ctxt in let 
next_level_dal_attestable_slots = @@ -711,7 +711,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain Node_rpc.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:(Int32.succ current_level) - (Delegate_infos.own_delegates next_level_delegate_infos)) + (Delegate_infos.own_delegate_ids next_level_delegate_infos)) dal_node_rpc_ctxt in let level_state = diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index 76111296ef38..9ab641c45e1e 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -158,6 +158,12 @@ module Delegate_infos = struct let own_delegates t = t.own_delegates + let own_delegate_ids t = + List.map + (fun delegate_info -> + Baking_state_types.Delegate.delegate_id delegate_info.delegate) + t.own_delegates + let own_round_owner t ~committee_size ~round = let open Result_syntax in let* round_int = Round.to_int round |> Environment.wrap_tzresult in diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index 02bee5b64cc8..6f8d7d1c70a6 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -45,6 +45,9 @@ module Delegate_infos : sig no duplicates, the associated slot is the first one. *) val own_delegates : t -> delegate_info list + (** Returns the list of the delegate ids from [own_delegates]. *) + val own_delegate_ids : t -> Delegate_id.t list + (** Returns, among our *own* delegates, the delegate (together with its first attesting slot) that owns the given round, if any. 
*) val own_round_owner : diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index a54bb32cd117..9374dc402dbd 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -421,16 +421,11 @@ let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level = () let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) - ~attestation_level delegate_infos = + ~attestation_level = let attested_level = Int32.succ attestation_level in - List.map - (fun (delegate_info : delegate_info) -> - let delegate_id = - Baking_state_types.Delegate.delegate_id delegate_info.delegate - in + List.map (fun delegate_id -> ( delegate_id, get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) - delegate_infos let get_dal_profiles dal_node_rpc_ctxt = Tezos_rpc.Context.make_call diff --git a/src/proto_alpha/lib_delegate/node_rpc.mli b/src/proto_alpha/lib_delegate/node_rpc.mli index a45fe3666095..c50cddf70681 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.mli +++ b/src/proto_alpha/lib_delegate/node_rpc.mli @@ -86,14 +86,14 @@ val await_protocol_activation : val fetch_dal_config : #Protocol_client_context.rpc_context -> Cryptobox.Config.t tzresult Lwt.t -(** [dal_attestable_slots ctxt ~attestation_level delegates_slots] calls the DAL +(** [dal_attestable_slots ctxt ~attestation_level delegate_ids] calls the DAL node RPC GET /profiles//attested_levels//attestable_slots/ - for each of the delegates in [delegate_infos] and returns the corresponding + for each of the delegates in [delegate_ids] and returns the corresponding promises. 
*) val dal_attestable_slots : Tezos_rpc.Context.generic -> attestation_level:int32 -> - Baking_state.delegate_info list -> + Baking_state_types.Delegate_id.t list -> Baking_state.dal_attestable_slots (** [get_dal_profiles ctxt delegates] calls the DAL node RPC GET -- GitLab From d68ac3f18c6a2c96e60fa3cc6abb39c6b539cce4 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 8 Oct 2025 15:29:09 +0100 Subject: [PATCH 02/19] Alpha/Baker: Add Dal attestable_slots worker Currently, it does not serve any purpose, but it provides the creation and destruction capabilities. --- .../lib_delegate/baking_actions.ml | 9 +-- .../lib_delegate/baking_actions.mli | 5 +- src/proto_alpha/lib_delegate/baking_lib.ml | 6 ++ .../lib_delegate/baking_scheduling.ml | 17 ++++- .../lib_delegate/baking_scheduling.mli | 5 +- src/proto_alpha/lib_delegate/baking_state.ml | 14 ++-- src/proto_alpha/lib_delegate/baking_state.mli | 14 ++-- .../dal_attestable_slots_worker.ml | 71 +++++++++++++++++++ .../dal_attestable_slots_worker.mli | 59 +++++++++++++++ src/proto_alpha/lib_delegate/node_rpc.ml | 17 ----- src/proto_alpha/lib_delegate/node_rpc.mli | 10 --- 11 files changed, 171 insertions(+), 56 deletions(-) create mode 100644 src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml create mode 100644 src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index 884f9cc48273..197b9012ada4 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -148,8 +148,9 @@ and level_update = { current_round:Round.t -> delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> - dal_attestable_slots:dal_attestable_slots -> - next_level_dal_attestable_slots:dal_attestable_slots -> + dal_attestable_slots:Dal_attestable_slots_worker.dal_attestable_slots -> + next_level_dal_attestable_slots: + 
Dal_attestable_slots_worker.dal_attestable_slots -> (state * action) Lwt.t; } @@ -1133,13 +1134,13 @@ let update_to_level state level_update = if Int32.(new_level = succ state.level_state.current_level) then state.level_state.next_level_dal_attestable_slots else - Node_rpc.dal_attestable_slots + Dal_attestable_slots_worker.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:new_level (Delegate_infos.own_delegate_ids delegate_infos) in let next_level_dal_attestable_slots = - Node_rpc.dal_attestable_slots + Dal_attestable_slots_worker.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:(Int32.succ new_level) (Delegate_infos.own_delegate_ids next_level_delegate_infos) diff --git a/src/proto_alpha/lib_delegate/baking_actions.mli b/src/proto_alpha/lib_delegate/baking_actions.mli index 0ab92ab20d2b..0775bb56fc96 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.mli +++ b/src/proto_alpha/lib_delegate/baking_actions.mli @@ -56,8 +56,9 @@ and level_update = { current_round:Round.t -> delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> - dal_attestable_slots:dal_attestable_slots -> - next_level_dal_attestable_slots:dal_attestable_slots -> + dal_attestable_slots:Dal_attestable_slots_worker.dal_attestable_slots -> + next_level_dal_attestable_slots: + Dal_attestable_slots_worker.dal_attestable_slots -> (state * action) Lwt.t; } diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index 46cba2c0d65a..56c77c164b3c 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -66,6 +66,11 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let*! 
operation_worker = Operation_worker.run ?monitor_node_operations ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Baking_scheduling.create_initial_state cctxt ?dal_node_rpc_ctxt @@ -74,6 +79,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool config operation_worker round_durations + dal_attestable_slots_worker ~current_proposal ~constants delegates diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 4c78ede3d313..249569bd2665 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -617,7 +617,7 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker round_durations + config operation_worker round_durations dal_attestable_slots_worker ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 @@ -647,6 +647,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain constants; round_durations; operation_worker; + dal_attestable_slots_worker; forge_worker_hooks = { push_request = (fun _ -> assert false); @@ -698,7 +699,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain Option.fold ~none:[] ~some:(fun dal_node_rpc_ctxt -> - Node_rpc.dal_attestable_slots + Dal_attestable_slots_worker.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:current_level (Delegate_infos.own_delegate_ids delegate_infos)) @@ -708,7 +709,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain Option.fold ~none:[] 
~some:(fun dal_node_rpc_ctxt -> - Node_rpc.dal_attestable_slots + Dal_attestable_slots_worker.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:(Int32.succ current_level) (Delegate_infos.own_delegate_ids next_level_delegate_infos)) @@ -1016,10 +1017,19 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) in let*? round_durations = create_round_durations constants in let*! operation_worker = Operation_worker.run ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> let*! _ = Operation_worker.shutdown_worker operation_worker in + let*! () = + Dal_attestable_slots_worker.shutdown_worker + dal_attestable_slots_worker + in Lwt.return_unit)) canceler ; let* initial_state = @@ -1030,6 +1040,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) config operation_worker round_durations + dal_attestable_slots_worker ~current_proposal ~constants delegates diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.mli b/src/proto_alpha/lib_delegate/baking_scheduling.mli index 41030065dc63..e3fc81c04a69 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.mli +++ b/src/proto_alpha/lib_delegate/baking_scheduling.mli @@ -131,8 +131,8 @@ val create_loop_state : loop_state (** [create_initial_state context ?synchronize chain baking_configuration - operation_worker current_proposal ?constants consensus_keys] creates an - initial {!Baking_state.t} by initializing a + operation_worker dal_attestable_slots_worker current_proposal ?constants + consensus_keys] creates an initial {!Baking_state.t} by initializing a {!type-Baking_state.global_state}, a {!type-Baking_state.level_state} and a {!type-Baking_state.round_state}. 
@@ -155,6 +155,7 @@ val create_initial_state : Baking_configuration.t -> Operation_worker.t -> Round.round_durations -> + Dal_attestable_slots_worker.t -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index 9ab641c45e1e..cb009a075229 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -186,11 +186,6 @@ end type delegate_infos = Delegate_infos.t -type dal_attestable_slots = - (Delegate_id.t - * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) - list - type proposal = {block : block_info; predecessor : block_info} let proposal_encoding = @@ -265,8 +260,9 @@ type level_state = { delegate_infos : delegate_infos; next_level_delegate_infos : delegate_infos; next_level_latest_forge_request : Round.t option; - dal_attestable_slots : dal_attestable_slots; - next_level_dal_attestable_slots : dal_attestable_slots; + dal_attestable_slots : Dal_attestable_slots_worker.dal_attestable_slots; + next_level_dal_attestable_slots : + Dal_attestable_slots_worker.dal_attestable_slots; } type phase = @@ -531,8 +527,10 @@ type global_state = { constants : Constants.t; (* round durations *) round_durations : Round.round_durations; - (* worker that monitor and aggregates new operations *) + (* worker that monitors and aggregates new operations *) operation_worker : Operation_worker.t; + (* worker that retrieves DAL attestable slots from the DAL node *) + dal_attestable_slots_worker : Dal_attestable_slots_worker.t; (* hooks to the consensus and block forge worker *) mutable forge_worker_hooks : forge_worker_hooks; (* the validation mode used by the baker*) diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index 6f8d7d1c70a6..51e09292ae45 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ 
b/src/proto_alpha/lib_delegate/baking_state.mli @@ -91,14 +91,6 @@ val compute_delegate_infos : (** {2 Consensus operations types functions} *) -(* An association list between delegates and promises for their DAL attestations - at some level (as obtained through the [get_attestable_slots] RPC). See usage - in {!level_state}. *) -type dal_attestable_slots = - (Delegate_id.t - * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) - list - type consensus_vote_kind = Attestation | Preattestation val consensus_vote_kind_encoding : consensus_vote_kind Data_encoding.t @@ -325,10 +317,11 @@ type level_state = { next_level_latest_forge_request : Round.t option; (** Some if a forge request has been sent for the next level on the given round *) - dal_attestable_slots : dal_attestable_slots; + dal_attestable_slots : Dal_attestable_slots_worker.dal_attestable_slots; (** For each (own) delegate having a DAL slot at the current level, store a promise to obtain the attestable slots for that level. 
*) - next_level_dal_attestable_slots : dal_attestable_slots; + next_level_dal_attestable_slots : + Dal_attestable_slots_worker.dal_attestable_slots; (** and similarly for the next level *) } @@ -476,6 +469,7 @@ type global_state = { constants : Constants.t; round_durations : Round.round_durations; operation_worker : Operation_worker.t; + dal_attestable_slots_worker : Dal_attestable_slots_worker.t; mutable forge_worker_hooks : forge_worker_hooks; validation_mode : validation_mode; delegates : Key.t list; diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml new file mode 100644 index 000000000000..f3d818cc501d --- /dev/null +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -0,0 +1,71 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +open Baking_state_types +open Tezos_dal_node_services + +type dal_attestable_slots = + (Delegate_id.t * Types.attestable_slots tzresult Lwt.t) list + +(** A handle to a single delegate’s DAL monitoring subscription. *) +type stream_handle = { + stream : Types.Attestable_event.t Lwt_stream.t; + stopper : Tezos_rpc.Context.stopper; +} + +type slots_by_delegate = Types.attestable_slots Delegate_id.Table.t + +type t = { + lock : Lwt_mutex.t; (** Global mutex protecting [streams] and [cache]. *) + attestation_lag : int; + number_of_slots : int; + streams : stream_handle Delegate_id.Table.t; + (** Active per-delegate subscriptions. *) + cache : (int32, slots_by_delegate) Stdlib.Hashtbl.t; + (** Cache of attestable slots, keyed by attested levels. 
*) +} + +let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level = + let pkh = Delegate_id.to_pkh delegate_id in + Tezos_rpc.Context.make_call + Tezos_dal_node_services.Services.get_attestable_slots + dal_node_rpc_ctxt + (((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh), attested_level) + () + () + +let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) + ~attestation_level = + let attested_level = Int32.succ attestation_level in + List.map (fun delegate_id -> + ( delegate_id, + get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) + +let create ~attestation_lag ~number_of_slots = + { + lock = Lwt_mutex.create (); + attestation_lag; + number_of_slots; + streams = Delegate_id.Table.create 10; + cache = Stdlib.Hashtbl.create 16; + } + +let shutdown_worker state = + let open Lwt_syntax in + let* stoppers = + Lwt_mutex.with_lock state.lock @@ fun () -> + let stoppers = + Delegate_id.Table.to_seq state.streams + |> Seq.map (fun (_delegate_id, {stopper; _}) -> stopper) + |> List.of_seq + in + Delegate_id.Table.clear state.streams ; + Stdlib.Hashtbl.reset state.cache ; + return stoppers + in + List.iter (fun stopper -> stopper ()) stoppers ; + return_unit diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli new file mode 100644 index 000000000000..9b02be96fab1 --- /dev/null +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -0,0 +1,59 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +(** The DAL attestable slots worker keeps per-delegate subscriptions to + the DAL node’s streaming RPC [GET /profiles//monitor/attestable_slots)]. 
+ Each stream emits attestable slots events as soon as a delegate’s shards for a + published level become fully available or when we know the delegate is not in + a particular DAL committee. + + Incoming events are folded into an in-memory cache keyed by attested level + (published_level + attestation_lag) and delegate id. For each (attested_level, + delegate), the worker maintains a boolean bitset of attestable slots. + This cache is filled continuously and independently of the baker’s main + loop so that consensus code never waits on the network. + + The worker's purpose is to decouple the critical consensus path from DAL + RPC latency: streams advance in the background, therefore the cache can serve + DAL information instantly. +*) + +(** {1 Datatypes} *) + +(* An association list between delegates and promises for their DAL attestations + at some level (as obtained through the [get_attestable_slots] RPC). See usage + in {!level_state}. *) +type dal_attestable_slots = + (Baking_state_types.Delegate_id.t + * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) + list + +type t + +(** {1 RPC utilities} *) + +(** [dal_attestable_slots ctxt ~attestation_level delegate_ids] calls the DAL + node RPC GET /profiles//attested_levels//attestable_slots/ + for each of the delegates in [delegate_ids] and returns the corresponding + promises. *) +val dal_attestable_slots : + Tezos_rpc.Context.generic -> + attestation_level:int32 -> + Baking_state_types.Delegate_id.t list -> + dal_attestable_slots + +(** {1 Constructors} *) + +(** [create ~attestation_lag ~number_of_slots] creates a new worker state. This + does not start any background thread, as streams are opened lazily when + someone asks for attestable slots via [get_dal_attestable_slots]. *) +val create : attestation_lag:int -> number_of_slots:int -> t + +(** [shutdown_worker state] stops all active delegate subscriptions and clears + the worker’s in-memory state. 
The worker will no longer hold any references + to live streams and the cache will become empty. *) +val shutdown_worker : t -> unit Lwt.t diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index 9374dc402dbd..a6f1c7ac554f 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -27,7 +27,6 @@ open Protocol open Alpha_context open Baking_cache open Baking_state -open Baking_state_types module Block_services = Block_services.Make (Protocol) (Protocol) module Events = Baking_events.Node_rpc @@ -411,22 +410,6 @@ let fetch_dal_config cctxt = | Error e -> return_error e | Ok dal_config -> return_ok dal_config -let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level = - let pkh = Delegate_id.to_pkh delegate_id in - Tezos_rpc.Context.make_call - Tezos_dal_node_services.Services.get_attestable_slots - dal_node_rpc_ctxt - (((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh), attested_level) - () - () - -let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) - ~attestation_level = - let attested_level = Int32.succ attestation_level in - List.map (fun delegate_id -> - ( delegate_id, - get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) - let get_dal_profiles dal_node_rpc_ctxt = Tezos_rpc.Context.make_call Tezos_dal_node_services.Services.get_profiles diff --git a/src/proto_alpha/lib_delegate/node_rpc.mli b/src/proto_alpha/lib_delegate/node_rpc.mli index c50cddf70681..13bc982e8670 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.mli +++ b/src/proto_alpha/lib_delegate/node_rpc.mli @@ -86,16 +86,6 @@ val await_protocol_activation : val fetch_dal_config : #Protocol_client_context.rpc_context -> Cryptobox.Config.t tzresult Lwt.t -(** [dal_attestable_slots ctxt ~attestation_level delegate_ids] calls the DAL - node RPC GET /profiles//attested_levels//attestable_slots/ - for each of the delegates in [delegate_ids] and returns the 
corresponding - promises. *) -val dal_attestable_slots : - Tezos_rpc.Context.generic -> - attestation_level:int32 -> - Baking_state_types.Delegate_id.t list -> - Baking_state.dal_attestable_slots - (** [get_dal_profiles ctxt delegates] calls the DAL node RPC GET /profiles/ to retrieve the DAL node's profiles. *) val get_dal_profiles : -- GitLab From 6a7667e3ccbb8ca1baf3393384d85f3c1041053d Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 10 Nov 2025 10:38:32 +0000 Subject: [PATCH 03/19] Tezt: Dal: Fix type referenced in documentation --- tezt/lib_tezos/dal_node.ml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index d86e9977e6f0..b0fab86cbef2 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -575,8 +575,7 @@ module Proxy = struct [~published_level]. To do this, we need to add the missing slot indices for each level in the [Backfill] element that is always the first one sent in the monitoring stream. Note: this function heavily depends on the types of elements that are in the returned RPC - stream, information which can be found at - {!Tezos_dal_node_services.Types.Attestable_slots_watcher_table.Attestable_event.t}. *) + stream, information which can be found at {!Tezos_dal_node_services.Types.Attestable_event.t}. *) let rewrite_attestable_stream_line ~number_of_slots ~published_level line = let open Ezjsonm in let make_slot_id slot_level slot_index = -- GitLab From aec1fc3177f69439355dc1224141cf7fa4285a03 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 8 Oct 2025 16:20:14 +0100 Subject: [PATCH 04/19] Alpha/Baker: Dal_attestable_slots_worker: Add get_attestable_slots Currently, no streams are created, so we always fallback to the static RPCs to get the attestable slots. Consequently, this commit should be a no-op. 
--- .../dal_attestable_slots_worker.ml | 57 +++++++++++++++++++ .../dal_attestable_slots_worker.mli | 16 +++++- 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index f3d818cc501d..1c9265a4e6b4 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -8,6 +8,33 @@ open Baking_state_types open Tezos_dal_node_services +module Events = struct + include Internal_event.Simple + + let section = [Protocol.name; "baker"; "attestable_slots_worker"] + + let no_attestable_slot_at_level = + declare_1 + ~section + ~name:"no_attestable_slot_at_level" + ~level:Warning + ~msg: + "No attestable slots found at level {attested_level} in cache, falling \ + back to static RPC" + ("attested_level", Data_encoding.int32) + + let no_attestable_slot_at_level_for_delegate = + declare_2 + ~section + ~name:"no_attestable_slot_at_level_for_delegate" + ~level:Warning + ~msg: + "No attestable slots found at level {attested_level} for \ + {delegate_id}, falling back to static RPC" + ("attested_level", Data_encoding.int32) + ("delegate_id", Delegate_id.encoding) +end + type dal_attestable_slots = (Delegate_id.t * Types.attestable_slots tzresult Lwt.t) list @@ -45,6 +72,36 @@ let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) ( delegate_id, get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) +let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level + delegate_ids = + let open Lwt_syntax in + let attested_level = Int32.succ attestation_level in + Lwt_mutex.with_lock state.lock @@ fun () -> + match Stdlib.Hashtbl.find_opt state.cache attested_level with + | None -> + let* () = Events.(emit no_attestable_slot_at_level attested_level) in + return + @@ dal_attestable_slots dal_node_rpc_ctxt ~attestation_level delegate_ids + | Some 
slots_by_delegate -> + List.map_s + (fun delegate_id -> + match Delegate_id.Table.find_opt slots_by_delegate delegate_id with + | None -> + let* () = + Events.( + emit + no_attestable_slot_at_level_for_delegate + (attested_level, delegate_id)) + in + return + ( delegate_id, + get_attestable_slots + dal_node_rpc_ctxt + delegate_id + ~attested_level ) + | Some slots -> return (delegate_id, Lwt_result_syntax.return @@ slots)) + delegate_ids + let create ~attestation_lag ~number_of_slots = { lock = Lwt_mutex.create (); diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index 9b02be96fab1..a550fece9f20 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -22,13 +22,15 @@ DAL information instantly. *) +open Baking_state_types + (** {1 Datatypes} *) (* An association list between delegates and promises for their DAL attestations at some level (as obtained through the [get_attestable_slots] RPC). See usage in {!level_state}. *) type dal_attestable_slots = - (Baking_state_types.Delegate_id.t + (Delegate_id.t * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) list @@ -46,6 +48,18 @@ val dal_attestable_slots : Baking_state_types.Delegate_id.t list -> dal_attestable_slots +(** {1 Accessors} *) + +(** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] + returns, for each delegate, the current bitset for attested level + [~attestation_level] + 1 and its [published_level]. *) +val get_dal_attestable_slots : + t -> + Tezos_rpc.Context.generic -> + attestation_level:int32 -> + Delegate_id.t list -> + dal_attestable_slots Lwt.t + (** {1 Constructors} *) (** [create ~attestation_lag ~number_of_slots] creates a new worker state. 
This -- GitLab From bbf0535a014460102bb849e4e8ec4f177b982fc6 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 17:25:51 +0000 Subject: [PATCH 05/19] Alpha/Baker: Dal_attestable_slots_worker: Add streams update --- .../dal_attestable_slots_worker.ml | 215 ++++++++++++++++++ .../dal_attestable_slots_worker.mli | 6 + 2 files changed, 221 insertions(+) diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 1c9265a4e6b4..44094df8d8d1 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -13,6 +13,14 @@ module Events = struct let section = [Protocol.name; "baker"; "attestable_slots_worker"] + let consumed_backfill_stream = + declare_1 + ~section + ~name:"consumed_backfill_stream" + ~level:Info + ~msg:"Consumed backfill stream for {delegate_id}" + ("delegate_id", Delegate_id.encoding) + let no_attestable_slot_at_level = declare_1 ~section @@ -33,11 +41,29 @@ module Events = struct {delegate_id}, falling back to static RPC" ("attested_level", Data_encoding.int32) ("delegate_id", Delegate_id.encoding) + + let stream_ended = + declare_1 + ~section + ~name:"dal_stream_ended" + ~level:Notice + ~msg:"DAL monitor stream ended for {delegate_id}" + ("delegate_id", Delegate_id.encoding) + + let consume_streams_ended = + declare_1 + ~section + ~name:"consume_streams_ended" + ~level:Error + ~msg:"consume_streams ended with error {stacktrace}" + ("stacktrace", Data_encoding.string) end type dal_attestable_slots = (Delegate_id.t * Types.attestable_slots tzresult Lwt.t) list +module DelegateSet = Set.Make (Delegate_id) + (** A handle to a single delegate’s DAL monitoring subscription. 
*) type stream_handle = { stream : Types.Attestable_event.t Lwt_stream.t; @@ -72,10 +98,199 @@ let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) ( delegate_id, get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) +(** [monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id] opens a streamed RPC + to the DAL node for the given [~delegate_id]. Each item emitted on the stream contains + DAL attestable information for this delegate. *) +let monitor_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) + ~delegate_id = + let open Lwt_syntax in + let pkh = Delegate_id.to_pkh delegate_id in + let* result = + Tezos_rpc.Context.make_streamed_call + Services.monitor_attestable_slots + dal_node_rpc_ctxt + ((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh) + () + () + in + match result with + | Ok result -> return result + | Error trace -> + Lwt.fail_with (Format.asprintf "%a" Error_monad.pp_print_trace trace) + +let get_slots_by_delegate state ~attested_level = + match Stdlib.Hashtbl.find_opt state.cache attested_level with + | Some slots_by_delegate -> slots_by_delegate + | None -> + let slots_by_delegate = Delegate_id.Table.create 10 in + Stdlib.Hashtbl.add state.cache attested_level slots_by_delegate ; + slots_by_delegate + +(** [update_cache_with_attestable_slot ~delegate_id ~slot_id] adds [~slot_id] into the cache, + using the keys [attested_level] = [slot_level] + [attestation_lag] and [~delegate_id]. + The function must be called under a lock, to avoid race conditions. 
*) +let update_cache_with_attestable_slot state ~delegate_id ~slot_id = + let Types.Slot_id.{slot_level; slot_index} = slot_id in + let attested_level = Int32.(add slot_level (of_int state.attestation_lag)) in + let slots_by_delegate = get_slots_by_delegate state ~attested_level in + let attestable_slots = + match Delegate_id.Table.find_opt slots_by_delegate delegate_id with + | Some (Types.Attestable_slots {slots; published_level}) -> + let slots_array = Array.of_list slots in + slots_array.(slot_id.slot_index) <- true ; + let slots = Array.to_list slots_array in + Types.Attestable_slots {slots; published_level} + | Some Not_in_committee -> + (* We should never reach this point, as the delegate should not have any attestable slot, + as they are not in the committee. *) + Types.Not_in_committee + | None -> + let slots = Array.make state.number_of_slots false in + slots.(slot_index) <- true ; + Types.Attestable_slots + {slots = Array.to_list slots; published_level = slot_level} + in + Delegate_id.Table.replace slots_by_delegate delegate_id attestable_slots + +let update_cache_no_shards_assigned state ~delegate_id ~attested_level = + let slots_by_delegate = get_slots_by_delegate state ~attested_level in + Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee + +(** [consume_stream state ~delegate_id stream_handle] consumes + [~delegate_id]'s stream forever, updating the cache accordingly.
*) +let rec consume_stream state ~delegate_id stream_handle = + let open Lwt_syntax in + let module E = Types.Attestable_event in + let* attestable_event_opt = Lwt_stream.get stream_handle.stream in + match attestable_event_opt with + | None -> + Lwt_mutex.with_lock state.lock @@ fun () -> + let* () = Events.(emit stream_ended delegate_id) in + Delegate_id.Table.remove state.streams delegate_id ; + return_unit + | Some (E.Attestable_slot {slot_id}) -> + let* () = + Lwt_mutex.with_lock state.lock @@ fun () -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id ; + return_unit + in + consume_stream state ~delegate_id stream_handle + | Some (No_shards_assigned {attestation_level}) -> + let* () = + let attested_level = Int32.succ attestation_level in + Lwt_mutex.with_lock state.lock @@ fun () -> + update_cache_no_shards_assigned state ~delegate_id ~attested_level ; + return_unit + in + consume_stream state ~delegate_id stream_handle + | Some (Slot_has_trap _slot_id) -> + (* In case of a trap, we do not need to update the cache with any information. *) + consume_stream state ~delegate_id stream_handle + | Some (Backfill _backfill_payload) -> + (* This case should never be reachable, since we only backfill once, in a synchronous call. *) + Lwt.fail_with + "Backfill should never be done in consume_streams, as that is an \ + asynchronous function." 
+ +let update_cache_backfill_payload state ~delegate_id ~backfill_payload = + let module E = Types.Attestable_event in + let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in + Lwt_mutex.with_lock state.lock @@ fun () -> + List.iter + (fun slot_id -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id) + slot_ids ; + List.iter + (fun attestation_level -> + let attested_level = Int32.succ attestation_level in + update_cache_no_shards_assigned state ~delegate_id ~attested_level) + no_shards_attestation_levels ; + Lwt.return_unit + +(** [consume_backfill_stream state ~delegate_id stream_handle] consumes the initial [Backfill] + event from a freshly opened DAL monitoring stream. This function is meant to be called + immediately after subscribing to the DAL node stream for a [~delegate_id] and before + spawning the asynchronous consumer that handles live events. *) +let consume_backfill_stream state ~delegate_id stream_handle = + let open Lwt_syntax in + let module E = Types.Attestable_event in + let* attestable_event_opt = Lwt_stream.get stream_handle.stream in + match attestable_event_opt with + | Some (E.Backfill {backfill_payload}) -> + let* () = + update_cache_backfill_payload state ~delegate_id ~backfill_payload + in + let* () = Events.(emit consumed_backfill_stream delegate_id) in + return_unit + | None -> + Lwt_mutex.with_lock state.lock @@ fun () -> + let* () = Events.(emit stream_ended delegate_id) in + Delegate_id.Table.remove state.streams delegate_id ; + return_unit + | _ -> + Lwt.fail_with + "The stream must always start properly with a Backfill event." + +(** [subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add] + opens new monitoring DAL streams for each entry in [~delegate_ids_to_add], + and starts consuming from them, to populate the internal cache.
*) +let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = + let open Lwt_syntax in + let* new_streams = + List.map_s + (fun delegate_id -> + let* stream, stopper = + monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id + in + return (delegate_id, {stream; stopper})) + delegate_ids_to_add + in + let* () = + Lwt_mutex.with_lock state.lock @@ fun () -> + List.iter + (fun (delegate_id, stream_handle) -> + Delegate_id.Table.add state.streams delegate_id stream_handle) + new_streams ; + return_unit + in + let* () = + List.iter_p + (fun (delegate_id, stream_handle) -> + consume_backfill_stream state ~delegate_id stream_handle) + new_streams + in + List.iter + (fun (delegate_id, stream_handle) -> + Lwt.dont_wait + (fun () -> consume_stream state ~delegate_id stream_handle) + (fun exn -> + Events.( + emit__dont_wait__use_with_care + consume_streams_ended + (Printexc.to_string exn)))) + new_streams ; + return_unit + +(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles + the active set of streams using [delegate_ids] by computing the list of streams + to subscribe to. 
*) +let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = + let open Lwt_syntax in + let* delegate_ids_to_add = + Lwt_mutex.with_lock state.lock @@ fun () -> + let new_delegate_ids = DelegateSet.of_list delegate_ids in + let current_delegate_ids = + DelegateSet.of_seq @@ Delegate_id.Table.to_seq_keys state.streams + in + return DelegateSet.(elements (diff new_delegate_ids current_delegate_ids)) + in + subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level delegate_ids = let open Lwt_syntax in let attested_level = Int32.succ attestation_level in + let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in Lwt_mutex.with_lock state.lock @@ fun () -> match Stdlib.Hashtbl.find_opt state.cache attested_level with | None -> diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index a550fece9f20..5f29a028b27c 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -17,6 +17,12 @@ This cache is filled continuously and independently of the baker’s main loop so that consensus code never waits on the network. + When the baker asks for attestable slots at level L for a set of delegates, + the worker: + - reconciles subscriptions with that set (subscribing for new delegates); + - returns an immediate "snapshot" from the cache (falling back to the static + RPC to get the attestable slots). + The worker's purpose is to decouple the critical consensus path from DAL RPC latency: streams advance in the background, therefore the cache can serve DAL information instantly. 
-- GitLab From 10135005d5a266ea662668ba617d6b6afa794a75 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 17:27:05 +0000 Subject: [PATCH 06/19] Alpha/Baker: Dal_attestable_slots_worker: Add pruning mechanism --- .../dal_attestable_slots_worker.ml | 70 ++++++++++++------- .../dal_attestable_slots_worker.mli | 1 + 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 44094df8d8d1..82afe359cc84 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -286,36 +286,56 @@ let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = in subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add +(** [prune_cache_before state ~prune_level] prunes all cache entries before + [~prune_level]. This is called by the public getter once per level, keeping memory + bounded without background maintenance. 
*) +let prune_cache_before state ~prune_level = + Lwt_mutex.with_lock state.lock @@ fun () -> + Stdlib.Hashtbl.filter_map_inplace + (fun attested_level slots_by_delegate -> + if Int32.compare attested_level prune_level < 0 then None + else Some slots_by_delegate) + state.cache ; + Lwt.return_unit + let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level delegate_ids = let open Lwt_syntax in let attested_level = Int32.succ attestation_level in let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in - Lwt_mutex.with_lock state.lock @@ fun () -> - match Stdlib.Hashtbl.find_opt state.cache attested_level with - | None -> - let* () = Events.(emit no_attestable_slot_at_level attested_level) in - return - @@ dal_attestable_slots dal_node_rpc_ctxt ~attestation_level delegate_ids - | Some slots_by_delegate -> - List.map_s - (fun delegate_id -> - match Delegate_id.Table.find_opt slots_by_delegate delegate_id with - | None -> - let* () = - Events.( - emit - no_attestable_slot_at_level_for_delegate - (attested_level, delegate_id)) - in - return - ( delegate_id, - get_attestable_slots - dal_node_rpc_ctxt - delegate_id - ~attested_level ) - | Some slots -> return (delegate_id, Lwt_result_syntax.return @@ slots)) - delegate_ids + let* dal_attestable_slots = + Lwt_mutex.with_lock state.lock @@ fun () -> + match Stdlib.Hashtbl.find_opt state.cache attested_level with + | None -> + let* () = Events.(emit no_attestable_slot_at_level attested_level) in + return + @@ dal_attestable_slots + dal_node_rpc_ctxt + ~attestation_level + delegate_ids + | Some slots_by_delegate -> + List.map_s + (fun delegate_id -> + match Delegate_id.Table.find_opt slots_by_delegate delegate_id with + | None -> + let* () = + Events.( + emit + no_attestable_slot_at_level_for_delegate + (attested_level, delegate_id)) + in + return + ( delegate_id, + get_attestable_slots + dal_node_rpc_ctxt + delegate_id + ~attested_level ) + | Some slots -> + return (delegate_id, 
Lwt_result_syntax.return @@ slots)) + delegate_ids + in + let* () = prune_cache_before state ~prune_level:attestation_level in + return dal_attestable_slots let create ~attestation_lag ~number_of_slots = { diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index 5f29a028b27c..ee827af65301 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -20,6 +20,7 @@ When the baker asks for attestable slots at level L for a set of delegates, the worker: - reconciles subscriptions with that set (subscribing for new delegates); + - prunes cached entries strictly older than L to keep memory bounded; - returns an immediate "snapshot" from the cache (falling back to the static RPC to get the attestable slots). -- GitLab From 8ce35714d77e190d1c2fc1830b386e43b09b0888 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 17:28:05 +0000 Subject: [PATCH 07/19] Alpha/Baker: Use attestable worker instead of next_level_attestable_slots --- .../testnet_experiment_tools/tool_alpha.ml | 6 ++++ .../lib_delegate/baking_actions.ml | 28 ++++------------ .../lib_delegate/baking_actions.mli | 2 -- .../lib_delegate/baking_scheduling.ml | 18 +++------- src/proto_alpha/lib_delegate/baking_state.ml | 11 +++---- src/proto_alpha/lib_delegate/baking_state.mli | 3 -- .../dal_attestable_slots_worker.ml | 33 ++++++++++++++----- .../dal_attestable_slots_worker.mli | 16 ++------- .../lib_delegate/state_transitions.ml | 4 +-- tezt/tests/dal.ml | 6 ++-- 10 files changed, 53 insertions(+), 74 deletions(-) diff --git a/devtools/testnet_experiment_tools/tool_alpha.ml b/devtools/testnet_experiment_tools/tool_alpha.ml index ea69a2c2add5..1added7be854 100644 --- a/devtools/testnet_experiment_tools/tool_alpha.ml +++ b/devtools/testnet_experiment_tools/tool_alpha.ml @@ -261,6 +261,11 @@ let create_state cctxt ?synchronize 
?monitor_node_mempool ~config let*! operation_worker = Operation_worker.run ?monitor_node_operations ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Baking_scheduling.create_initial_state cctxt ?synchronize @@ -270,6 +275,7 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config operation_worker ~current_proposal round_durations + dal_attestable_slots_worker delegates let compute_current_round_duration round_durations diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index 197b9012ada4..08d100bfa6fe 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -149,8 +149,6 @@ and level_update = { delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> dal_attestable_slots:Dal_attestable_slots_worker.dal_attestable_slots -> - next_level_dal_attestable_slots: - Dal_attestable_slots_worker.dal_attestable_slots -> (state * action) Lwt.t; } @@ -1125,27 +1123,16 @@ let update_to_level state level_update = new_level_proposal round_durations [@profiler.record_f {verbosity = Debug} "compute round"]) in - let*! dal_attestable_slots, next_level_dal_attestable_slots = + let*! 
dal_attestable_slots = only_if_dal_feature_enabled state - ~default_value:([], []) + ~default_value:[] (fun dal_node_rpc_ctxt -> - let dal_attestable_slots = - if Int32.(new_level = succ state.level_state.current_level) then - state.level_state.next_level_dal_attestable_slots - else - Dal_attestable_slots_worker.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:new_level - (Delegate_infos.own_delegate_ids delegate_infos) - in - let next_level_dal_attestable_slots = - Dal_attestable_slots_worker.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:(Int32.succ new_level) - (Delegate_infos.own_delegate_ids next_level_delegate_infos) - in - Lwt.return (dal_attestable_slots, next_level_dal_attestable_slots)) + Dal_attestable_slots_worker.get_dal_attestable_slots + state.global_state.dal_attestable_slots_worker + dal_node_rpc_ctxt + ~attestation_level:new_level + (Delegate_infos.own_delegate_ids delegate_infos)) in let*! new_state = (compute_new_state @@ -1153,7 +1140,6 @@ let update_to_level state level_update = ~delegate_infos ~next_level_delegate_infos ~dal_attestable_slots - ~next_level_dal_attestable_slots [@profiler.record_s {verbosity = Debug} "compute new state"]) in return new_state diff --git a/src/proto_alpha/lib_delegate/baking_actions.mli b/src/proto_alpha/lib_delegate/baking_actions.mli index 0775bb56fc96..7b036d89ba0b 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.mli +++ b/src/proto_alpha/lib_delegate/baking_actions.mli @@ -57,8 +57,6 @@ and level_update = { delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> dal_attestable_slots:Dal_attestable_slots_worker.dal_attestable_slots -> - next_level_dal_attestable_slots: - Dal_attestable_slots_worker.dal_attestable_slots -> (state * action) Lwt.t; } diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 249569bd2665..255af53fdbfc 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml 
+++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -695,26 +695,17 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain else None in let current_level = current_proposal.block.shell.level in - let dal_attestable_slots = - Option.fold + let*! dal_attestable_slots = + Option.fold_s ~none:[] ~some:(fun dal_node_rpc_ctxt -> - Dal_attestable_slots_worker.dal_attestable_slots + Dal_attestable_slots_worker.get_dal_attestable_slots + global_state.dal_attestable_slots_worker dal_node_rpc_ctxt ~attestation_level:current_level (Delegate_infos.own_delegate_ids delegate_infos)) dal_node_rpc_ctxt in - let next_level_dal_attestable_slots = - Option.fold - ~none:[] - ~some:(fun dal_node_rpc_ctxt -> - Dal_attestable_slots_worker.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:(Int32.succ current_level) - (Delegate_infos.own_delegate_ids next_level_delegate_infos)) - dal_node_rpc_ctxt - in let level_state = { current_level; @@ -728,7 +719,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain next_level_delegate_infos; next_level_latest_forge_request = None; dal_attestable_slots; - next_level_dal_attestable_slots; } in let* round_state = diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index cb009a075229..cf12ea253e07 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -242,10 +242,10 @@ type prepared_block = { } (* The fields {current_level}, {delegate_infos}, {next_level_delegate_infos}, - {next_level_latest_forge_request}, {dal_attestable_slots}, - {next_level_dal_attestable_slots} are updated only when we receive a block at - a different level than {current_level}. Note that this means that there is - always a {latest_proposal}, which may be our own baked block. 
*) + {next_level_latest_forge_request}, {dal_attestable_slots}, are updated only + when we receive a block at a different level than {current_level}. Note that + this means that there is always a {latest_proposal}, which may be our own baked + block. *) type level_state = { current_level : int32; latest_proposal : proposal; @@ -261,8 +261,6 @@ type level_state = { next_level_delegate_infos : delegate_infos; next_level_latest_forge_request : Round.t option; dal_attestable_slots : Dal_attestable_slots_worker.dal_attestable_slots; - next_level_dal_attestable_slots : - Dal_attestable_slots_worker.dal_attestable_slots; } type phase = @@ -1261,7 +1259,6 @@ let pp_level_state fmt next_level_delegate_infos; next_level_latest_forge_request; dal_attestable_slots = _; - next_level_dal_attestable_slots = _; } = Format.fprintf fmt diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index 51e09292ae45..e75932e0269e 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -320,9 +320,6 @@ type level_state = { dal_attestable_slots : Dal_attestable_slots_worker.dal_attestable_slots; (** For each (own) delegate having a DAL slot at the current level, store a promise to obtain the attestable slots for that level. 
*) - next_level_dal_attestable_slots : - Dal_attestable_slots_worker.dal_attestable_slots; - (** and similarly for the next level *) } val pp_level_state : Format.formatter -> level_state -> unit diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 82afe359cc84..84263f46a3d3 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -42,6 +42,18 @@ module Events = struct ("attested_level", Data_encoding.int32) ("delegate_id", Delegate_id.encoding) + let monitor_attestable_slots_failed = + declare_2 + ~section + ~name:"monitor_attestable_slots_failed" + ~level:Error + ~msg: + "Unable to subscribe to monitor attestable slots stream for {delegate} \ + -- {trace}" + ("delegate", Delegate_id.encoding) + ~pp2:Error_monad.pp_print_trace + ("trace", Error_monad.trace_encoding) + let stream_ended = declare_1 ~section @@ -114,9 +126,12 @@ let monitor_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) () in match result with - | Ok result -> return result + | Ok result -> return_ok result | Error trace -> - Lwt.fail_with (Format.asprintf "%a" Error_monad.pp_print_trace trace) + let* () = + Events.(emit monitor_attestable_slots_failed (delegate_id, trace)) + in + Lwt.return_error trace let get_slots_by_delegate state ~attested_level = match Stdlib.Hashtbl.find_opt state.cache attested_level with @@ -237,12 +252,14 @@ let consume_backfill_stream state ~delegate_id stream_handle = let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = let open Lwt_syntax in let* new_streams = - List.map_s - (fun delegate_id -> - let* stream, stopper = - monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id - in - return (delegate_id, {stream; stopper})) + List.fold_left_s + (fun acc delegate_id -> + let* res = monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id in + match res with + | Ok 
(stream, stopper) -> + return ((delegate_id, {stream; stopper}) :: acc) + | Error _trace -> return acc) + [] delegate_ids_to_add in let* () = diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index ee827af65301..3f39863df76e 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -34,8 +34,8 @@ open Baking_state_types (** {1 Datatypes} *) (* An association list between delegates and promises for their DAL attestations - at some level (as obtained through the [get_attestable_slots] RPC). See usage - in {!level_state}. *) + at some level (as obtained through the [monitor_attestable_slots] RPC). See usage + in {!Baking_actions}. *) type dal_attestable_slots = (Delegate_id.t * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) @@ -43,18 +43,6 @@ type dal_attestable_slots = type t -(** {1 RPC utilities} *) - -(** [dal_attestable_slots ctxt ~attestation_level delegate_ids] calls the DAL - node RPC GET /profiles//attested_levels//attestable_slots/ - for each of the delegates in [delegate_ids] and returns the corresponding - promises. 
*) -val dal_attestable_slots : - Tezos_rpc.Context.generic -> - attestation_level:int32 -> - Baking_state_types.Delegate_id.t list -> - dal_attestable_slots - (** {1 Accessors} *) (** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] diff --git a/src/proto_alpha/lib_delegate/state_transitions.ml b/src/proto_alpha/lib_delegate/state_transitions.ml index 070bc81e19c0..adec22dcd82a 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.ml +++ b/src/proto_alpha/lib_delegate/state_transitions.ml @@ -426,8 +426,7 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = let* () = Events.(emit new_head_with_increasing_level ()) in let new_level = new_proposal.block.shell.level in let compute_new_state ~current_round ~delegate_infos - ~next_level_delegate_infos ~dal_attestable_slots - ~next_level_dal_attestable_slots = + ~next_level_delegate_infos ~dal_attestable_slots = let round_state = { current_round; @@ -450,7 +449,6 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = next_level_delegate_infos; next_level_latest_forge_request = None; dal_attestable_slots; - next_level_dal_attestable_slots; } in (* recursive call with the up-to-date state to handle the new diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index 3686d6ad43fe..8bed84044753 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -4251,8 +4251,10 @@ let test_baker_registers_profiles _protocol _parameters _cryptobox l1_node Agnostic_baker.create ~dal_node_rpc_endpoint l1_node client ~delegates in let wait_for_attestation_event = - Agnostic_baker.wait_for baker "failed_to_get_attestations.v0" (fun _json -> - Some ()) + Agnostic_baker.wait_for + baker + "monitor_attestable_slots_failed.v0" + (fun _json -> Some ()) in let* () = Agnostic_baker.run baker in let* () = wait_for_attestation_event in -- GitLab From acb8f489ba63e8c0ae829e1eed0e06e6fffee823 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 17:22:32 +0000 
Subject: [PATCH 08/19] Alpha/Baker: Update streams subscriptions at profile registration --- src/proto_alpha/lib_delegate/baking_scheduling.ml | 14 ++++++++++++-- src/proto_alpha/lib_delegate/baking_state_types.ml | 2 ++ .../lib_delegate/baking_state_types.mli | 3 +++ .../lib_delegate/dal_attestable_slots_worker.ml | 3 --- .../lib_delegate/dal_attestable_slots_worker.mli | 9 +++++++-- 5 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 255af53fdbfc..a9719c62f037 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -947,7 +947,8 @@ let try_resolve_consensus_keys cctxt key = in try_find_delegate_key levels_to_inspect -let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = +let register_dal_profiles cctxt dal_node_rpc_ctxt dal_attestable_slots_worker + delegates = let open Lwt_result_syntax in let*! delegates = List.map_s (try_resolve_consensus_keys cctxt) delegates in let register dal_ctxt = @@ -967,7 +968,15 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = warn () else Lwt.return_unit in - Node_rpc.register_dal_profiles dal_ctxt delegates + let* () = Node_rpc.register_dal_profiles dal_ctxt delegates in + let delegate_ids = List.map Delegate_id.of_pkh delegates in + let*! 
() = + Dal_attestable_slots_worker.update_streams_subscriptions + dal_attestable_slots_worker + dal_ctxt + delegate_ids + in + return_unit in Option.iter_es (fun dal_ctxt -> @@ -1039,6 +1048,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) register_dal_profiles cctxt initial_state.global_state.dal_node_rpc_ctxt + dal_attestable_slots_worker delegates in let cloned_block_stream = Lwt_stream.clone heads_stream in diff --git a/src/proto_alpha/lib_delegate/baking_state_types.ml b/src/proto_alpha/lib_delegate/baking_state_types.ml index b0d676f9af03..d9aa4883735e 100644 --- a/src/proto_alpha/lib_delegate/baking_state_types.ml +++ b/src/proto_alpha/lib_delegate/baking_state_types.ml @@ -8,6 +8,8 @@ module Key_id = struct type t = Signature.Public_key_hash.t + let of_pkh pkh = pkh + let to_pkh pkh = pkh let compare = Signature.Public_key_hash.compare diff --git a/src/proto_alpha/lib_delegate/baking_state_types.mli b/src/proto_alpha/lib_delegate/baking_state_types.mli index 4d6206ab6055..36522b9697ed 100644 --- a/src/proto_alpha/lib_delegate/baking_state_types.mli +++ b/src/proto_alpha/lib_delegate/baking_state_types.mli @@ -9,6 +9,9 @@ module Key_id : sig type t + (** Only use at library frontiers *) + val of_pkh : Signature.public_key_hash -> t + (** Only use at library frontiers *) val to_pkh : t -> Signature.public_key_hash diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 84263f46a3d3..3906bc0dbf94 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -246,9 +246,6 @@ let consume_backfill_stream state ~delegate_id stream_handle = Lwt.fail_with "The stream must always start properly with a Backfill event." 
-(** [subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add] - opens new monitoring DAL streams for each entry in [~delegate_ids_to_add], - and starts consuming from them, to populate the internal cache. *) let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = let open Lwt_syntax in let* new_streams = diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index 3f39863df76e..b03c4eedf14b 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -45,6 +45,12 @@ type t (** {1 Accessors} *) +(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles + the active set of streams using [delegate_ids] by computing the list of streams + to subscribe to. *) +val update_streams_subscriptions : + t -> Tezos_rpc.Context.generic -> Delegate_id.t list -> unit Lwt.t + (** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] returns, for each delegate, the current bitset for attested level [~attestation_level] + 1 and its [published_level]. *) @@ -58,8 +64,7 @@ val get_dal_attestable_slots : (** {1 Constructors} *) (** [create ~attestation_lag ~number_of_slots] creates a new worker state. This - does not start any background thread, as streams are opened lazily when - someone asks for attestable slots via [get_dal_attestable_slots]. *) + does not start any background thread, as streams are opened lazily. 
*) val create : attestation_lag:int -> number_of_slots:int -> t (** [shutdown_worker state] stops all active delegate subscriptions and clears -- GitLab From 1ad7f6b6d29f1ce1a42473757e928c1838c5374a Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 17:39:41 +0000 Subject: [PATCH 09/19] Alpha/Baker: Remove fallback to static RPC for getting attestable slots --- .../dal_attestable_slots_worker.ml | 50 ++++++++----------- .../dal_attestable_slots_worker.mli | 4 +- 2 files changed, 22 insertions(+), 32 deletions(-) diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 3906bc0dbf94..dc92dd4e8032 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -26,9 +26,7 @@ module Events = struct ~section ~name:"no_attestable_slot_at_level" ~level:Warning - ~msg: - "No attestable slots found at level {attested_level} in cache, falling \ - back to static RPC" + ~msg:"No attestable slots found at level {attested_level} in cache" ("attested_level", Data_encoding.int32) let no_attestable_slot_at_level_for_delegate = @@ -37,8 +35,7 @@ module Events = struct ~name:"no_attestable_slot_at_level_for_delegate" ~level:Warning ~msg: - "No attestable slots found at level {attested_level} for \ - {delegate_id}, falling back to static RPC" + "No attestable slots found at level {attested_level} for {delegate_id}" ("attested_level", Data_encoding.int32) ("delegate_id", Delegate_id.encoding) @@ -94,22 +91,6 @@ type t = { (** Cache of attestable slots, keyed by attested levels. 
*) } -let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level = - let pkh = Delegate_id.to_pkh delegate_id in - Tezos_rpc.Context.make_call - Tezos_dal_node_services.Services.get_attestable_slots - dal_node_rpc_ctxt - (((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh), attested_level) - () - () - -let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) - ~attestation_level = - let attested_level = Int32.succ attestation_level in - List.map (fun delegate_id -> - ( delegate_id, - get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) - (** [monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id] opens a streamed RPC to the DAL node for the given [~delegate_id]. Each item emitted on the stream contains DAL attestable information for this delegate. *) @@ -318,20 +299,30 @@ let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level let attested_level = Int32.succ attestation_level in let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in let* dal_attestable_slots = + let empty_slots = Stdlib.List.init state.number_of_slots (fun _ -> false) in Lwt_mutex.with_lock state.lock @@ fun () -> match Stdlib.Hashtbl.find_opt state.cache attested_level with | None -> let* () = Events.(emit no_attestable_slot_at_level attested_level) in - return - @@ dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level - delegate_ids + let published_level = + Int32.(sub attested_level (of_int state.attestation_lag)) + in + List.map_s + (fun delegate_id -> + return + ( delegate_id, + Lwt_result_syntax.return + @@ Types.Attestable_slots {slots = empty_slots; published_level} + )) + delegate_ids | Some slots_by_delegate -> List.map_s (fun delegate_id -> match Delegate_id.Table.find_opt slots_by_delegate delegate_id with | None -> + let published_level = + Int32.(sub attested_level (of_int state.attestation_lag)) + in let* () = Events.( emit @@ -340,10 +331,9 @@ let get_dal_attestable_slots state 
dal_node_rpc_ctxt ~attestation_level in return ( delegate_id, - get_attestable_slots - dal_node_rpc_ctxt - delegate_id - ~attested_level ) + Lwt_result_syntax.return + @@ Types.Attestable_slots + {slots = empty_slots; published_level} ) | Some slots -> return (delegate_id, Lwt_result_syntax.return @@ slots)) delegate_ids diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index b03c4eedf14b..01cc37c33ae5 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -21,8 +21,8 @@ the worker: - reconciles subscriptions with that set (subscribing for new delegates); - prunes cached entries strictly older than L to keep memory bounded; - - returns an immediate "snapshot" from the cache (falling back to the static - RPC to get the attestable slots). + - returns an immediate "snapshot" from the cache (defaulting to an all-false + bitset if nothing has been observed yet). 
The worker's purpose is to decouple the critical consensus path from DAL RPC latency: streams advance in the background, therefore the cache can serve -- GitLab From 07ed8e950dab5fedc5449d210ba1c32158420247 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 3 Nov 2025 11:00:31 +0000 Subject: [PATCH 10/19] Alpha/Baker: Remove Lwt from dal_attestable_slots type --- .../lib_delegate/baking_actions.ml | 76 ++++++------------- src/proto_alpha/lib_delegate/baking_events.ml | 21 ----- .../dal_attestable_slots_worker.ml | 15 ++-- .../dal_attestable_slots_worker.mli | 4 +- 4 files changed, 31 insertions(+), 85 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index 08d100bfa6fe..2384d362aef7 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -504,42 +504,30 @@ let only_if_dal_feature_enabled state ~default_value f = let process_dal_rpc_result state delegate level round = let open Lwt_result_syntax in function - | `RPC_timeout -> - let*! () = - Events.(emit failed_to_get_dal_attestations_in_time (delegate, level)) - in + | Tezos_dal_node_services.Types.Not_in_committee -> + let*! () = Events.(emit not_in_dal_committee (delegate, level)) in return_none - | `RPC_result (Error errs) -> + | Attestable_slots {slots; published_level} -> + let number_of_slots = + state.global_state.constants.parametric.dal.number_of_slots + in + let dal_attestation = + List.fold_left_i + (fun i acc flag -> + match Dal.Slot_index.of_int_opt ~number_of_slots i with + | Some index when flag -> Dal.Attestation.commit acc index + | None | Some _ -> acc) + Dal.Attestation.empty + slots + in + let dal_content = {attestation = dal_attestation} in let*! 
() = - Events.(emit failed_to_get_dal_attestations (delegate, errs)) + Events.( + emit + attach_dal_attestation + (delegate, dal_content, published_level, level, round)) in - return_none - | `RPC_result (Ok res) -> ( - match res with - | Tezos_dal_node_services.Types.Not_in_committee -> - let*! () = Events.(emit not_in_dal_committee (delegate, level)) in - return_none - | Attestable_slots {slots; published_level} -> - let number_of_slots = - state.global_state.constants.parametric.dal.number_of_slots - in - let dal_attestation = - List.fold_left_i - (fun i acc flag -> - match Dal.Slot_index.of_int_opt ~number_of_slots i with - | Some index when flag -> Dal.Attestation.commit acc index - | None | Some _ -> acc) - Dal.Attestation.empty - slots - in - let dal_content = {attestation = dal_attestation} in - let*! () = - Events.( - emit - attach_dal_attestation - (delegate, dal_content, published_level, level, round)) - in - return_some dal_content) + return_some dal_content let may_get_dal_content state consensus_vote = let open Lwt_result_syntax in @@ -549,30 +537,16 @@ let may_get_dal_content state consensus_vote = vote_consensus_content.round ) in let delegate_id = Delegate.delegate_id delegate in - let promise_opt = + let attestable_slots_opt = List.assoc_opt ~equal:Delegate_id.equal delegate_id state.level_state.dal_attestable_slots in - match promise_opt with + match attestable_slots_opt with | None -> return_none - | Some promise -> - let*! res = - (* Normally we'd just check the state of the promise and return the - resolved value or an error if the promise is still pending. However, - tests that bake in the past would fail, because there would not be - sufficient time to get the answer from the DAL node. Therefore, we - wait for a bit for the DAL node to provide an answer. *) - Lwt.pick - [ - (let*! () = Lwt_unix.sleep 0.75 in - Lwt.return `RPC_timeout); - (let*! 
tz_res = promise in - Lwt.return (`RPC_result tz_res)); - ] - in - process_dal_rpc_result state delegate_id level round res + | Some attestable_slots -> + process_dal_rpc_result state delegate_id level round attestable_slots let is_authorized (global_state : global_state) highwatermarks consensus_vote = let {delegate; vote_consensus_content; _} = consensus_vote in diff --git a/src/proto_alpha/lib_delegate/baking_events.ml b/src/proto_alpha/lib_delegate/baking_events.ml index f45222f3ae86..38b8ad57d705 100644 --- a/src/proto_alpha/lib_delegate/baking_events.ml +++ b/src/proto_alpha/lib_delegate/baking_events.ml @@ -835,27 +835,6 @@ module Actions = struct ~pp2:Error_monad.pp_print_trace ("trace", Error_monad.trace_encoding) - let failed_to_get_dal_attestations = - declare_2 - ~section - ~name:"failed_to_get_attestations" - ~level:Error - ~msg:"unable to get DAL attestation for {delegate} -- {trace}" - ("delegate", Delegate_id.encoding) - ~pp2:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) - - let failed_to_get_dal_attestations_in_time = - declare_2 - ~section - ~name:"failed_to_get_attestations_in_time" - ~level:Error - ~msg: - "unable to get DAL attestation for {delegate} in time for attestation \ - level {level}" - ("delegate", Delegate_id.encoding) - ("level", Data_encoding.int32) - let failed_to_inject_consensus_vote = declare_2 ~section diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index dc92dd4e8032..65afcf0aaa6d 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -68,8 +68,7 @@ module Events = struct ("stacktrace", Data_encoding.string) end -type dal_attestable_slots = - (Delegate_id.t * Types.attestable_slots tzresult Lwt.t) list +type dal_attestable_slots = (Delegate_id.t * Types.attestable_slots) list module DelegateSet = Set.Make (Delegate_id) @@ -311,9 +310,7 
@@ let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level (fun delegate_id -> return ( delegate_id, - Lwt_result_syntax.return - @@ Types.Attestable_slots {slots = empty_slots; published_level} - )) + Types.Attestable_slots {slots = empty_slots; published_level} )) delegate_ids | Some slots_by_delegate -> List.map_s @@ -331,11 +328,9 @@ let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level in return ( delegate_id, - Lwt_result_syntax.return - @@ Types.Attestable_slots - {slots = empty_slots; published_level} ) - | Some slots -> - return (delegate_id, Lwt_result_syntax.return @@ slots)) + Types.Attestable_slots + {slots = empty_slots; published_level} ) + | Some slots -> return (delegate_id, slots)) delegate_ids in let* () = prune_cache_before state ~prune_level:attestation_level in diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index 01cc37c33ae5..ea7a313c7edc 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -37,9 +37,7 @@ open Baking_state_types at some level (as obtained through the [monitor_attestable_slots] RPC). See usage in {!Baking_actions}. 
*) type dal_attestable_slots = - (Delegate_id.t - * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) - list + (Delegate_id.t * Tezos_dal_node_services.Types.attestable_slots) list type t -- GitLab From 6b02b364bfafc1ce7767264a16366bbc95d294d7 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 17:48:09 +0000 Subject: [PATCH 11/19] Alpha/Baker: Remove dal_attestable_slots from level_state --- .../lib_delegate/baking_actions.ml | 29 ++++++++----------- .../lib_delegate/baking_actions.mli | 1 - .../lib_delegate/baking_scheduling.ml | 12 -------- src/proto_alpha/lib_delegate/baking_state.ml | 9 ++---- src/proto_alpha/lib_delegate/baking_state.mli | 3 -- .../lib_delegate/state_transitions.ml | 3 +- 6 files changed, 16 insertions(+), 41 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index 2384d362aef7..0860f9122eb7 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -148,7 +148,6 @@ and level_update = { current_round:Round.t -> delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> - dal_attestable_slots:Dal_attestable_slots_worker.dal_attestable_slots -> (state * action) Lwt.t; } @@ -537,11 +536,19 @@ let may_get_dal_content state consensus_vote = vote_consensus_content.round ) in let delegate_id = Delegate.delegate_id delegate in + let*! 
attestable_slots = + only_if_dal_feature_enabled + state + ~default_value:[] + (fun dal_node_rpc_ctxt -> + Dal_attestable_slots_worker.get_dal_attestable_slots + state.global_state.dal_attestable_slots_worker + dal_node_rpc_ctxt + ~attestation_level:level + [delegate_id]) + in let attestable_slots_opt = - List.assoc_opt - ~equal:Delegate_id.equal - delegate_id - state.level_state.dal_attestable_slots + List.assoc_opt ~equal:Delegate_id.equal delegate_id attestable_slots in match attestable_slots_opt with | None -> return_none @@ -1097,23 +1104,11 @@ let update_to_level state level_update = new_level_proposal round_durations [@profiler.record_f {verbosity = Debug} "compute round"]) in - let*! dal_attestable_slots = - only_if_dal_feature_enabled - state - ~default_value:[] - (fun dal_node_rpc_ctxt -> - Dal_attestable_slots_worker.get_dal_attestable_slots - state.global_state.dal_attestable_slots_worker - dal_node_rpc_ctxt - ~attestation_level:new_level - (Delegate_infos.own_delegate_ids delegate_infos)) - in let*! 
new_state = (compute_new_state ~current_round ~delegate_infos ~next_level_delegate_infos - ~dal_attestable_slots [@profiler.record_s {verbosity = Debug} "compute new state"]) in return new_state diff --git a/src/proto_alpha/lib_delegate/baking_actions.mli b/src/proto_alpha/lib_delegate/baking_actions.mli index 7b036d89ba0b..18ac5a251616 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.mli +++ b/src/proto_alpha/lib_delegate/baking_actions.mli @@ -56,7 +56,6 @@ and level_update = { current_round:Round.t -> delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> - dal_attestable_slots:Dal_attestable_slots_worker.dal_attestable_slots -> (state * action) Lwt.t; } diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index a9719c62f037..6ba8cbd3da6e 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -695,17 +695,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain else None in let current_level = current_proposal.block.shell.level in - let*! 
dal_attestable_slots = - Option.fold_s - ~none:[] - ~some:(fun dal_node_rpc_ctxt -> - Dal_attestable_slots_worker.get_dal_attestable_slots - global_state.dal_attestable_slots_worker - dal_node_rpc_ctxt - ~attestation_level:current_level - (Delegate_infos.own_delegate_ids delegate_infos)) - dal_node_rpc_ctxt - in let level_state = { current_level; @@ -718,7 +707,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain delegate_infos; next_level_delegate_infos; next_level_latest_forge_request = None; - dal_attestable_slots; } in let* round_state = diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index cf12ea253e07..0cdb344f3cf6 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -242,10 +242,9 @@ type prepared_block = { } (* The fields {current_level}, {delegate_infos}, {next_level_delegate_infos}, - {next_level_latest_forge_request}, {dal_attestable_slots}, are updated only - when we receive a block at a different level than {current_level}. Note that - this means that there is always a {latest_proposal}, which may be our own baked - block. *) + {next_level_latest_forge_request}, are updated only when we receive a block at + a different level than {current_level}. Note that this means that there is always + a {latest_proposal}, which may be our own baked block. 
*) type level_state = { current_level : int32; latest_proposal : proposal; @@ -260,7 +259,6 @@ type level_state = { delegate_infos : delegate_infos; next_level_delegate_infos : delegate_infos; next_level_latest_forge_request : Round.t option; - dal_attestable_slots : Dal_attestable_slots_worker.dal_attestable_slots; } type phase = @@ -1258,7 +1256,6 @@ let pp_level_state fmt delegate_infos; next_level_delegate_infos; next_level_latest_forge_request; - dal_attestable_slots = _; } = Format.fprintf fmt diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index e75932e0269e..11b5606a333c 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -317,9 +317,6 @@ type level_state = { next_level_latest_forge_request : Round.t option; (** Some if a forge request has been sent for the next level on the given round *) - dal_attestable_slots : Dal_attestable_slots_worker.dal_attestable_slots; - (** For each (own) delegate having a DAL slot at the current level, store - a promise to obtain the attestable slots for that level. 
*) } val pp_level_state : Format.formatter -> level_state -> unit diff --git a/src/proto_alpha/lib_delegate/state_transitions.ml b/src/proto_alpha/lib_delegate/state_transitions.ml index adec22dcd82a..f58e33fdbd5b 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.ml +++ b/src/proto_alpha/lib_delegate/state_transitions.ml @@ -426,7 +426,7 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = let* () = Events.(emit new_head_with_increasing_level ()) in let new_level = new_proposal.block.shell.level in let compute_new_state ~current_round ~delegate_infos - ~next_level_delegate_infos ~dal_attestable_slots = + ~next_level_delegate_infos = let round_state = { current_round; @@ -448,7 +448,6 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = delegate_infos; next_level_delegate_infos; next_level_latest_forge_request = None; - dal_attestable_slots; } in (* recursive call with the up-to-date state to handle the new -- GitLab From 08683a4b7433ab2e47afd7eccccc37ab09976544 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 28 Oct 2025 12:20:11 +0000 Subject: [PATCH 12/19] Tallinn/Baker: Backport --- .../tool_024_PsD5wVTJ.ml | 6 + .../lib_delegate/baking_actions.ml | 118 ++---- .../lib_delegate/baking_actions.mli | 2 - .../lib_delegate/baking_events.ml | 21 - .../lib_delegate/baking_lib.ml | 6 + .../lib_delegate/baking_scheduling.ml | 49 ++- .../lib_delegate/baking_scheduling.mli | 5 +- .../lib_delegate/baking_state.ml | 26 +- .../lib_delegate/baking_state.mli | 17 +- .../lib_delegate/baking_state_types.ml | 2 + .../lib_delegate/baking_state_types.mli | 3 + .../dal_attestable_slots_worker.ml | 362 ++++++++++++++++++ .../dal_attestable_slots_worker.mli | 71 ++++ .../lib_delegate/node_rpc.ml | 22 -- .../lib_delegate/node_rpc.mli | 10 - .../lib_delegate/state_transitions.ml | 5 +- 16 files changed, 531 insertions(+), 194 deletions(-) create mode 100644 
src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml create mode 100644 src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli diff --git a/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml b/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml index 07d62098f806..4e66ddfffb14 100644 --- a/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml +++ b/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml @@ -262,6 +262,11 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config let*! operation_worker = Operation_worker.run ?monitor_node_operations ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Baking_scheduling.create_initial_state cctxt ?synchronize @@ -271,6 +276,7 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config operation_worker ~current_proposal round_durations + dal_attestable_slots_worker delegates let compute_current_round_duration round_durations diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.ml index 0ee00d77bcd1..0860f9122eb7 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.ml @@ -148,8 +148,6 @@ and level_update = { current_round:Round.t -> delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> - dal_attestable_slots:dal_attestable_slots -> - next_level_dal_attestable_slots:dal_attestable_slots -> (state * action) Lwt.t; } @@ -505,42 +503,30 @@ let only_if_dal_feature_enabled state ~default_value f = let process_dal_rpc_result state delegate level round = let open Lwt_result_syntax in function - | `RPC_timeout -> - let*! 
() = - Events.(emit failed_to_get_dal_attestations_in_time (delegate, level)) - in + | Tezos_dal_node_services.Types.Not_in_committee -> + let*! () = Events.(emit not_in_dal_committee (delegate, level)) in return_none - | `RPC_result (Error errs) -> + | Attestable_slots {slots; published_level} -> + let number_of_slots = + state.global_state.constants.parametric.dal.number_of_slots + in + let dal_attestation = + List.fold_left_i + (fun i acc flag -> + match Dal.Slot_index.of_int_opt ~number_of_slots i with + | Some index when flag -> Dal.Attestation.commit acc index + | None | Some _ -> acc) + Dal.Attestation.empty + slots + in + let dal_content = {attestation = dal_attestation} in let*! () = - Events.(emit failed_to_get_dal_attestations (delegate, errs)) + Events.( + emit + attach_dal_attestation + (delegate, dal_content, published_level, level, round)) in - return_none - | `RPC_result (Ok res) -> ( - match res with - | Tezos_dal_node_services.Types.Not_in_committee -> - let*! () = Events.(emit not_in_dal_committee (delegate, level)) in - return_none - | Attestable_slots {slots; published_level} -> - let number_of_slots = - state.global_state.constants.parametric.dal.number_of_slots - in - let dal_attestation = - List.fold_left_i - (fun i acc flag -> - match Dal.Slot_index.of_int_opt ~number_of_slots i with - | Some index when flag -> Dal.Attestation.commit acc index - | None | Some _ -> acc) - Dal.Attestation.empty - slots - in - let dal_content = {attestation = dal_attestation} in - let*! 
() = - Events.( - emit - attach_dal_attestation - (delegate, dal_content, published_level, level, round)) - in - return_some dal_content) + return_some dal_content let may_get_dal_content state consensus_vote = let open Lwt_result_syntax in @@ -550,30 +536,24 @@ let may_get_dal_content state consensus_vote = vote_consensus_content.round ) in let delegate_id = Delegate.delegate_id delegate in - let promise_opt = - List.assoc_opt - ~equal:Delegate_id.equal - delegate_id - state.level_state.dal_attestable_slots + let*! attestable_slots = + only_if_dal_feature_enabled + state + ~default_value:[] + (fun dal_node_rpc_ctxt -> + Dal_attestable_slots_worker.get_dal_attestable_slots + state.global_state.dal_attestable_slots_worker + dal_node_rpc_ctxt + ~attestation_level:level + [delegate_id]) + in + let attestable_slots_opt = + List.assoc_opt ~equal:Delegate_id.equal delegate_id attestable_slots in - match promise_opt with + match attestable_slots_opt with | None -> return_none - | Some promise -> - let*! res = - (* Normally we'd just check the state of the promise and return the - resolved value or an error if the promise is still pending. However, - tests that bake in the past would fail, because there would not be - sufficient time to get the answer from the DAL node. Therefore, we - wait for a bit for the DAL node to provide an answer. *) - Lwt.pick - [ - (let*! () = Lwt_unix.sleep 0.75 in - Lwt.return `RPC_timeout); - (let*! tz_res = promise in - Lwt.return (`RPC_result tz_res)); - ] - in - process_dal_rpc_result state delegate_id level round res + | Some attestable_slots -> + process_dal_rpc_result state delegate_id level round attestable_slots let is_authorized (global_state : global_state) highwatermarks consensus_vote = let {delegate; vote_consensus_content; _} = consensus_vote in @@ -1124,35 +1104,11 @@ let update_to_level state level_update = new_level_proposal round_durations [@profiler.record_f {verbosity = Debug} "compute round"]) in - let*! 
dal_attestable_slots, next_level_dal_attestable_slots = - only_if_dal_feature_enabled - state - ~default_value:([], []) - (fun dal_node_rpc_ctxt -> - let dal_attestable_slots = - if Int32.(new_level = succ state.level_state.current_level) then - state.level_state.next_level_dal_attestable_slots - else - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:new_level - (Delegate_infos.own_delegates delegate_infos) - in - let next_level_dal_attestable_slots = - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:(Int32.succ new_level) - (Delegate_infos.own_delegates next_level_delegate_infos) - in - Lwt.return (dal_attestable_slots, next_level_dal_attestable_slots)) - in let*! new_state = (compute_new_state ~current_round ~delegate_infos ~next_level_delegate_infos - ~dal_attestable_slots - ~next_level_dal_attestable_slots [@profiler.record_s {verbosity = Debug} "compute new state"]) in return new_state diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.mli b/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.mli index 0ab92ab20d2b..18ac5a251616 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.mli @@ -56,8 +56,6 @@ and level_update = { current_round:Round.t -> delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> - dal_attestable_slots:dal_attestable_slots -> - next_level_dal_attestable_slots:dal_attestable_slots -> (state * action) Lwt.t; } diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_events.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_events.ml index f45222f3ae86..38b8ad57d705 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_events.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_events.ml @@ -835,27 +835,6 @@ module Actions = struct ~pp2:Error_monad.pp_print_trace ("trace", Error_monad.trace_encoding) - let failed_to_get_dal_attestations = - declare_2 - ~section - 
~name:"failed_to_get_attestations" - ~level:Error - ~msg:"unable to get DAL attestation for {delegate} -- {trace}" - ("delegate", Delegate_id.encoding) - ~pp2:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) - - let failed_to_get_dal_attestations_in_time = - declare_2 - ~section - ~name:"failed_to_get_attestations_in_time" - ~level:Error - ~msg: - "unable to get DAL attestation for {delegate} in time for attestation \ - level {level}" - ("delegate", Delegate_id.encoding) - ("level", Data_encoding.int32) - let failed_to_inject_consensus_vote = declare_2 ~section diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml index 46cba2c0d65a..56c77c164b3c 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml @@ -66,6 +66,11 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let*! operation_worker = Operation_worker.run ?monitor_node_operations ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Baking_scheduling.create_initial_state cctxt ?dal_node_rpc_ctxt @@ -74,6 +79,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool config operation_worker round_durations + dal_attestable_slots_worker ~current_proposal ~constants delegates diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml index b01c51a65f7e..6ba8cbd3da6e 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml @@ -617,7 +617,7 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt 
?(synchronize = true) ~chain - config operation_worker round_durations + config operation_worker round_durations dal_attestable_slots_worker ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 @@ -647,6 +647,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain constants; round_durations; operation_worker; + dal_attestable_slots_worker; forge_worker_hooks = { push_request = (fun _ -> assert false); @@ -694,26 +695,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain else None in let current_level = current_proposal.block.shell.level in - let dal_attestable_slots = - Option.fold - ~none:[] - ~some:(fun dal_node_rpc_ctxt -> - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:current_level - (Delegate_infos.own_delegates delegate_infos)) - dal_node_rpc_ctxt - in - let next_level_dal_attestable_slots = - Option.fold - ~none:[] - ~some:(fun dal_node_rpc_ctxt -> - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:(Int32.succ current_level) - (Delegate_infos.own_delegates next_level_delegate_infos)) - dal_node_rpc_ctxt - in let level_state = { current_level; @@ -726,8 +707,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain delegate_infos; next_level_delegate_infos; next_level_latest_forge_request = None; - dal_attestable_slots; - next_level_dal_attestable_slots; } in let* round_state = @@ -956,7 +935,8 @@ let try_resolve_consensus_keys cctxt key = in try_find_delegate_key levels_to_inspect -let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = +let register_dal_profiles cctxt dal_node_rpc_ctxt dal_attestable_slots_worker + delegates = let open Lwt_result_syntax in let*! 
delegates = List.map_s (try_resolve_consensus_keys cctxt) delegates in let register dal_ctxt = @@ -976,7 +956,15 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = warn () else Lwt.return_unit in - Node_rpc.register_dal_profiles dal_ctxt delegates + let* () = Node_rpc.register_dal_profiles dal_ctxt delegates in + let delegate_ids = List.map Delegate_id.of_pkh delegates in + let*! () = + Dal_attestable_slots_worker.update_streams_subscriptions + dal_attestable_slots_worker + dal_ctxt + delegate_ids + in + return_unit in Option.iter_es (fun dal_ctxt -> @@ -1016,10 +1004,19 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) in let*? round_durations = create_round_durations constants in let*! operation_worker = Operation_worker.run ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> let*! _ = Operation_worker.shutdown_worker operation_worker in + let*! 
() = + Dal_attestable_slots_worker.shutdown_worker + dal_attestable_slots_worker + in Lwt.return_unit)) canceler ; let* initial_state = @@ -1030,6 +1027,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) config operation_worker round_durations + dal_attestable_slots_worker ~current_proposal ~constants delegates @@ -1038,6 +1036,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) register_dal_profiles cctxt initial_state.global_state.dal_node_rpc_ctxt + dal_attestable_slots_worker delegates in let cloned_block_stream = Lwt_stream.clone heads_stream in diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli index 41030065dc63..e3fc81c04a69 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli @@ -131,8 +131,8 @@ val create_loop_state : loop_state (** [create_initial_state context ?synchronize chain baking_configuration - operation_worker current_proposal ?constants consensus_keys] creates an - initial {!Baking_state.t} by initializing a + operation_worker dal_attestable_slots_worker current_proposal ?constants + consensus_keys] creates an initial {!Baking_state.t} by initializing a {!type-Baking_state.global_state}, a {!type-Baking_state.level_state} and a {!type-Baking_state.round_state}. 
@@ -155,6 +155,7 @@ val create_initial_state : Baking_configuration.t -> Operation_worker.t -> Round.round_durations -> + Dal_attestable_slots_worker.t -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_state.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_state.ml index 76111296ef38..0cdb344f3cf6 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_state.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_state.ml @@ -158,6 +158,12 @@ module Delegate_infos = struct let own_delegates t = t.own_delegates + let own_delegate_ids t = + List.map + (fun delegate_info -> + Baking_state_types.Delegate.delegate_id delegate_info.delegate) + t.own_delegates + let own_round_owner t ~committee_size ~round = let open Result_syntax in let* round_int = Round.to_int round |> Environment.wrap_tzresult in @@ -180,11 +186,6 @@ end type delegate_infos = Delegate_infos.t -type dal_attestable_slots = - (Delegate_id.t - * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) - list - type proposal = {block : block_info; predecessor : block_info} let proposal_encoding = @@ -241,10 +242,9 @@ type prepared_block = { } (* The fields {current_level}, {delegate_infos}, {next_level_delegate_infos}, - {next_level_latest_forge_request}, {dal_attestable_slots}, - {next_level_dal_attestable_slots} are updated only when we receive a block at - a different level than {current_level}. Note that this means that there is - always a {latest_proposal}, which may be our own baked block. *) + {next_level_latest_forge_request}, are updated only when we receive a block at + a different level than {current_level}. Note that this means that there is always + a {latest_proposal}, which may be our own baked block. 
*) type level_state = { current_level : int32; latest_proposal : proposal; @@ -259,8 +259,6 @@ type level_state = { delegate_infos : delegate_infos; next_level_delegate_infos : delegate_infos; next_level_latest_forge_request : Round.t option; - dal_attestable_slots : dal_attestable_slots; - next_level_dal_attestable_slots : dal_attestable_slots; } type phase = @@ -525,8 +523,10 @@ type global_state = { constants : Constants.t; (* round durations *) round_durations : Round.round_durations; - (* worker that monitor and aggregates new operations *) + (* worker that monitors and aggregates new operations *) operation_worker : Operation_worker.t; + (* worker that retrieves DAL attestable slots from the DAL node *) + dal_attestable_slots_worker : Dal_attestable_slots_worker.t; (* hooks to the consensus and block forge worker *) mutable forge_worker_hooks : forge_worker_hooks; (* the validation mode used by the baker*) @@ -1256,8 +1256,6 @@ let pp_level_state fmt delegate_infos; next_level_delegate_infos; next_level_latest_forge_request; - dal_attestable_slots = _; - next_level_dal_attestable_slots = _; } = Format.fprintf fmt diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_state.mli b/src/proto_024_PsD5wVTJ/lib_delegate/baking_state.mli index 02bee5b64cc8..11b5606a333c 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_state.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_state.mli @@ -45,6 +45,9 @@ module Delegate_infos : sig no duplicates, the associated slot is the first one. *) val own_delegates : t -> delegate_info list + (** Returns the list of the delegate ids from [own_delegates]. *) + val own_delegate_ids : t -> Delegate_id.t list + (** Returns, among our *own* delegates, the delegate (together with its first attesting slot) that owns the given round, if any. 
*) val own_round_owner : @@ -88,14 +91,6 @@ val compute_delegate_infos : (** {2 Consensus operations types functions} *) -(* An association list between delegates and promises for their DAL attestations - at some level (as obtained through the [get_attestable_slots] RPC). See usage - in {!level_state}. *) -type dal_attestable_slots = - (Delegate_id.t - * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) - list - type consensus_vote_kind = Attestation | Preattestation val consensus_vote_kind_encoding : consensus_vote_kind Data_encoding.t @@ -322,11 +317,6 @@ type level_state = { next_level_latest_forge_request : Round.t option; (** Some if a forge request has been sent for the next level on the given round *) - dal_attestable_slots : dal_attestable_slots; - (** For each (own) delegate having a DAL slot at the current level, store - a promise to obtain the attestable slots for that level. *) - next_level_dal_attestable_slots : dal_attestable_slots; - (** and similarly for the next level *) } val pp_level_state : Format.formatter -> level_state -> unit @@ -473,6 +463,7 @@ type global_state = { constants : Constants.t; round_durations : Round.round_durations; operation_worker : Operation_worker.t; + dal_attestable_slots_worker : Dal_attestable_slots_worker.t; mutable forge_worker_hooks : forge_worker_hooks; validation_mode : validation_mode; delegates : Key.t list; diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_state_types.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_state_types.ml index b0d676f9af03..d9aa4883735e 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_state_types.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_state_types.ml @@ -8,6 +8,8 @@ module Key_id = struct type t = Signature.Public_key_hash.t + let of_pkh pkh = pkh + let to_pkh pkh = pkh let compare = Signature.Public_key_hash.compare diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_state_types.mli b/src/proto_024_PsD5wVTJ/lib_delegate/baking_state_types.mli 
index 4d6206ab6055..36522b9697ed 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_state_types.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_state_types.mli @@ -9,6 +9,9 @@ module Key_id : sig type t + (** Only use at library frontiers *) + val of_pkh : Signature.public_key_hash -> t + (** Only use at library frontiers *) val to_pkh : t -> Signature.public_key_hash diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml new file mode 100644 index 000000000000..c6ede9b21c89 --- /dev/null +++ b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml @@ -0,0 +1,362 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +open Baking_state_types +open Tezos_dal_node_services + +module Events = struct + include Internal_event.Simple + + let section = [Protocol.name; "baker"; "attestable_slots_worker"] + + let consumed_backfill_stream = + declare_1 + ~section + ~name:"consumed_backfill_stream" + ~level:Info + ~msg:"Consumed backfill stream for {delegate_id}" + ("delegate_id", Delegate_id.encoding) + + let no_attestable_slot_at_level = + declare_1 + ~section + ~name:"no_attestable_slot_at_level" + ~level:Warning + ~msg:"No attestable slots found at level {attested_level} in cache" + ("attested_level", Data_encoding.int32) + + let no_attestable_slot_at_level_for_delegate = + declare_2 + ~section + ~name:"no_attestable_slot_at_level_for_delegate" + ~level:Warning + ~msg: + "No attestable slots found at level {attested_level} for {delegate_id}" + ("attested_level", Data_encoding.int32) + ("delegate_id", Delegate_id.encoding) + + let monitor_attestable_slots_failed = + declare_2 + ~section + ~name:"monitor_attestable_slots_failed" + ~level:Error + ~msg: + "Unable to 
subscribe to monitor attestable slots stream for {delegate} \ + -- {trace}" + ("delegate", Delegate_id.encoding) + ~pp2:Error_monad.pp_print_trace + ("trace", Error_monad.trace_encoding) + + let stream_ended = + declare_1 + ~section + ~name:"dal_stream_ended" + ~level:Notice + ~msg:"DAL monitor stream ended for {delegate_id}" + ("delegate_id", Delegate_id.encoding) + + let consume_streams_ended = + declare_1 + ~section + ~name:"consume_streams_ended" + ~level:Error + ~msg:"consume_streams ended with error {stacktrace}" + ("stacktrace", Data_encoding.string) +end + +type dal_attestable_slots = (Delegate_id.t * Types.attestable_slots) list + +module DelegateSet = Set.Make (Delegate_id) + +(** A handle to a single delegate’s DAL monitoring subscription. *) +type stream_handle = { + stream : Types.Attestable_event.t Lwt_stream.t; + stopper : Tezos_rpc.Context.stopper; +} + +type slots_by_delegate = Types.attestable_slots Delegate_id.Table.t + +type t = { + lock : Lwt_mutex.t; (** Global mutex protecting [streams] and [cache]. *) + attestation_lag : int; + number_of_slots : int; + streams : stream_handle Delegate_id.Table.t; + (** Active per-delegate subscriptions. *) + cache : (int32, slots_by_delegate) Stdlib.Hashtbl.t; + (** Cache of attestable slots, keyed by attested levels. *) +} + +(** [monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id] opens a streamed RPC + to the DAL node for the given [~delegate_id]. Each item emitted on the stream contains + DAL attestable information for this delegate. 
*) +let monitor_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) + ~delegate_id = + let open Lwt_syntax in + let pkh = Delegate_id.to_pkh delegate_id in + let* result = + Tezos_rpc.Context.make_streamed_call + Services.monitor_attestable_slots + dal_node_rpc_ctxt + ((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh) + () + () + in + match result with + | Ok result -> return_ok result + | Error trace -> + let* () = + Events.(emit monitor_attestable_slots_failed (delegate_id, trace)) + in + Lwt.return_error trace + +let get_slots_by_delegate state ~attested_level = + match Stdlib.Hashtbl.find_opt state.cache attested_level with + | Some slots_by_delegate -> slots_by_delegate + | None -> + let slots_by_delegate = Delegate_id.Table.create 10 in + Stdlib.Hashtbl.add state.cache attested_level slots_by_delegate ; + slots_by_delegate + +(** [update_cache_with_attestable_slot ~delegate_id ~slot_id] adds [~slot_id] into the cache, + using the keys [attested_level] = [slot_level] + [attestation_lag] and [~delegate_id]. + The function must be called under a lock, to avoid race conditions. *) +let update_cache_with_attestable_slot state ~delegate_id ~slot_id = + let Types.Slot_id.{slot_level; slot_index} = slot_id in + let attested_level = Int32.(add slot_level (of_int state.attestation_lag)) in + let slots_by_delegate = get_slots_by_delegate state ~attested_level in + let attestable_slots = + match Delegate_id.Table.find_opt slots_by_delegate delegate_id with + | Some (Types.Attestable_slots {slots; published_level}) -> + let slots_array = Array.of_list slots in + slots_array.(slot_id.slot_index) <- true ; + let slots = Array.to_list slots_array in + Types.Attestable_slots {slots; published_level} + | Some Not_in_committee -> + (* We should never reach this point, as the delegate should not have any attestable slot, + as they are not in the committee. 
*) + Types.Not_in_committee + | None -> + let slots = Array.make state.number_of_slots false in + slots.(slot_index) <- true ; + Types.Attestable_slots + {slots = Array.to_list slots; published_level = slot_level} + in + Delegate_id.Table.replace slots_by_delegate delegate_id attestable_slots + +let update_cache_no_shards_assigned state ~delegate_id ~attested_level = + let slots_by_delegate = get_slots_by_delegate state ~attested_level in + Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee + +(** [consume_stream state ~delegate_id stream_handle] consumes + [~delegate_id]'s stream until it ends, updating the cache accordingly. *) +let rec consume_stream state ~delegate_id stream_handle = + let open Lwt_syntax in + let module E = Types.Attestable_event in + let* attestable_event_opt = Lwt_stream.get stream_handle.stream in + match attestable_event_opt with + | None -> + Lwt_mutex.with_lock state.lock @@ fun () -> + let* () = Events.(emit stream_ended delegate_id) in + Delegate_id.Table.remove state.streams delegate_id ; + return_unit + | Some (E.Attestable_slot {slot_id}) -> + let* () = + Lwt_mutex.with_lock state.lock @@ fun () -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id ; + return_unit + in + consume_stream state ~delegate_id stream_handle + | Some (No_shards_assigned {attestation_level}) -> + let* () = + let attested_level = Int32.succ attestation_level in + Lwt_mutex.with_lock state.lock @@ fun () -> + update_cache_no_shards_assigned state ~delegate_id ~attested_level ; + return_unit + in + consume_stream state ~delegate_id stream_handle + | Some (Slot_has_trap _slot_id) -> + (* In case of a trap, we do not need to update the cache with any information. *) + consume_stream state ~delegate_id stream_handle + | Some (Backfill _backfill_payload) -> + (* This case should never be reachable, since we only backfill once, in a synchronous call. 
*) + Lwt.fail_with + "Backfill should never be done in consume_streams, as that is an \ + asynchronous function." + +let update_cache_backfill_payload state ~delegate_id ~backfill_payload = + let module E = Types.Attestable_event in + let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in + Lwt_mutex.with_lock state.lock @@ fun () -> + List.iter + (fun slot_id -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id) + slot_ids ; + List.iter + (fun attestation_level -> + let attested_level = Int32.succ attestation_level in + update_cache_no_shards_assigned state ~delegate_id ~attested_level) + no_shards_attestation_levels ; + Lwt.return_unit + +(** [consume_backfill_stream state ~delegate_id stream_handle] consumes the initial [Backfill] + event from a freshly opened DAL monitoring stream. This function is meant to be called + immediately after subscribing to the DAL node stream for a [~delegate_id] and before + spawning the asynchronous consumer that handles live events. *) +let consume_backfill_stream state ~delegate_id stream_handle = + let open Lwt_syntax in + let module E = Types.Attestable_event in + let* attestable_event_opt = Lwt_stream.get stream_handle.stream in + match attestable_event_opt with + | Some (E.Backfill {backfill_payload}) -> + let* () = + update_cache_backfill_payload state ~delegate_id ~backfill_payload + in + let* () = Events.(emit consumed_backfill_stream delegate_id) in + return_unit + | None -> + Lwt_mutex.with_lock state.lock @@ fun () -> + let* () = Events.(emit stream_ended delegate_id) in + Delegate_id.Table.remove state.streams delegate_id ; + return_unit + | _ -> + Lwt.fail_with + "The stream must always start properly with a Backfill event." 
+ +let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = + let open Lwt_syntax in + let* new_streams = + List.fold_left_s + (fun acc delegate_id -> + let* res = monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id in + match res with + | Ok (stream, stopper) -> + return ((delegate_id, {stream; stopper}) :: acc) + | Error _trace -> return acc) + [] + delegate_ids_to_add + in + let* () = + Lwt_mutex.with_lock state.lock @@ fun () -> + List.iter + (fun (delegate_id, stream_handle) -> + Delegate_id.Table.add state.streams delegate_id stream_handle) + new_streams ; + return_unit + in + let* () = + List.iter_p + (fun (delegate_id, stream_handle) -> + consume_backfill_stream state ~delegate_id stream_handle) + new_streams + in + List.iter + (fun (delegate_id, stream_handle) -> + Lwt.dont_wait + (fun () -> consume_stream state ~delegate_id stream_handle) + (fun exn -> + Events.( + emit__dont_wait__use_with_care + consume_streams_ended + (Printexc.to_string exn)))) + new_streams ; + return_unit + +(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles + the active set of streams using [delegate_ids] by computing the list of streams + to subscribe to. *) +let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = + let open Lwt_syntax in + let* delegate_ids_to_add = + Lwt_mutex.with_lock state.lock @@ fun () -> + let new_delegate_ids = DelegateSet.of_list delegate_ids in + let current_delegate_ids = + DelegateSet.of_seq @@ Delegate_id.Table.to_seq_keys state.streams + in + return DelegateSet.(elements (diff new_delegate_ids current_delegate_ids)) + in + subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + +(** [prune_cache_before state ~prune_level] prunes all cache entries before + [~prune_level]. This is called by the public getter once per level, keeping memory + bounded without background maintenance. 
*) +let prune_cache_before state ~prune_level = + Lwt_mutex.with_lock state.lock @@ fun () -> + Stdlib.Hashtbl.filter_map_inplace + (fun attested_level slots_by_delegate -> + if Int32.compare attested_level prune_level < 0 then None + else Some slots_by_delegate) + state.cache ; + Lwt.return_unit + +let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level + delegate_ids = + let open Lwt_syntax in + let attested_level = Int32.succ attestation_level in + let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in + let* dal_attestable_slots = + let empty_slots = Stdlib.List.init state.number_of_slots (fun _ -> false) in + Lwt_mutex.with_lock state.lock @@ fun () -> + match Stdlib.Hashtbl.find_opt state.cache attested_level with + | None -> + let* () = Events.(emit no_attestable_slot_at_level attested_level) in + let published_level = + Int32.(sub attested_level (of_int state.attestation_lag)) + in + List.map_s + (fun delegate_id -> + return + ( delegate_id, + Types.Attestable_slots {slots = empty_slots; published_level} )) + delegate_ids + | Some slots_by_delegate -> + List.map_s + (fun delegate_id -> + match Delegate_id.Table.find_opt slots_by_delegate delegate_id with + | None -> + let published_level = + Int32.(sub attested_level (of_int state.attestation_lag)) + in + let* () = + Events.( + emit + no_attestable_slot_at_level_for_delegate + (attested_level, delegate_id)) + in + return + ( delegate_id, + Types.Attestable_slots + {slots = empty_slots; published_level} ) + | Some slots -> return (delegate_id, slots)) + delegate_ids + in + let* () = prune_cache_before state ~prune_level:attestation_level in + return dal_attestable_slots + +let create ~attestation_lag ~number_of_slots = + { + lock = Lwt_mutex.create (); + attestation_lag; + number_of_slots; + streams = Delegate_id.Table.create 10; + cache = Stdlib.Hashtbl.create 16; + } + +let shutdown_worker state = + let open Lwt_syntax in + let* stoppers = + Lwt_mutex.with_lock 
state.lock @@ fun () -> + let stoppers = + Delegate_id.Table.to_seq state.streams + |> Seq.map (fun (_delegate_id, {stopper; _}) -> stopper) + |> List.of_seq + in + Delegate_id.Table.clear state.streams ; + Stdlib.Hashtbl.reset state.cache ; + return stoppers + in + List.iter (fun stopper -> stopper ()) stoppers ; + return_unit diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli new file mode 100644 index 000000000000..ea7a313c7edc --- /dev/null +++ b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli @@ -0,0 +1,71 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +(** The DAL attestable slots worker keeps per-delegate subscriptions to + the DAL node’s streaming RPC [GET /profiles//monitor/attestable_slots)]. + Each stream emits attestable slots events as soon as a delegate’s shards for a + published level become fully available or when we know the delegate is not in + a particular DAL committee. + + Incoming events are folded into an in-memory cache keyed by attested level + (published_level + attestation_lag) and delegate id. For each (attested_level, + delegate), the worker maintains a boolean bitset of attestable slots. + This cache is filled continuously and independently of the baker’s main + loop so that consensus code never waits on the network. + + When the baker asks for attestable slots at level L for a set of delegates, + the worker: + - reconciles subscriptions with that set (subscribing for new delegates); + - prunes cached entries strictly older than L to keep memory bounded; + - returns an immediate "snapshot" from the cache (defaulting to an all-false + bitset if nothing has been observed yet). 
+ + The worker's purpose is to decouple the critical consensus path from DAL + RPC latency: streams advance in the background, therefore the cache can serve + DAL information instantly. +*) + +open Baking_state_types + +(** {1 Datatypes} *) + +(* An association list between delegates and their DAL attestable slots + at some level (as obtained through the [monitor_attestable_slots] RPC). See usage + in {!Baking_actions}. *) +type dal_attestable_slots = + (Delegate_id.t * Tezos_dal_node_services.Types.attestable_slots) list + +type t + +(** {1 Accessors} *) + +(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles + the active set of streams using [delegate_ids] by computing the list of streams + to subscribe to. *) +val update_streams_subscriptions : + t -> Tezos_rpc.Context.generic -> Delegate_id.t list -> unit Lwt.t + +(** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] + returns, for each delegate, the current bitset for attested level + [~attestation_level] + 1 and its [published_level]. *) +val get_dal_attestable_slots : + t -> + Tezos_rpc.Context.generic -> + attestation_level:int32 -> + Delegate_id.t list -> + dal_attestable_slots Lwt.t + +(** {1 Constructors} *) + +(** [create ~attestation_lag ~number_of_slots] creates a new worker state. This + does not start any background thread, as streams are opened lazily. *) +val create : attestation_lag:int -> number_of_slots:int -> t + +(** [shutdown_worker state] stops all active delegate subscriptions and clears + the worker’s in-memory state. The worker will no longer hold any references + to live streams and the cache will become empty. 
*) +val shutdown_worker : t -> unit Lwt.t diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/node_rpc.ml b/src/proto_024_PsD5wVTJ/lib_delegate/node_rpc.ml index a54bb32cd117..a6f1c7ac554f 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/node_rpc.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/node_rpc.ml @@ -27,7 +27,6 @@ open Protocol open Alpha_context open Baking_cache open Baking_state -open Baking_state_types module Block_services = Block_services.Make (Protocol) (Protocol) module Events = Baking_events.Node_rpc @@ -411,27 +410,6 @@ let fetch_dal_config cctxt = | Error e -> return_error e | Ok dal_config -> return_ok dal_config -let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level = - let pkh = Delegate_id.to_pkh delegate_id in - Tezos_rpc.Context.make_call - Tezos_dal_node_services.Services.get_attestable_slots - dal_node_rpc_ctxt - (((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh), attested_level) - () - () - -let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) - ~attestation_level delegate_infos = - let attested_level = Int32.succ attestation_level in - List.map - (fun (delegate_info : delegate_info) -> - let delegate_id = - Baking_state_types.Delegate.delegate_id delegate_info.delegate - in - ( delegate_id, - get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) - delegate_infos - let get_dal_profiles dal_node_rpc_ctxt = Tezos_rpc.Context.make_call Tezos_dal_node_services.Services.get_profiles diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/node_rpc.mli b/src/proto_024_PsD5wVTJ/lib_delegate/node_rpc.mli index a45fe3666095..13bc982e8670 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/node_rpc.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/node_rpc.mli @@ -86,16 +86,6 @@ val await_protocol_activation : val fetch_dal_config : #Protocol_client_context.rpc_context -> Cryptobox.Config.t tzresult Lwt.t -(** [dal_attestable_slots ctxt ~attestation_level delegates_slots] calls the DAL - node RPC GET 
/profiles//attested_levels//attestable_slots/ - for each of the delegates in [delegate_infos] and returns the corresponding - promises. *) -val dal_attestable_slots : - Tezos_rpc.Context.generic -> - attestation_level:int32 -> - Baking_state.delegate_info list -> - Baking_state.dal_attestable_slots - (** [get_dal_profiles ctxt delegates] calls the DAL node RPC GET /profiles/ to retrieve the DAL node's profiles. *) val get_dal_profiles : diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/state_transitions.ml b/src/proto_024_PsD5wVTJ/lib_delegate/state_transitions.ml index 070bc81e19c0..f58e33fdbd5b 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/state_transitions.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/state_transitions.ml @@ -426,8 +426,7 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = let* () = Events.(emit new_head_with_increasing_level ()) in let new_level = new_proposal.block.shell.level in let compute_new_state ~current_round ~delegate_infos - ~next_level_delegate_infos ~dal_attestable_slots - ~next_level_dal_attestable_slots = + ~next_level_delegate_infos = let round_state = { current_round; @@ -449,8 +448,6 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = delegate_infos; next_level_delegate_infos; next_level_latest_forge_request = None; - dal_attestable_slots; - next_level_dal_attestable_slots; } in (* recursive call with the up-to-date state to handle the new -- GitLab From 43a5d309601e4f1ee07df1d4394ebeebba063bb8 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 28 Oct 2025 12:29:25 +0000 Subject: [PATCH 13/19] Seoul/Baker: Backport --- .../tool_023_PtSeouLo.ml | 6 + .../lib_delegate/baking_actions.ml | 118 ++---- .../lib_delegate/baking_actions.mli | 2 - .../lib_delegate/baking_events.ml | 21 - .../lib_delegate/baking_lib.ml | 6 + .../lib_delegate/baking_scheduling.ml | 49 ++- .../lib_delegate/baking_scheduling.mli | 5 +- .../lib_delegate/baking_state.ml | 26 +- 
.../lib_delegate/baking_state.mli | 17 +- .../lib_delegate/baking_state_types.ml | 2 + .../lib_delegate/baking_state_types.mli | 3 + .../dal_attestable_slots_worker.ml | 362 ++++++++++++++++++ .../dal_attestable_slots_worker.mli | 71 ++++ .../lib_delegate/node_rpc.ml | 22 -- .../lib_delegate/node_rpc.mli | 10 - .../lib_delegate/state_transitions.ml | 5 +- 16 files changed, 531 insertions(+), 194 deletions(-) create mode 100644 src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml create mode 100644 src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli diff --git a/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml b/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml index 9f20c8ab86d5..fe23bfdefcb9 100644 --- a/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml +++ b/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml @@ -266,6 +266,11 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config ~constants cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Baking_scheduling.create_initial_state cctxt ?synchronize @@ -275,6 +280,7 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config ~constants ~current_proposal round_durations + dal_attestable_slots_worker delegates let compute_current_round_duration round_durations diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_actions.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_actions.ml index 010086955f62..3fd9b11ae213 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_actions.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_actions.ml @@ -148,8 +148,6 @@ and level_update = { current_round:Round.t -> delegate_slots:delegate_slots -> next_level_delegate_slots:delegate_slots -> - dal_attestable_slots:dal_attestable_slots -> - next_level_dal_attestable_slots:dal_attestable_slots -> (state * 
action) Lwt.t; } @@ -513,42 +511,30 @@ let only_if_dal_feature_enabled state ~default_value f = let process_dal_rpc_result state delegate level round = let open Lwt_result_syntax in function - | `RPC_timeout -> - let*! () = - Events.(emit failed_to_get_dal_attestations_in_time (delegate, level)) - in + | Tezos_dal_node_services.Types.Not_in_committee -> + let*! () = Events.(emit not_in_dal_committee (delegate, level)) in return_none - | `RPC_result (Error errs) -> + | Attestable_slots {slots; published_level} -> + let number_of_slots = + state.global_state.constants.parametric.dal.number_of_slots + in + let dal_attestation = + List.fold_left_i + (fun i acc flag -> + match Dal.Slot_index.of_int_opt ~number_of_slots i with + | Some index when flag -> Dal.Attestation.commit acc index + | None | Some _ -> acc) + Dal.Attestation.empty + slots + in + let dal_content = {attestation = dal_attestation} in let*! () = - Events.(emit failed_to_get_dal_attestations (delegate, errs)) + Events.( + emit + attach_dal_attestation + (delegate, dal_content, published_level, level, round)) in - return_none - | `RPC_result (Ok res) -> ( - match res with - | Tezos_dal_node_services.Types.Not_in_committee -> - let*! () = Events.(emit not_in_dal_committee (delegate, level)) in - return_none - | Attestable_slots {slots; published_level} -> - let number_of_slots = - state.global_state.constants.parametric.dal.number_of_slots - in - let dal_attestation = - List.fold_left_i - (fun i acc flag -> - match Dal.Slot_index.of_int_opt ~number_of_slots i with - | Some index when flag -> Dal.Attestation.commit acc index - | None | Some _ -> acc) - Dal.Attestation.empty - slots - in - let dal_content = {attestation = dal_attestation} in - let*! 
() = - Events.( - emit - attach_dal_attestation - (delegate, dal_content, published_level, level, round)) - in - return_some dal_content) + return_some dal_content let may_get_dal_content state consensus_vote = let open Lwt_result_syntax in @@ -558,30 +544,24 @@ let may_get_dal_content state consensus_vote = vote_consensus_content.round ) in let delegate_id = Delegate.delegate_id delegate in - let promise_opt = - List.assoc_opt - ~equal:Delegate_id.equal - delegate_id - state.level_state.dal_attestable_slots + let*! attestable_slots = + only_if_dal_feature_enabled + state + ~default_value:[] + (fun dal_node_rpc_ctxt -> + Dal_attestable_slots_worker.get_dal_attestable_slots + state.global_state.dal_attestable_slots_worker + dal_node_rpc_ctxt + ~attestation_level:level + [delegate_id]) + in + let attestable_slots_opt = + List.assoc_opt ~equal:Delegate_id.equal delegate_id attestable_slots in - match promise_opt with + match attestable_slots_opt with | None -> return_none - | Some promise -> - let*! res = - (* Normally we'd just check the state of the promise and return the - resolved value or an error if the promise is still pending. However, - tests that bake in the past would fail, because there would not be - sufficient time to get the answer from the DAL node. Therefore, we - wait for a bit for the DAL node to provide an answer. *) - Lwt.pick - [ - (let*! () = Lwt_unix.sleep 0.75 in - Lwt.return `RPC_timeout); - (let*! tz_res = promise in - Lwt.return (`RPC_result tz_res)); - ] - in - process_dal_rpc_result state delegate_id level round res + | Some attestable_slots -> + process_dal_rpc_result state delegate_id level round attestable_slots let is_authorized (global_state : global_state) highwatermarks consensus_vote = let {delegate; vote_consensus_content; _} = consensus_vote in @@ -1125,35 +1105,11 @@ let update_to_level state level_update = new_level_proposal round_durations [@profiler.record_f {verbosity = Debug} "compute round"]) in - let*! 
dal_attestable_slots, next_level_dal_attestable_slots = - only_if_dal_feature_enabled - state - ~default_value:([], []) - (fun dal_node_rpc_ctxt -> - let dal_attestable_slots = - if Int32.(new_level = succ state.level_state.current_level) then - state.level_state.next_level_dal_attestable_slots - else - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:new_level - (Delegate_slots.own_delegates delegate_slots) - in - let next_level_dal_attestable_slots = - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:(Int32.succ new_level) - (Delegate_slots.own_delegates next_level_delegate_slots) - in - Lwt.return (dal_attestable_slots, next_level_dal_attestable_slots)) - in let*! new_state = (compute_new_state ~current_round ~delegate_slots ~next_level_delegate_slots - ~dal_attestable_slots - ~next_level_dal_attestable_slots [@profiler.record_s {verbosity = Debug} "compute new state"]) in return new_state diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_actions.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_actions.mli index 03c9ff3938b2..ca72c0db9989 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_actions.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_actions.mli @@ -56,8 +56,6 @@ and level_update = { current_round:Round.t -> delegate_slots:delegate_slots -> next_level_delegate_slots:delegate_slots -> - dal_attestable_slots:dal_attestable_slots -> - next_level_dal_attestable_slots:dal_attestable_slots -> (state * action) Lwt.t; } diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_events.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_events.ml index 68e2cd46a7b0..f649bce6808d 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_events.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_events.ml @@ -835,27 +835,6 @@ module Actions = struct ~pp2:Error_monad.pp_print_trace ("trace", Error_monad.trace_encoding) - let failed_to_get_dal_attestations = - declare_2 - ~section - 
~name:"failed_to_get_attestations" - ~level:Error - ~msg:"unable to get DAL attestation for {delegate} -- {trace}" - ("delegate", Delegate_id.encoding) - ~pp2:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) - - let failed_to_get_dal_attestations_in_time = - declare_2 - ~section - ~name:"failed_to_get_attestations_in_time" - ~level:Error - ~msg: - "unable to get DAL attestation for {delegate} in time for attestation \ - level {level}" - ("delegate", Delegate_id.encoding) - ("level", Data_encoding.int32) - let failed_to_inject_consensus_vote = declare_2 ~section diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml index 7d118ae47b55..2a5da976294e 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml @@ -70,6 +70,11 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Baking_scheduling.create_initial_state cctxt ?dal_node_rpc_ctxt @@ -78,6 +83,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool config operation_worker round_durations + dal_attestable_slots_worker ~current_proposal ~constants delegates diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml index 0474ce4844c1..cf96e4684451 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml @@ -619,7 +619,7 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker round_durations + config 
operation_worker round_durations dal_attestable_slots_worker ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 @@ -649,6 +649,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain constants; round_durations; operation_worker; + dal_attestable_slots_worker; forge_worker_hooks = { push_request = (fun _ -> assert false); @@ -696,26 +697,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain else None in let current_level = current_proposal.block.shell.level in - let dal_attestable_slots = - Option.fold - ~none:[] - ~some:(fun dal_node_rpc_ctxt -> - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:current_level - (Delegate_slots.own_delegates delegate_slots)) - dal_node_rpc_ctxt - in - let next_level_dal_attestable_slots = - Option.fold - ~none:[] - ~some:(fun dal_node_rpc_ctxt -> - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:(Int32.succ current_level) - (Delegate_slots.own_delegates next_level_delegate_slots)) - dal_node_rpc_ctxt - in let level_state = { current_level; @@ -728,8 +709,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain delegate_slots; next_level_delegate_slots; next_level_latest_forge_request = None; - dal_attestable_slots; - next_level_dal_attestable_slots; } in let* round_state = @@ -948,7 +927,8 @@ let try_resolve_consensus_keys cctxt key = in try_find_delegate_key levels_to_inspect -let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = +let register_dal_profiles cctxt dal_node_rpc_ctxt dal_attestable_slots_worker + delegates = let open Lwt_result_syntax in let*! 
delegates = List.map_s (try_resolve_consensus_keys cctxt) delegates in let register dal_ctxt = @@ -968,7 +948,15 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = warn () else Lwt.return_unit in - Node_rpc.register_dal_profiles dal_ctxt delegates + let* () = Node_rpc.register_dal_profiles dal_ctxt delegates in + let delegate_ids = List.map Delegate_id.of_pkh delegates in + let*! () = + Dal_attestable_slots_worker.update_streams_subscriptions + dal_attestable_slots_worker + dal_ctxt + delegate_ids + in + return_unit in Option.iter_es (fun dal_ctxt -> @@ -1010,10 +998,19 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) let*! operation_worker = Operation_worker.run ~constants ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> let*! _ = Operation_worker.shutdown_worker operation_worker in + let*! 
() = + Dal_attestable_slots_worker.shutdown_worker + dal_attestable_slots_worker + in Lwt.return_unit)) canceler ; let* initial_state = @@ -1024,6 +1021,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) config operation_worker round_durations + dal_attestable_slots_worker ~current_proposal ~constants delegates @@ -1032,6 +1030,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) register_dal_profiles cctxt initial_state.global_state.dal_node_rpc_ctxt + dal_attestable_slots_worker delegates in let cloned_block_stream = Lwt_stream.clone heads_stream in diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli index 41030065dc63..e3fc81c04a69 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli @@ -131,8 +131,8 @@ val create_loop_state : loop_state (** [create_initial_state context ?synchronize chain baking_configuration - operation_worker current_proposal ?constants consensus_keys] creates an - initial {!Baking_state.t} by initializing a + operation_worker dal_attestable_slots_worker current_proposal ?constants + consensus_keys] creates an initial {!Baking_state.t} by initializing a {!type-Baking_state.global_state}, a {!type-Baking_state.level_state} and a {!type-Baking_state.round_state}. 
@@ -155,6 +155,7 @@ val create_initial_state : Baking_configuration.t -> Operation_worker.t -> Round.round_durations -> + Dal_attestable_slots_worker.t -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_state.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_state.ml index 0930f965c09d..0fda05b4a5ab 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_state.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_state.ml @@ -155,6 +155,12 @@ module Delegate_slots = struct let own_delegates slots = slots.own_delegates + let own_delegate_ids t = + List.map + (fun delegate_info -> + Baking_state_types.Delegate.delegate_id delegate_info.delegate) + t.own_delegates + let own_slot_owner slots ~slot = SlotMap.find slot slots.own_delegate_slots let find_first_slot_from slots ~slot = @@ -175,11 +181,6 @@ end type delegate_slots = Delegate_slots.t -type dal_attestable_slots = - (Delegate_id.t - * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) - list - type proposal = {block : block_info; predecessor : block_info} let proposal_encoding = @@ -236,10 +237,9 @@ type prepared_block = { } (* The fields {current_level}, {delegate_slots}, {next_level_delegate_slots}, - {next_level_latest_forge_request}, {dal_attestable_slots}, - {next_level_dal_attestable_slots} are updated only when we receive a block at - a different level than {current_level}. Note that this means that there is - always a {latest_proposal}, which may be our own baked block. *) + {next_level_latest_forge_request}, are updated only when we receive a block at + a different level than {current_level}. Note that this means that there is always + a {latest_proposal}, which may be our own baked block. 
*) type level_state = { current_level : int32; latest_proposal : proposal; @@ -254,8 +254,6 @@ type level_state = { delegate_slots : delegate_slots; next_level_delegate_slots : delegate_slots; next_level_latest_forge_request : Round.t option; - dal_attestable_slots : dal_attestable_slots; - next_level_dal_attestable_slots : dal_attestable_slots; } type phase = @@ -520,8 +518,10 @@ type global_state = { constants : Constants.t; (* round durations *) round_durations : Round.round_durations; - (* worker that monitor and aggregates new operations *) + (* worker that monitors and aggregates new operations *) operation_worker : Operation_worker.t; + (* worker that retrieves DAL attestable slots from the DAL node *) + dal_attestable_slots_worker : Dal_attestable_slots_worker.t; (* hooks to the consensus and block forge worker *) mutable forge_worker_hooks : forge_worker_hooks; (* the validation mode used by the baker*) @@ -1221,8 +1221,6 @@ let pp_level_state fmt delegate_slots; next_level_delegate_slots; next_level_latest_forge_request; - dal_attestable_slots = _; - next_level_dal_attestable_slots = _; } = Format.fprintf fmt diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_state.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_state.mli index 6dbfc7eb446c..7971e5b1527a 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_state.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_state.mli @@ -45,6 +45,9 @@ module Delegate_slots : sig no duplicates, the associated slot is the first one. *) val own_delegates : t -> delegate_slot list + (** Returns the list of the delegate ids from [own_delegates]. *) + val own_delegate_ids : t -> Delegate_id.t list + (** Returns, among our *own* delegates, the delegate (together with its first attesting slot) that owns the given slot, if any (even if the given slot is not the delegate's first slot). 
*) @@ -86,14 +89,6 @@ val compute_delegate_slots : (** {2 Consensus operations types functions} *) -(* An association list between delegates and promises for their DAL attestations - at some level (as obtained through the [get_attestable_slots] RPC). See usage - in {!level_state}. *) -type dal_attestable_slots = - (Delegate_id.t - * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) - list - type consensus_vote_kind = Attestation | Preattestation val consensus_vote_kind_encoding : consensus_vote_kind Data_encoding.t @@ -320,11 +315,6 @@ type level_state = { next_level_latest_forge_request : Round.t option; (** Some if a forge request has been sent for the next level on the given round *) - dal_attestable_slots : dal_attestable_slots; - (** For each (own) delegate having a DAL slot at the current level, store - a promise to obtain the attestable slots for that level. *) - next_level_dal_attestable_slots : dal_attestable_slots; - (** and similarly for the next level *) } val pp_level_state : Format.formatter -> level_state -> unit @@ -471,6 +461,7 @@ type global_state = { constants : Constants.t; round_durations : Round.round_durations; operation_worker : Operation_worker.t; + dal_attestable_slots_worker : Dal_attestable_slots_worker.t; mutable forge_worker_hooks : forge_worker_hooks; validation_mode : validation_mode; delegates : Key.t list; diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_state_types.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_state_types.ml index 87cb0194d202..45951612aeb4 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_state_types.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_state_types.ml @@ -8,6 +8,8 @@ module Key_id = struct type t = Signature.Public_key_hash.t + let of_pkh pkh = pkh + let to_pkh pkh = pkh let compare = Signature.Public_key_hash.compare diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_state_types.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_state_types.mli index 
007e62c55c05..573509bd12b5 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_state_types.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_state_types.mli @@ -9,6 +9,9 @@ module Key_id : sig type t + (** Only use at library frontiers *) + val of_pkh : Signature.public_key_hash -> t + (** Only use at library frontiers *) val to_pkh : t -> Signature.public_key_hash diff --git a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml new file mode 100644 index 000000000000..65afcf0aaa6d --- /dev/null +++ b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml @@ -0,0 +1,362 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +open Baking_state_types +open Tezos_dal_node_services + +module Events = struct + include Internal_event.Simple + + let section = [Protocol.name; "baker"; "attestable_slots_worker"] + + let consumed_backfill_stream = + declare_1 + ~section + ~name:"consumed_backfill_stream" + ~level:Info + ~msg:"Consumed backfill stream for {delegate_id}" + ("delegate_id", Delegate_id.encoding) + + let no_attestable_slot_at_level = + declare_1 + ~section + ~name:"no_attestable_slot_at_level" + ~level:Warning + ~msg:"No attestable slots found at level {attested_level} in cache" + ("attested_level", Data_encoding.int32) + + let no_attestable_slot_at_level_for_delegate = + declare_2 + ~section + ~name:"no_attestable_slot_at_level_for_delegate" + ~level:Warning + ~msg: + "No attestable slots found at level {attested_level} for {delegate_id}" + ("attested_level", Data_encoding.int32) + ("delegate_id", Delegate_id.encoding) + + let monitor_attestable_slots_failed = + declare_2 + ~section + ~name:"monitor_attestable_slots_failed" + ~level:Error + ~msg: + "Unable to 
subscribe to monitor attestable slots stream for {delegate} \ + -- {trace}" + ("delegate", Delegate_id.encoding) + ~pp2:Error_monad.pp_print_trace + ("trace", Error_monad.trace_encoding) + + let stream_ended = + declare_1 + ~section + ~name:"dal_stream_ended" + ~level:Notice + ~msg:"DAL monitor stream ended for {delegate_id}" + ("delegate_id", Delegate_id.encoding) + + let consume_streams_ended = + declare_1 + ~section + ~name:"consume_streams_ended" + ~level:Error + ~msg:"consume_streams ended with error {stacktrace}" + ("stacktrace", Data_encoding.string) +end + +type dal_attestable_slots = (Delegate_id.t * Types.attestable_slots) list + +module DelegateSet = Set.Make (Delegate_id) + +(** A handle to a single delegate’s DAL monitoring subscription. *) +type stream_handle = { + stream : Types.Attestable_event.t Lwt_stream.t; + stopper : Tezos_rpc.Context.stopper; +} + +type slots_by_delegate = Types.attestable_slots Delegate_id.Table.t + +type t = { + lock : Lwt_mutex.t; (** Global mutex protecting [streams] and [cache]. *) + attestation_lag : int; + number_of_slots : int; + streams : stream_handle Delegate_id.Table.t; + (** Active per-delegate subscriptions. *) + cache : (int32, slots_by_delegate) Stdlib.Hashtbl.t; + (** Cache of attestable slots, keyed by attested levels. *) +} + +(** [monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id] opens a streamed RPC + to the DAL node for the given [~delegate_id]. Each item emitted on the stream contains + DAL attestable information for this delegate. 
*) +let monitor_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) + ~delegate_id = + let open Lwt_syntax in + let pkh = Delegate_id.to_pkh delegate_id in + let* result = + Tezos_rpc.Context.make_streamed_call + Services.monitor_attestable_slots + dal_node_rpc_ctxt + ((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh) + () + () + in + match result with + | Ok result -> return_ok result + | Error trace -> + let* () = + Events.(emit monitor_attestable_slots_failed (delegate_id, trace)) + in + Lwt.return_error trace + +let get_slots_by_delegate state ~attested_level = + match Stdlib.Hashtbl.find_opt state.cache attested_level with + | Some slots_by_delegate -> slots_by_delegate + | None -> + let slots_by_delegate = Delegate_id.Table.create 10 in + Stdlib.Hashtbl.add state.cache attested_level slots_by_delegate ; + slots_by_delegate + +(** [update_cache_with_attestable_slot ~delegate_id ~slot_id] adds [~slot_id] into the cache, + using the keys [attested_level] = [slot_level] + [attestation_lag] and [~delegate_id]. + The function must be called under a lock, to avoid race conditions. *) +let update_cache_with_attestable_slot state ~delegate_id ~slot_id = + let Types.Slot_id.{slot_level; slot_index} = slot_id in + let attested_level = Int32.(add slot_level (of_int state.attestation_lag)) in + let slots_by_delegate = get_slots_by_delegate state ~attested_level in + let attestable_slots = + match Delegate_id.Table.find_opt slots_by_delegate delegate_id with + | Some (Types.Attestable_slots {slots; published_level}) -> + let slots_array = Array.of_list slots in + slots_array.(slot_id.slot_index) <- true ; + let slots = Array.to_list slots_array in + Types.Attestable_slots {slots; published_level} + | Some Not_in_committee -> + (* We should never reach this point, as the delegate should not have any attestable slot, + as they are not in the committee. 
*) + Types.Not_in_committee + | None -> + let slots = Array.make state.number_of_slots false in + slots.(slot_index) <- true ; + Types.Attestable_slots + {slots = Array.to_list slots; published_level = slot_level} + in + Delegate_id.Table.replace slots_by_delegate delegate_id attestable_slots + +let update_cache_no_shards_assigned state ~delegate_id ~attested_level = + let slots_by_delegate = get_slots_by_delegate state ~attested_level in + Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee + +(** [consume_stream state ~delegate_id stream_handle] consumes + [~delegate_id]'s stream forever, updating the cache accordingly. *) +let rec consume_stream state ~delegate_id stream_handle = + let open Lwt_syntax in + let module E = Types.Attestable_event in + let* attestable_event_opt = Lwt_stream.get stream_handle.stream in + match attestable_event_opt with + | None -> + Lwt_mutex.with_lock state.lock @@ fun () -> + let* () = Events.(emit stream_ended delegate_id) in + Delegate_id.Table.remove state.streams delegate_id ; + return_unit + | Some (E.Attestable_slot {slot_id}) -> + let* () = + Lwt_mutex.with_lock state.lock @@ fun () -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id ; + return_unit + in + consume_stream state ~delegate_id stream_handle + | Some (No_shards_assigned {attestation_level}) -> + let* () = + let attested_level = Int32.succ attestation_level in + Lwt_mutex.with_lock state.lock @@ fun () -> + update_cache_no_shards_assigned state ~delegate_id ~attested_level ; + return_unit + in + consume_stream state ~delegate_id stream_handle + | Some (Slot_has_trap _slot_id) -> + (* In case of a trap, we do not need to update the cache with any information. *) + consume_stream state ~delegate_id stream_handle + | Some (Backfill _backfill_payload) -> + (* This case should never be reachable, since we only backfill once, in a synchronous call. 
*) + Lwt.fail_with + "Backfill should never be done in consume_streams, as that is an \ + asynchronous function." + +let update_cache_backfill_payload state ~delegate_id ~backfill_payload = + let module E = Types.Attestable_event in + let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in + Lwt_mutex.with_lock state.lock @@ fun () -> + List.iter + (fun slot_id -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id) + slot_ids ; + List.iter + (fun attestation_level -> + let attested_level = Int32.succ attestation_level in + update_cache_no_shards_assigned state ~delegate_id ~attested_level) + no_shards_attestation_levels ; + Lwt.return_unit + +(** [consume_backfill_stream state ~delegate_id stream_handle] consumes the initial [Backfill] + event from a freshly opened DAL monitoring stream. This function is meant to be called + immediately after subscribing to the DAL node stream for a [~delegate_id] and before + spawning the asynchronous consumer that handles live events. *) +let consume_backfill_stream state ~delegate_id stream_handle = + let open Lwt_syntax in + let module E = Types.Attestable_event in + let* attestable_event_opt = Lwt_stream.get stream_handle.stream in + match attestable_event_opt with + | Some (E.Backfill {backfill_payload}) -> + let* () = + update_cache_backfill_payload state ~delegate_id ~backfill_payload + in + let* () = Events.(emit consumed_backfill_stream delegate_id) in + return_unit + | None -> + Lwt_mutex.with_lock state.lock @@ fun () -> + let* () = Events.(emit stream_ended delegate_id) in + Delegate_id.Table.remove state.streams delegate_id ; + return_unit + | _ -> + Lwt.fail_with + "The stream must always start properly with a Backfill event." 
+ +let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = + let open Lwt_syntax in + let* new_streams = + List.fold_left_s + (fun acc delegate_id -> + let* res = monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id in + match res with + | Ok (stream, stopper) -> + return ((delegate_id, {stream; stopper}) :: acc) + | Error _trace -> return acc) + [] + delegate_ids_to_add + in + let* () = + Lwt_mutex.with_lock state.lock @@ fun () -> + List.iter + (fun (delegate_id, stream_handle) -> + Delegate_id.Table.add state.streams delegate_id stream_handle) + new_streams ; + return_unit + in + let* () = + List.iter_p + (fun (delegate_id, stream_handle) -> + consume_backfill_stream state ~delegate_id stream_handle) + new_streams + in + List.iter + (fun (delegate_id, stream_handle) -> + Lwt.dont_wait + (fun () -> consume_stream state ~delegate_id stream_handle) + (fun exn -> + Events.( + emit__dont_wait__use_with_care + consume_streams_ended + (Printexc.to_string exn)))) + new_streams ; + return_unit + +(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles + the active set of streams using [delegate_ids] by computing the list of streams + to subscribe to. *) +let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = + let open Lwt_syntax in + let* delegate_ids_to_add = + Lwt_mutex.with_lock state.lock @@ fun () -> + let new_delegate_ids = DelegateSet.of_list delegate_ids in + let current_delegate_ids = + DelegateSet.of_seq @@ Delegate_id.Table.to_seq_keys state.streams + in + return DelegateSet.(elements (diff new_delegate_ids current_delegate_ids)) + in + subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + +(** [prune_cache_before state ~prune_level] prunes all cache entries before + [~prune_level]. This is called by the public getter once per level, keeping memory + bounded without background maintenance. 
*) +let prune_cache_before state ~prune_level = + Lwt_mutex.with_lock state.lock @@ fun () -> + Stdlib.Hashtbl.filter_map_inplace + (fun attested_level slots_by_delegate -> + if Int32.compare attested_level prune_level < 0 then None + else Some slots_by_delegate) + state.cache ; + Lwt.return_unit + +let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level + delegate_ids = + let open Lwt_syntax in + let attested_level = Int32.succ attestation_level in + let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in + let* dal_attestable_slots = + let empty_slots = Stdlib.List.init state.number_of_slots (fun _ -> false) in + Lwt_mutex.with_lock state.lock @@ fun () -> + match Stdlib.Hashtbl.find_opt state.cache attested_level with + | None -> + let* () = Events.(emit no_attestable_slot_at_level attested_level) in + let published_level = + Int32.(sub attested_level (of_int state.attestation_lag)) + in + List.map_s + (fun delegate_id -> + return + ( delegate_id, + Types.Attestable_slots {slots = empty_slots; published_level} )) + delegate_ids + | Some slots_by_delegate -> + List.map_s + (fun delegate_id -> + match Delegate_id.Table.find_opt slots_by_delegate delegate_id with + | None -> + let published_level = + Int32.(sub attested_level (of_int state.attestation_lag)) + in + let* () = + Events.( + emit + no_attestable_slot_at_level_for_delegate + (attested_level, delegate_id)) + in + return + ( delegate_id, + Types.Attestable_slots + {slots = empty_slots; published_level} ) + | Some slots -> return (delegate_id, slots)) + delegate_ids + in + let* () = prune_cache_before state ~prune_level:attestation_level in + return dal_attestable_slots + +let create ~attestation_lag ~number_of_slots = + { + lock = Lwt_mutex.create (); + attestation_lag; + number_of_slots; + streams = Delegate_id.Table.create 10; + cache = Stdlib.Hashtbl.create 16; + } + +let shutdown_worker state = + let open Lwt_syntax in + let* stoppers = + Lwt_mutex.with_lock 
state.lock @@ fun () -> + let stoppers = + Delegate_id.Table.to_seq state.streams + |> Seq.map (fun (_delegate_id, {stopper; _}) -> stopper) + |> List.of_seq + in + Delegate_id.Table.clear state.streams ; + Stdlib.Hashtbl.reset state.cache ; + return stoppers + in + List.iter (fun stopper -> stopper ()) stoppers ; + return_unit diff --git a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli new file mode 100644 index 000000000000..ea7a313c7edc --- /dev/null +++ b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli @@ -0,0 +1,71 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +(** The DAL attestable slots worker keeps per-delegate subscriptions to + the DAL node’s streaming RPC [GET /profiles//monitor/attestable_slots)]. + Each stream emits attestable slots events as soon as a delegate’s shards for a + published level become fully available or when we know the delegate is not in + a particular DAL committee. + + Incoming events are folded into an in-memory cache keyed by attested level + (published_level + attestation_lag) and delegate id. For each (attested_level, + delegate), the worker maintains a boolean bitset of attestable slots. + This cache is filled continuously and independently of the baker’s main + loop so that consensus code never waits on the network. + + When the baker asks for attestable slots at level L for a set of delegates, + the worker: + - reconciles subscriptions with that set (subscribing for new delegates); + - prunes cached entries strictly older than L to keep memory bounded; + - returns an immediate "snapshot" from the cache (defaulting to an all-false + bitset if nothing has been observed yet). 
+ + The worker's purpose is to decouple the critical consensus path from DAL + RPC latency: streams advance in the background, therefore the cache can serve + DAL information instantly. +*) + +open Baking_state_types + +(** {1 Datatypes} *) + +(* An association list between delegates and their DAL attestable slots + at some level (as obtained through the [monitor_attestable_slots] RPC). See usage + in {!Baking_actions}. *) +type dal_attestable_slots = + (Delegate_id.t * Tezos_dal_node_services.Types.attestable_slots) list + +type t + +(** {1 Accessors} *) + +(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles + the active set of streams using [delegate_ids] by computing the list of streams + to subscribe to. *) +val update_streams_subscriptions : + t -> Tezos_rpc.Context.generic -> Delegate_id.t list -> unit Lwt.t + +(** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] + returns, for each delegate, the current bitset for attested level + [~attestation_level] + 1 and its [published_level]. *) +val get_dal_attestable_slots : + t -> + Tezos_rpc.Context.generic -> + attestation_level:int32 -> + Delegate_id.t list -> + dal_attestable_slots Lwt.t + +(** {1 Constructors} *) + +(** [create ~attestation_lag ~number_of_slots] creates a new worker state. This + does not start any background thread, as streams are opened lazily. *) +val create : attestation_lag:int -> number_of_slots:int -> t + +(** [shutdown_worker state] stops all active delegate subscriptions and clears + the worker’s in-memory state. The worker will no longer hold any references + to live streams and the cache will become empty. 
*) +val shutdown_worker : t -> unit Lwt.t diff --git a/src/proto_023_PtSeouLo/lib_delegate/node_rpc.ml b/src/proto_023_PtSeouLo/lib_delegate/node_rpc.ml index 558d31e4ceb7..8d17c6386317 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/node_rpc.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/node_rpc.ml @@ -27,7 +27,6 @@ open Protocol open Alpha_context open Baking_cache open Baking_state -open Baking_state_types module Block_services = Block_services.Make (Protocol) (Protocol) module Events = Baking_events.Node_rpc @@ -422,27 +421,6 @@ let fetch_dal_config cctxt = | Error e -> return_error e | Ok dal_config -> return_ok dal_config -let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level = - let pkh = Delegate_id.to_pkh delegate_id in - Tezos_rpc.Context.make_call - Tezos_dal_node_services.Services.get_attestable_slots - dal_node_rpc_ctxt - (((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh), attested_level) - () - () - -let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) - ~attestation_level delegate_slots = - let attested_level = Int32.succ attestation_level in - List.map - (fun (delegate_slot : delegate_slot) -> - let delegate_id = - Baking_state_types.Delegate.delegate_id delegate_slot.delegate - in - ( delegate_id, - get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) - delegate_slots - let get_dal_profiles dal_node_rpc_ctxt = Tezos_rpc.Context.make_call Tezos_dal_node_services.Services.get_profiles diff --git a/src/proto_023_PtSeouLo/lib_delegate/node_rpc.mli b/src/proto_023_PtSeouLo/lib_delegate/node_rpc.mli index e8385790e861..13bc982e8670 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/node_rpc.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/node_rpc.mli @@ -86,16 +86,6 @@ val await_protocol_activation : val fetch_dal_config : #Protocol_client_context.rpc_context -> Cryptobox.Config.t tzresult Lwt.t -(** [dal_attestable_slots ctxt ~attestation_level delegates_slots] calls the DAL - node RPC GET 
/profiles//attested_levels//attestable_slots/ - for each of the delegates in [delegate_slots] and returns the corresponding - promises. *) -val dal_attestable_slots : - Tezos_rpc.Context.generic -> - attestation_level:int32 -> - Baking_state.delegate_slot list -> - Baking_state.dal_attestable_slots - (** [get_dal_profiles ctxt delegates] calls the DAL node RPC GET /profiles/ to retrieve the DAL node's profiles. *) val get_dal_profiles : diff --git a/src/proto_023_PtSeouLo/lib_delegate/state_transitions.ml b/src/proto_023_PtSeouLo/lib_delegate/state_transitions.ml index f6a35994f084..8bbe820cfcea 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/state_transitions.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/state_transitions.ml @@ -425,8 +425,7 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = let* () = Events.(emit new_head_with_increasing_level ()) in let new_level = new_proposal.block.shell.level in let compute_new_state ~current_round ~delegate_slots - ~next_level_delegate_slots ~dal_attestable_slots - ~next_level_dal_attestable_slots = + ~next_level_delegate_slots = let round_state = { current_round; @@ -448,8 +447,6 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = delegate_slots; next_level_delegate_slots; next_level_latest_forge_request = None; - dal_attestable_slots; - next_level_dal_attestable_slots; } in (* recursive call with the up-to-date state to handle the new -- GitLab From 02de9817af58bf4fd5dd7bd280d94ddbc0dcfd04 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 10 Nov 2025 10:56:49 +0000 Subject: [PATCH 14/19] Dal_node: Increase backfill window to get finalized payload level --- src/lib_dal_node/attestable_slots.ml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 858beb1afbb8..965294100584 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ 
b/src/lib_dal_node/attestable_slots.ml @@ -186,6 +186,11 @@ let may_notify_not_in_committee ctxt committee ~attestation_level = - `stop = L`, as this is the newest level where we did not have time to obtain the information about the published slots. + We include [L+1] in backfill to cover publications that can occur between sampling + [L] and attaching the stream subscription (cycle boundary race). This keeps the + client's cache consistent even if the first slot was published before the stream + was fully established. + For each level in [start .. stop] (inclusively), we accumulate the attestation status information about each slot id. *) let get_backfill_payload ctxt ~pkh = @@ -197,9 +202,11 @@ let get_backfill_payload ctxt ~pkh = get_attestation_lag ctxt ~level:last_finalized_level in let published_levels = - let count = Int32.(to_int @@ min last_finalized_level attestation_lag) in + let count = + Int32.(to_int @@ min last_finalized_level attestation_lag) + 1 + in Stdlib.List.init count (fun i -> - Int32.(sub last_finalized_level (of_int i))) + Int32.(sub (succ last_finalized_level) (of_int i))) |> List.rev in List.fold_left_es -- GitLab From 3564bb5b29a84118b6c4815d47ef7a018141b738 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 10 Nov 2025 11:00:35 +0000 Subject: [PATCH 15/19] Dal_node: Yield Backfill first on subscribe to fix post-migration race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ensures that every new DAL attestable-slots subscriber receives a Backfill as the very first stream item. We implement this by wrapping the subscriber’s `next()` so it emits `Attestable_event.Backfill` once (built from `get_backfill_payload`), then delegates to the live watcher stream. This is because after a protocol migration, per-delegate streams are torn down and rebuilt. The baker re-subscribes, but sometimes consumes live items before Backfill, leaving the worker cache empty and skipping attestation rights. 
--- src/lib_dal_node/attestable_slots.ml | 24 ++++++++++--------- .../attestable_slots_watcher_table.ml | 6 ----- .../attestable_slots_watcher_table.mli | 8 ------- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 965294100584..84b8a977df8e 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -263,18 +263,20 @@ let subscribe ctxt ~pkh = in let watcher = T.get_or_init attestable_slots_watcher_table pkh proto_params in let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in - let* () = - let* backfill_payload = get_backfill_payload ctxt ~pkh in - match backfill_payload with - | Ok backfill_payload -> - T.notify_backfill_payload - attestable_slots_watcher_table - pkh - ~backfill_payload ; - return_unit - | Error error -> Event.emit_backfill_error ~error + let sent_backfill = ref false in + let next () = + let module E = Types.Attestable_event in + if not !sent_backfill then ( + sent_backfill := true ; + let* backfill_payload = get_backfill_payload ctxt ~pkh in + match backfill_payload with + | Ok backfill_payload -> return (Some (E.Backfill {backfill_payload})) + | Error error -> + let* () = Event.emit_backfill_error ~error in + let empty = E.{slot_ids = []; no_shards_attestation_levels = []} in + return (Some (E.Backfill {backfill_payload = empty}))) + else Lwt_stream.get stream in - let next () = Lwt_stream.get stream in let shutdown () = (* stop this stream, then possibly remove the whole watcher if last subscriber *) Lwt_watcher.shutdown stopper ; diff --git a/src/lib_dal_node/attestable_slots_watcher_table.ml b/src/lib_dal_node/attestable_slots_watcher_table.ml index c995df8095ca..2ee4d39e1222 100644 --- a/src/lib_dal_node/attestable_slots_watcher_table.ml +++ b/src/lib_dal_node/attestable_slots_watcher_table.ml @@ -79,12 +79,6 @@ let notify_slot_has_trap t pkh ~slot_id = SlotIdSet.add watcher.notified_slots 
slot_id ; Lwt_watcher.notify watcher.stream (Slot_has_trap {slot_id})) -let notify_backfill_payload t pkh ~backfill_payload = - match Signature.Public_key_hash.Table.find t pkh with - | None -> () - | Some watcher -> - Lwt_watcher.notify watcher.stream (Backfill {backfill_payload}) - let remove = Signature.Public_key_hash.Table.remove let elements = Signature.Public_key_hash.Table.to_seq_keys diff --git a/src/lib_dal_node/attestable_slots_watcher_table.mli b/src/lib_dal_node/attestable_slots_watcher_table.mli index 9900f9a3711e..f61f4e346234 100644 --- a/src/lib_dal_node/attestable_slots_watcher_table.mli +++ b/src/lib_dal_node/attestable_slots_watcher_table.mli @@ -46,14 +46,6 @@ val notify_no_shards_assigned : val notify_slot_has_trap : t -> Signature.public_key_hash -> slot_id:slot_id -> unit -(** [notify_backfill_payload t pkh ~backfill_payload] pushes a [Backfill] event for - [~backfill_payload] to the stream for [pkh], if present. *) -val notify_backfill_payload : - t -> - Signature.public_key_hash -> - backfill_payload:Attestable_event.backfill_payload -> - unit - (** [remove t pkh] removes the watcher entry for [pkh] from [t] if present. 
*) val remove : t -> Signature.public_key_hash -> unit -- GitLab From 36514c6667379a93194cdb1166063fb563faf7ba Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Fri, 31 Oct 2025 15:00:11 +0000 Subject: [PATCH 16/19] Baker: Attestable slots worker cache key becomes attestation level --- .../dal_attestable_slots_worker.ml | 55 +++++++++++-------- .../dal_attestable_slots_worker.mli | 8 +-- .../dal_attestable_slots_worker.ml | 55 +++++++++++-------- .../dal_attestable_slots_worker.mli | 8 +-- .../dal_attestable_slots_worker.ml | 55 +++++++++++-------- .../dal_attestable_slots_worker.mli | 8 +-- 6 files changed, 105 insertions(+), 84 deletions(-) diff --git a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml index 65afcf0aaa6d..64d4b5a8de86 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml @@ -26,8 +26,10 @@ module Events = struct ~section ~name:"no_attestable_slot_at_level" ~level:Warning - ~msg:"No attestable slots found at level {attested_level} in cache" - ("attested_level", Data_encoding.int32) + ~msg: + "No attestable slots found for attestation level {attestation_level} \ + in cache" + ("attestation_level", Data_encoding.int32) let no_attestable_slot_at_level_for_delegate = declare_2 @@ -35,8 +37,9 @@ module Events = struct ~name:"no_attestable_slot_at_level_for_delegate" ~level:Warning ~msg: - "No attestable slots found at level {attested_level} for {delegate_id}" - ("attested_level", Data_encoding.int32) + "No attestable slots found for level {attestation_level} and \ + {delegate_id}" + ("attestation_level", Data_encoding.int32) ("delegate_id", Delegate_id.encoding) let monitor_attestable_slots_failed = @@ -113,21 +116,23 @@ let monitor_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) in Lwt.return_error trace -let get_slots_by_delegate state ~attested_level 
= - match Stdlib.Hashtbl.find_opt state.cache attested_level with +let get_slots_by_delegate state ~attestation_level = + match Stdlib.Hashtbl.find_opt state.cache attestation_level with | Some slots_by_delegate -> slots_by_delegate | None -> let slots_by_delegate = Delegate_id.Table.create 10 in - Stdlib.Hashtbl.add state.cache attested_level slots_by_delegate ; + Stdlib.Hashtbl.add state.cache attestation_level slots_by_delegate ; slots_by_delegate (** [update_cache_with_attestable_slot ~delegate_id ~slot_id] adds [~slot_id] into the cache, - using the keys [attested_level] = [slot_level] + [attestation_lag] and [~delegate_id]. + using the keys [attestation_level] = [slot_level] + [attestation_lag] - 1 and [~delegate_id]. The function must be called under a lock, to avoid race conditions. *) let update_cache_with_attestable_slot state ~delegate_id ~slot_id = let Types.Slot_id.{slot_level; slot_index} = slot_id in - let attested_level = Int32.(add slot_level (of_int state.attestation_lag)) in - let slots_by_delegate = get_slots_by_delegate state ~attested_level in + let attestation_level = + Int32.(pred @@ add slot_level (of_int state.attestation_lag)) + in + let slots_by_delegate = get_slots_by_delegate state ~attestation_level in let attestable_slots = match Delegate_id.Table.find_opt slots_by_delegate delegate_id with | Some (Types.Attestable_slots {slots; published_level}) -> @@ -147,8 +152,12 @@ let update_cache_with_attestable_slot state ~delegate_id ~slot_id = in Delegate_id.Table.replace slots_by_delegate delegate_id attestable_slots -let update_cache_no_shards_assigned state ~delegate_id ~attested_level = - let slots_by_delegate = get_slots_by_delegate state ~attested_level in +(** [update_cache_no_shards_assigned state ~delegate_id ~attestation_level] adds a + [Not_in_committee] element into the cache, using [~delegate_id] and + [~attestation_level] as keys. The function must be called under a lock, to avoid race + conditions. 
*) +let update_cache_no_shards_assigned state ~delegate_id ~attestation_level = + let slots_by_delegate = get_slots_by_delegate state ~attestation_level in Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee (** [consume_stream state ~delegate_id stream_handler] consumes @@ -172,9 +181,8 @@ let rec consume_stream state ~delegate_id stream_handle = consume_stream state ~delegate_id stream_handle | Some (No_shards_assigned {attestation_level}) -> let* () = - let attested_level = Int32.succ attestation_level in Lwt_mutex.with_lock state.lock @@ fun () -> - update_cache_no_shards_assigned state ~delegate_id ~attested_level ; + update_cache_no_shards_assigned state ~delegate_id ~attestation_level ; return_unit in consume_stream state ~delegate_id stream_handle @@ -197,8 +205,7 @@ let update_cache_backfill_payload state ~delegate_id ~backfill_payload = slot_ids ; List.iter (fun attestation_level -> - let attested_level = Int32.succ attestation_level in - update_cache_no_shards_assigned state ~delegate_id ~attested_level) + update_cache_no_shards_assigned state ~delegate_id ~attestation_level) no_shards_attestation_levels ; Lwt.return_unit @@ -286,8 +293,8 @@ let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = let prune_cache_before state ~prune_level = Lwt_mutex.with_lock state.lock @@ fun () -> Stdlib.Hashtbl.filter_map_inplace - (fun attested_level slots_by_delegate -> - if Int32.compare attested_level prune_level < 0 then None + (fun attestation_level slots_by_delegate -> + if Int32.compare attestation_level prune_level < 0 then None else Some slots_by_delegate) state.cache ; Lwt.return_unit @@ -295,16 +302,15 @@ let prune_cache_before state ~prune_level = let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level delegate_ids = let open Lwt_syntax in - let attested_level = Int32.succ attestation_level in let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in let* dal_attestable_slots 
= let empty_slots = Stdlib.List.init state.number_of_slots (fun _ -> false) in Lwt_mutex.with_lock state.lock @@ fun () -> - match Stdlib.Hashtbl.find_opt state.cache attested_level with + match Stdlib.Hashtbl.find_opt state.cache attestation_level with | None -> - let* () = Events.(emit no_attestable_slot_at_level attested_level) in + let* () = Events.(emit no_attestable_slot_at_level attestation_level) in let published_level = - Int32.(sub attested_level (of_int state.attestation_lag)) + Int32.(succ @@ sub attestation_level (of_int state.attestation_lag)) in List.map_s (fun delegate_id -> @@ -318,13 +324,14 @@ let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level match Delegate_id.Table.find_opt slots_by_delegate delegate_id with | None -> let published_level = - Int32.(sub attested_level (of_int state.attestation_lag)) + Int32.( + succ @@ sub attestation_level (of_int state.attestation_lag)) in let* () = Events.( emit no_attestable_slot_at_level_for_delegate - (attested_level, delegate_id)) + (attestation_level, delegate_id)) in return ( delegate_id, diff --git a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli index ea7a313c7edc..63160024ff06 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli @@ -11,8 +11,8 @@ published level become fully available or when we know the delegate is not in a particular DAL committee. - Incoming events are folded into an in-memory cache keyed by attested level - (published_level + attestation_lag) and delegate id. For each (attested_level, + Incoming events are folded into an in-memory cache keyed by attestation level + (published_level + attestation_lag - 1) and delegate id. For each (attestation_level, delegate), the worker maintains a boolean bitset of attestable slots. 
This cache is filled continuously and independently of the baker’s main loop so that consensus code never waits on the network. @@ -50,8 +50,8 @@ val update_streams_subscriptions : t -> Tezos_rpc.Context.generic -> Delegate_id.t list -> unit Lwt.t (** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] - returns, for each delegate, the current bitset for attested level - [~attestation_level] + 1 and its [published_level]. *) + returns, for each delegate in [delegate_ids], the current bitset for + [published_level] derived from [~attestation_level]. *) val get_dal_attestable_slots : t -> Tezos_rpc.Context.generic -> diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml index c6ede9b21c89..ef0b3e4ef5b7 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml @@ -26,8 +26,10 @@ module Events = struct ~section ~name:"no_attestable_slot_at_level" ~level:Warning - ~msg:"No attestable slots found at level {attested_level} in cache" - ("attested_level", Data_encoding.int32) + ~msg: + "No attestable slots found for attestation level {attestation_level} \ + in cache" + ("attestation_level", Data_encoding.int32) let no_attestable_slot_at_level_for_delegate = declare_2 @@ -35,8 +37,9 @@ module Events = struct ~name:"no_attestable_slot_at_level_for_delegate" ~level:Warning ~msg: - "No attestable slots found at level {attested_level} for {delegate_id}" - ("attested_level", Data_encoding.int32) + "No attestable slots found for level {attestation_level} and \ + {delegate_id}" + ("attestation_level", Data_encoding.int32) ("delegate_id", Delegate_id.encoding) let monitor_attestable_slots_failed = @@ -113,21 +116,23 @@ let monitor_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) in Lwt.return_error trace -let get_slots_by_delegate state ~attested_level = - match 
Stdlib.Hashtbl.find_opt state.cache attested_level with +let get_slots_by_delegate state ~attestation_level = + match Stdlib.Hashtbl.find_opt state.cache attestation_level with | Some slots_by_delegate -> slots_by_delegate | None -> let slots_by_delegate = Delegate_id.Table.create 10 in - Stdlib.Hashtbl.add state.cache attested_level slots_by_delegate ; + Stdlib.Hashtbl.add state.cache attestation_level slots_by_delegate ; slots_by_delegate (** [update_cache_with_attestable_slot ~delegate_id ~slot_id] adds [~slot_id] into the cache, - using the keys [attested_level] = [slot_level] + [attestation_lag] and [~delegate_id]. + using the keys [attestation_level] = [slot_level] + [attestation_lag] - 1 and [~delegate_id]. The function must be called under a lock, to avoid race conditions. *) let update_cache_with_attestable_slot state ~delegate_id ~slot_id = let Types.Slot_id.{slot_level; slot_index} = slot_id in - let attested_level = Int32.(add slot_level (of_int state.attestation_lag)) in - let slots_by_delegate = get_slots_by_delegate state ~attested_level in + let attestation_level = + Int32.(pred @@ add slot_level (of_int state.attestation_lag)) + in + let slots_by_delegate = get_slots_by_delegate state ~attestation_level in let attestable_slots = match Delegate_id.Table.find_opt slots_by_delegate delegate_id with | Some (Types.Attestable_slots {slots; published_level}) -> @@ -147,8 +152,12 @@ let update_cache_with_attestable_slot state ~delegate_id ~slot_id = in Delegate_id.Table.replace slots_by_delegate delegate_id attestable_slots -let update_cache_no_shards_assigned state ~delegate_id ~attested_level = - let slots_by_delegate = get_slots_by_delegate state ~attested_level in +(** [update_cache_no_shards_assigned state ~delegate_id ~attestation_level] adds a + [Not_in_committee] element into the cache, using [~delegate_id] and + [~attestation_level] as keys. The function must be called under a lock, to avoid race + conditions. 
*) +let update_cache_no_shards_assigned state ~delegate_id ~attestation_level = + let slots_by_delegate = get_slots_by_delegate state ~attestation_level in Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee (** [consume_stream state ~delegate_id stream_handler] consumes @@ -172,9 +181,8 @@ let rec consume_stream state ~delegate_id stream_handle = consume_stream state ~delegate_id stream_handle | Some (No_shards_assigned {attestation_level}) -> let* () = - let attested_level = Int32.succ attestation_level in Lwt_mutex.with_lock state.lock @@ fun () -> - update_cache_no_shards_assigned state ~delegate_id ~attested_level ; + update_cache_no_shards_assigned state ~delegate_id ~attestation_level ; return_unit in consume_stream state ~delegate_id stream_handle @@ -197,8 +205,7 @@ let update_cache_backfill_payload state ~delegate_id ~backfill_payload = slot_ids ; List.iter (fun attestation_level -> - let attested_level = Int32.succ attestation_level in - update_cache_no_shards_assigned state ~delegate_id ~attested_level) + update_cache_no_shards_assigned state ~delegate_id ~attestation_level) no_shards_attestation_levels ; Lwt.return_unit @@ -286,8 +293,8 @@ let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = let prune_cache_before state ~prune_level = Lwt_mutex.with_lock state.lock @@ fun () -> Stdlib.Hashtbl.filter_map_inplace - (fun attested_level slots_by_delegate -> - if Int32.compare attested_level prune_level < 0 then None + (fun attestation_level slots_by_delegate -> + if Int32.compare attestation_level prune_level < 0 then None else Some slots_by_delegate) state.cache ; Lwt.return_unit @@ -295,16 +302,15 @@ let prune_cache_before state ~prune_level = let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level delegate_ids = let open Lwt_syntax in - let attested_level = Int32.succ attestation_level in let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in let* dal_attestable_slots 
= let empty_slots = Stdlib.List.init state.number_of_slots (fun _ -> false) in Lwt_mutex.with_lock state.lock @@ fun () -> - match Stdlib.Hashtbl.find_opt state.cache attested_level with + match Stdlib.Hashtbl.find_opt state.cache attestation_level with | None -> - let* () = Events.(emit no_attestable_slot_at_level attested_level) in + let* () = Events.(emit no_attestable_slot_at_level attestation_level) in let published_level = - Int32.(sub attested_level (of_int state.attestation_lag)) + Int32.(succ @@ sub attestation_level (of_int state.attestation_lag)) in List.map_s (fun delegate_id -> @@ -318,13 +324,14 @@ let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level match Delegate_id.Table.find_opt slots_by_delegate delegate_id with | None -> let published_level = - Int32.(sub attested_level (of_int state.attestation_lag)) + Int32.( + succ @@ sub attestation_level (of_int state.attestation_lag)) in let* () = Events.( emit no_attestable_slot_at_level_for_delegate - (attested_level, delegate_id)) + (attestation_level, delegate_id)) in return ( delegate_id, diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli index ea7a313c7edc..63160024ff06 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli @@ -11,8 +11,8 @@ published level become fully available or when we know the delegate is not in a particular DAL committee. - Incoming events are folded into an in-memory cache keyed by attested level - (published_level + attestation_lag) and delegate id. For each (attested_level, + Incoming events are folded into an in-memory cache keyed by attestation level + (published_level + attestation_lag - 1) and delegate id. For each (attestation_level, delegate), the worker maintains a boolean bitset of attestable slots. 
This cache is filled continuously and independently of the baker’s main loop so that consensus code never waits on the network. @@ -50,8 +50,8 @@ val update_streams_subscriptions : t -> Tezos_rpc.Context.generic -> Delegate_id.t list -> unit Lwt.t (** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] - returns, for each delegate, the current bitset for attested level - [~attestation_level] + 1 and its [published_level]. *) + returns, for each delegate in [delegate_ids], the current bitset for + [published_level] derived from [~attestation_level]. *) val get_dal_attestable_slots : t -> Tezos_rpc.Context.generic -> diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 65afcf0aaa6d..64d4b5a8de86 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -26,8 +26,10 @@ module Events = struct ~section ~name:"no_attestable_slot_at_level" ~level:Warning - ~msg:"No attestable slots found at level {attested_level} in cache" - ("attested_level", Data_encoding.int32) + ~msg: + "No attestable slots found for attestation level {attestation_level} \ + in cache" + ("attestation_level", Data_encoding.int32) let no_attestable_slot_at_level_for_delegate = declare_2 @@ -35,8 +37,9 @@ module Events = struct ~name:"no_attestable_slot_at_level_for_delegate" ~level:Warning ~msg: - "No attestable slots found at level {attested_level} for {delegate_id}" - ("attested_level", Data_encoding.int32) + "No attestable slots found for level {attestation_level} and \ + {delegate_id}" + ("attestation_level", Data_encoding.int32) ("delegate_id", Delegate_id.encoding) let monitor_attestable_slots_failed = @@ -113,21 +116,23 @@ let monitor_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) in Lwt.return_error trace -let get_slots_by_delegate state ~attested_level = - match Stdlib.Hashtbl.find_opt 
state.cache attested_level with +let get_slots_by_delegate state ~attestation_level = + match Stdlib.Hashtbl.find_opt state.cache attestation_level with | Some slots_by_delegate -> slots_by_delegate | None -> let slots_by_delegate = Delegate_id.Table.create 10 in - Stdlib.Hashtbl.add state.cache attested_level slots_by_delegate ; + Stdlib.Hashtbl.add state.cache attestation_level slots_by_delegate ; slots_by_delegate (** [update_cache_with_attestable_slot ~delegate_id ~slot_id] adds [~slot_id] into the cache, - using the keys [attested_level] = [slot_level] + [attestation_lag] and [~delegate_id]. + using the keys [attestation_level] = [slot_level] + [attestation_lag] - 1 and [~delegate_id]. The function must be called under a lock, to avoid race conditions. *) let update_cache_with_attestable_slot state ~delegate_id ~slot_id = let Types.Slot_id.{slot_level; slot_index} = slot_id in - let attested_level = Int32.(add slot_level (of_int state.attestation_lag)) in - let slots_by_delegate = get_slots_by_delegate state ~attested_level in + let attestation_level = + Int32.(pred @@ add slot_level (of_int state.attestation_lag)) + in + let slots_by_delegate = get_slots_by_delegate state ~attestation_level in let attestable_slots = match Delegate_id.Table.find_opt slots_by_delegate delegate_id with | Some (Types.Attestable_slots {slots; published_level}) -> @@ -147,8 +152,12 @@ let update_cache_with_attestable_slot state ~delegate_id ~slot_id = in Delegate_id.Table.replace slots_by_delegate delegate_id attestable_slots -let update_cache_no_shards_assigned state ~delegate_id ~attested_level = - let slots_by_delegate = get_slots_by_delegate state ~attested_level in +(** [update_cache_no_shards_assigned state ~delegate_id ~attestation_level] adds a + [Not_in_committee] element into the cache, using [~delegate_id] and + [~attestation_level] as keys. The function must be called under a lock, to avoid race + conditions. 
*) +let update_cache_no_shards_assigned state ~delegate_id ~attestation_level = + let slots_by_delegate = get_slots_by_delegate state ~attestation_level in Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee (** [consume_stream state ~delegate_id stream_handler] consumes @@ -172,9 +181,8 @@ let rec consume_stream state ~delegate_id stream_handle = consume_stream state ~delegate_id stream_handle | Some (No_shards_assigned {attestation_level}) -> let* () = - let attested_level = Int32.succ attestation_level in Lwt_mutex.with_lock state.lock @@ fun () -> - update_cache_no_shards_assigned state ~delegate_id ~attested_level ; + update_cache_no_shards_assigned state ~delegate_id ~attestation_level ; return_unit in consume_stream state ~delegate_id stream_handle @@ -197,8 +205,7 @@ let update_cache_backfill_payload state ~delegate_id ~backfill_payload = slot_ids ; List.iter (fun attestation_level -> - let attested_level = Int32.succ attestation_level in - update_cache_no_shards_assigned state ~delegate_id ~attested_level) + update_cache_no_shards_assigned state ~delegate_id ~attestation_level) no_shards_attestation_levels ; Lwt.return_unit @@ -286,8 +293,8 @@ let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = let prune_cache_before state ~prune_level = Lwt_mutex.with_lock state.lock @@ fun () -> Stdlib.Hashtbl.filter_map_inplace - (fun attested_level slots_by_delegate -> - if Int32.compare attested_level prune_level < 0 then None + (fun attestation_level slots_by_delegate -> + if Int32.compare attestation_level prune_level < 0 then None else Some slots_by_delegate) state.cache ; Lwt.return_unit @@ -295,16 +302,15 @@ let prune_cache_before state ~prune_level = let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level delegate_ids = let open Lwt_syntax in - let attested_level = Int32.succ attestation_level in let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in let* dal_attestable_slots 
= let empty_slots = Stdlib.List.init state.number_of_slots (fun _ -> false) in Lwt_mutex.with_lock state.lock @@ fun () -> - match Stdlib.Hashtbl.find_opt state.cache attested_level with + match Stdlib.Hashtbl.find_opt state.cache attestation_level with | None -> - let* () = Events.(emit no_attestable_slot_at_level attested_level) in + let* () = Events.(emit no_attestable_slot_at_level attestation_level) in let published_level = - Int32.(sub attested_level (of_int state.attestation_lag)) + Int32.(succ @@ sub attestation_level (of_int state.attestation_lag)) in List.map_s (fun delegate_id -> @@ -318,13 +324,14 @@ let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level match Delegate_id.Table.find_opt slots_by_delegate delegate_id with | None -> let published_level = - Int32.(sub attested_level (of_int state.attestation_lag)) + Int32.( + succ @@ sub attestation_level (of_int state.attestation_lag)) in let* () = Events.( emit no_attestable_slot_at_level_for_delegate - (attested_level, delegate_id)) + (attestation_level, delegate_id)) in return ( delegate_id, diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index ea7a313c7edc..63160024ff06 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -11,8 +11,8 @@ published level become fully available or when we know the delegate is not in a particular DAL committee. - Incoming events are folded into an in-memory cache keyed by attested level - (published_level + attestation_lag) and delegate id. For each (attested_level, + Incoming events are folded into an in-memory cache keyed by attestation level + (published_level + attestation_lag - 1) and delegate id. For each (attestation_level, delegate), the worker maintains a boolean bitset of attestable slots. 
This cache is filled continuously and independently of the baker’s main loop so that consensus code never waits on the network. @@ -50,8 +50,8 @@ val update_streams_subscriptions : t -> Tezos_rpc.Context.generic -> Delegate_id.t list -> unit Lwt.t (** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] - returns, for each delegate, the current bitset for attested level - [~attestation_level] + 1 and its [published_level]. *) + returns, for each delegate in [delegate_ids], the current bitset for + [published_level] derived from [~attestation_level]. *) val get_dal_attestable_slots : t -> Tezos_rpc.Context.generic -> -- GitLab From 46b2405f39fb2f42e101ff6d09b3401312aa5946 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 11 Nov 2025 10:40:22 +0000 Subject: [PATCH 17/19] Baker: Make may_get_dal_content wait free --- .../lib_delegate/baking_actions.ml | 22 ++++-- .../lib_delegate/baking_lib.ml | 34 +++++---- .../lib_delegate/baking_scheduling.ml | 1 + .../lib_delegate/baking_scheduling.mli | 11 +++ .../dal_attestable_slots_worker.ml | 68 +++++++++--------- .../dal_attestable_slots_worker.mli | 11 +-- .../lib_delegate/baking_actions.ml | 22 ++++-- .../lib_delegate/baking_lib.ml | 34 +++++---- .../lib_delegate/baking_scheduling.ml | 7 +- .../lib_delegate/baking_scheduling.mli | 11 +++ .../dal_attestable_slots_worker.ml | 70 ++++++++++--------- .../dal_attestable_slots_worker.mli | 11 +-- .../lib_delegate/baking_actions.ml | 22 ++++-- src/proto_alpha/lib_delegate/baking_lib.ml | 34 +++++---- .../lib_delegate/baking_scheduling.ml | 7 +- .../lib_delegate/baking_scheduling.mli | 11 +++ .../dal_attestable_slots_worker.ml | 68 +++++++++--------- .../dal_attestable_slots_worker.mli | 11 +-- 18 files changed, 294 insertions(+), 161 deletions(-) diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_actions.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_actions.ml index 3fd9b11ae213..a01e825e460e 100644 --- 
a/src/proto_023_PtSeouLo/lib_delegate/baking_actions.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_actions.ml @@ -548,10 +548,9 @@ let may_get_dal_content state consensus_vote = only_if_dal_feature_enabled state ~default_value:[] - (fun dal_node_rpc_ctxt -> + (fun _dal_node_rpc_ctxt -> Dal_attestable_slots_worker.get_dal_attestable_slots state.global_state.dal_attestable_slots_worker - dal_node_rpc_ctxt ~attestation_level:level [delegate_id]) in @@ -1105,14 +1104,29 @@ let update_to_level state level_update = new_level_proposal round_durations [@profiler.record_f {verbosity = Debug} "compute round"]) in - let*! new_state = + let*! new_state, new_action = (compute_new_state ~current_round ~delegate_slots ~next_level_delegate_slots [@profiler.record_s {verbosity = Debug} "compute new state"]) in - return new_state + (* Proactively ensure subscriptions of DAL streams for all our delegates *) + let*! () = + only_if_dal_feature_enabled + new_state + ~default_value:() + (fun dal_node_rpc_ctxt -> + let next_level_delegate_ids = + Baking_state.Delegate_slots.own_delegate_ids next_level_delegate_slots + in + Dal_attestable_slots_worker.update_streams_subscriptions + state.global_state.dal_attestable_slots_worker + dal_node_rpc_ctxt + next_level_delegate_ids + ~await_backfill:false) + in + return (new_state, new_action) let synchronize_round state {new_round_proposal; handle_proposal} = let open Lwt_result_syntax in diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml index 2a5da976294e..323b48848503 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml @@ -75,18 +75,28 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~attestation_lag:constants.parametric.dal.attestation_lag ~number_of_slots:constants.parametric.dal.number_of_slots in - Baking_scheduling.create_initial_state - cctxt - ?dal_node_rpc_ctxt - 
?synchronize - ~chain - config - operation_worker - round_durations - dal_attestable_slots_worker - ~current_proposal - ~constants - delegates + let* state = + Baking_scheduling.create_initial_state + cctxt + ?dal_node_rpc_ctxt + ?synchronize + ~chain + config + operation_worker + round_durations + dal_attestable_slots_worker + ~current_proposal + ~constants + delegates + in + let* () = + Baking_scheduling.register_dal_profiles + cctxt + state.global_state.dal_node_rpc_ctxt + dal_attestable_slots_worker + delegates + in + return state let get_current_proposal cctxt ?cache () = let open Lwt_result_syntax in diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml index cf96e4684451..7d818ac90a33 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml @@ -955,6 +955,7 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt dal_attestable_slots_worker dal_attestable_slots_worker dal_ctxt delegate_ids + ~await_backfill:true in return_unit in diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli index e3fc81c04a69..465956d5a594 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli @@ -51,6 +51,17 @@ val retry : 'a -> 'b tzresult Lwt.t +(** [register_dal_profiles cctxt dal_node_rpc_ctxt dal_attestable_slots_worker + delegates] registers the baker’s [delegates] as DAL attesters and warms up the + attestable-slots cache from [dal_attestable_slots_worker]. The DAL RPCs are retried + until they succeed. 
*) +val register_dal_profiles : + #Protocol_client_context.full -> + Tezos_rpc.Context.generic option -> + Dal_attestable_slots_worker.t -> + Baking_state_types.Key.t list -> + unit tzresult Lwt.t + (** [run context ?canceler ?stop_on_event ?on_error ?constants chain baking_configuration consensus_keys] is the entry point of the baker automaton. This function performs the following tasks: diff --git a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml index 64d4b5a8de86..eb5c46cb41f2 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.ml @@ -160,6 +160,20 @@ let update_cache_no_shards_assigned state ~delegate_id ~attestation_level = let slots_by_delegate = get_slots_by_delegate state ~attestation_level in Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee +let update_cache_backfill_payload state ~delegate_id ~backfill_payload = + let module E = Types.Attestable_event in + let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in + Lwt_mutex.with_lock state.lock @@ fun () -> + List.iter + (fun slot_id -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id) + slot_ids ; + List.iter + (fun attestation_level -> + update_cache_no_shards_assigned state ~delegate_id ~attestation_level) + no_shards_attestation_levels ; + Lwt.return_unit + (** [consume_stream state ~delegate_id stream_handler] consumes [~delegate_id]'s stream forever, updating the cache accordingly . *) let rec consume_stream state ~delegate_id stream_handle = @@ -189,25 +203,12 @@ let rec consume_stream state ~delegate_id stream_handle = | Some (Slot_has_trap _slot_id) -> (* In case of a trap, we do not need to update the cache with any information. 
*) consume_stream state ~delegate_id stream_handle - | Some (Backfill _backfill_payload) -> - (* This case should never be reachable, since we only backfill once, in a synchronous call. *) - Lwt.fail_with - "Backfill should never be done in consume_streams, as that is an \ - asynchronous function." - -let update_cache_backfill_payload state ~delegate_id ~backfill_payload = - let module E = Types.Attestable_event in - let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in - Lwt_mutex.with_lock state.lock @@ fun () -> - List.iter - (fun slot_id -> - update_cache_with_attestable_slot state ~delegate_id ~slot_id) - slot_ids ; - List.iter - (fun attestation_level -> - update_cache_no_shards_assigned state ~delegate_id ~attestation_level) - no_shards_attestation_levels ; - Lwt.return_unit + | Some (Backfill {backfill_payload}) -> + (* This case is rarely reached, one instance being at migration time. *) + let* () = + update_cache_backfill_payload state ~delegate_id ~backfill_payload + in + consume_stream state ~delegate_id stream_handle (** [consume_backfill_stream state ~delegate_id stream_handle] consumes the initial [Backfill] event from a freshly opened DAL monitoring stream. This function is meant to be called @@ -233,7 +234,8 @@ let consume_backfill_stream state ~delegate_id stream_handle = Lwt.fail_with "The stream must always start properly with a Backfill event." 
-let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = +let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + ~await_backfill = let open Lwt_syntax in let* new_streams = List.fold_left_s @@ -255,10 +257,12 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = return_unit in let* () = - List.iter_p - (fun (delegate_id, stream_handle) -> - consume_backfill_stream state ~delegate_id stream_handle) - new_streams + if await_backfill then + List.iter_p + (fun (delegate_id, stream_handle) -> + consume_backfill_stream state ~delegate_id stream_handle) + new_streams + else return_unit in List.iter (fun (delegate_id, stream_handle) -> @@ -272,10 +276,8 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = new_streams ; return_unit -(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles - the active set of streams using [delegate_ids] by computing the list of streams - to subscribe to. *) -let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = +let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids + ~await_backfill = let open Lwt_syntax in let* delegate_ids_to_add = Lwt_mutex.with_lock state.lock @@ fun () -> @@ -285,7 +287,11 @@ let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = in return DelegateSet.(elements (diff new_delegate_ids current_delegate_ids)) in - subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + subscribe_to_new_streams + state + dal_node_rpc_ctxt + ~delegate_ids_to_add + ~await_backfill (** [prune_cache_before state ~prune_level] prunes all cache entries before [~prune_level]. 
This is called by the public getter once per level, keeping memory @@ -299,10 +305,8 @@ let prune_cache_before state ~prune_level = state.cache ; Lwt.return_unit -let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level - delegate_ids = +let get_dal_attestable_slots state ~attestation_level delegate_ids = let open Lwt_syntax in - let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in let* dal_attestable_slots = let empty_slots = Stdlib.List.init state.number_of_slots (fun _ -> false) in Lwt_mutex.with_lock state.lock @@ fun () -> diff --git a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli index 63160024ff06..8eb64fabe2df 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/dal_attestable_slots_worker.mli @@ -43,18 +43,21 @@ type t (** {1 Accessors} *) -(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles - the active set of streams using [delegate_ids] by computing the list of streams +(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids ~await_backfill] + reconciles the active set of streams using [delegate_ids] by computing the list of streams to subscribe to. *) val update_streams_subscriptions : - t -> Tezos_rpc.Context.generic -> Delegate_id.t list -> unit Lwt.t + t -> + Tezos_rpc.Context.generic -> + Delegate_id.t list -> + await_backfill:bool -> + unit Lwt.t (** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] returns, for each delegate in [delegate_ids], the current bitset for [published_level] derived from [~attestation_level]. 
*) val get_dal_attestable_slots : t -> - Tezos_rpc.Context.generic -> attestation_level:int32 -> Delegate_id.t list -> dal_attestable_slots Lwt.t diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.ml index 0860f9122eb7..b9e3d0aa82a9 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_actions.ml @@ -540,10 +540,9 @@ let may_get_dal_content state consensus_vote = only_if_dal_feature_enabled state ~default_value:[] - (fun dal_node_rpc_ctxt -> + (fun _dal_node_rpc_ctxt -> Dal_attestable_slots_worker.get_dal_attestable_slots state.global_state.dal_attestable_slots_worker - dal_node_rpc_ctxt ~attestation_level:level [delegate_id]) in @@ -1104,14 +1103,29 @@ let update_to_level state level_update = new_level_proposal round_durations [@profiler.record_f {verbosity = Debug} "compute round"]) in - let*! new_state = + let*! new_state, new_action = (compute_new_state ~current_round ~delegate_infos ~next_level_delegate_infos [@profiler.record_s {verbosity = Debug} "compute new state"]) in - return new_state + (* Proactively ensure subscriptions of DAL streams for all our delegates *) + let*! 
() = + only_if_dal_feature_enabled + new_state + ~default_value:() + (fun dal_node_rpc_ctxt -> + let next_level_delegate_ids = + Baking_state.Delegate_infos.own_delegate_ids next_level_delegate_infos + in + Dal_attestable_slots_worker.update_streams_subscriptions + state.global_state.dal_attestable_slots_worker + dal_node_rpc_ctxt + next_level_delegate_ids + ~await_backfill:false) + in + return (new_state, new_action) let synchronize_round state {new_round_proposal; handle_proposal} = let open Lwt_result_syntax in diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml index 56c77c164b3c..0650c3c76543 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml @@ -71,18 +71,28 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~attestation_lag:constants.parametric.dal.attestation_lag ~number_of_slots:constants.parametric.dal.number_of_slots in - Baking_scheduling.create_initial_state - cctxt - ?dal_node_rpc_ctxt - ?synchronize - ~chain - config - operation_worker - round_durations - dal_attestable_slots_worker - ~current_proposal - ~constants - delegates + let* state = + Baking_scheduling.create_initial_state + cctxt + ?dal_node_rpc_ctxt + ?synchronize + ~chain + config + operation_worker + round_durations + dal_attestable_slots_worker + ~current_proposal + ~constants + delegates + in + let* () = + Baking_scheduling.register_dal_profiles + cctxt + state.global_state.dal_node_rpc_ctxt + dal_attestable_slots_worker + delegates + in + return state let get_current_proposal cctxt ?cache () = let open Lwt_result_syntax in diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml index 6ba8cbd3da6e..0410757e5950 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml @@ -909,7 
+909,10 @@ let try_resolve_consensus_keys cctxt key = ~consensus_keys:[pkh] in match attesting_rights with - | Error _ | Ok [Plugin.RPC.Validators.{delegates = []; _}] -> + | Error _ + | Ok [Plugin.RPC.Validators.{delegates = []; _}] + | Ok (_ :: _ :: _) + | Ok [] -> try_find_delegate_key (head_offset - 1) | Ok Plugin.RPC.Validators. @@ -930,7 +933,6 @@ let try_resolve_consensus_keys cctxt key = ] -> (* The primary registered key as delegate found. Return it. *) return delegate - | Ok (_ :: _ :: _) | Ok [] -> assert false (* we query only one level, the returned list length must be 1 *) in try_find_delegate_key levels_to_inspect @@ -963,6 +965,7 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt dal_attestable_slots_worker dal_attestable_slots_worker dal_ctxt delegate_ids + ~await_backfill:true in return_unit in diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli index e3fc81c04a69..465956d5a594 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli @@ -51,6 +51,17 @@ val retry : 'a -> 'b tzresult Lwt.t +(** [register_dal_profiles cctxt dal_node_rpc_ctxt dal_attestable_slots_worker + delegates] registers the baker’s [delegates] as DAL attesters and warms up the + attestable-slots cache from [dal_attestable_slots_worker]. The DAL RPCs are retried + until they succeed. *) +val register_dal_profiles : + #Protocol_client_context.full -> + Tezos_rpc.Context.generic option -> + Dal_attestable_slots_worker.t -> + Baking_state_types.Key.t list -> + unit tzresult Lwt.t + (** [run context ?canceler ?stop_on_event ?on_error ?constants chain baking_configuration consensus_keys] is the entry point of the baker automaton. 
This function performs the following tasks: diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml index ef0b3e4ef5b7..eb5c46cb41f2 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.ml @@ -75,7 +75,7 @@ type dal_attestable_slots = (Delegate_id.t * Types.attestable_slots) list module DelegateSet = Set.Make (Delegate_id) -(** A handle to a single delegate’s DAL moniTypes.Attestable_slots_watcher_table.toring subscription. *) +(** A handle to a single delegate’s DAL monitoring subscription. *) type stream_handle = { stream : Types.Attestable_event.t Lwt_stream.t; stopper : Tezos_rpc.Context.stopper; @@ -160,6 +160,20 @@ let update_cache_no_shards_assigned state ~delegate_id ~attestation_level = let slots_by_delegate = get_slots_by_delegate state ~attestation_level in Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee +let update_cache_backfill_payload state ~delegate_id ~backfill_payload = + let module E = Types.Attestable_event in + let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in + Lwt_mutex.with_lock state.lock @@ fun () -> + List.iter + (fun slot_id -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id) + slot_ids ; + List.iter + (fun attestation_level -> + update_cache_no_shards_assigned state ~delegate_id ~attestation_level) + no_shards_attestation_levels ; + Lwt.return_unit + (** [consume_stream state ~delegate_id stream_handler] consumes [~delegate_id]'s stream forever, updating the cache accordingly . *) let rec consume_stream state ~delegate_id stream_handle = @@ -189,25 +203,12 @@ let rec consume_stream state ~delegate_id stream_handle = | Some (Slot_has_trap _slot_id) -> (* In case of a trap, we do not need to update the cache with any information. 
*) consume_stream state ~delegate_id stream_handle - | Some (Backfill _backfill_payload) -> - (* This case should never be reachable, since we only backfill once, in a synchronous call. *) - Lwt.fail_with - "Backfill should never be done in consume_streams, as that is an \ - asynchronous function." - -let update_cache_backfill_payload state ~delegate_id ~backfill_payload = - let module E = Types.Attestable_event in - let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in - Lwt_mutex.with_lock state.lock @@ fun () -> - List.iter - (fun slot_id -> - update_cache_with_attestable_slot state ~delegate_id ~slot_id) - slot_ids ; - List.iter - (fun attestation_level -> - update_cache_no_shards_assigned state ~delegate_id ~attestation_level) - no_shards_attestation_levels ; - Lwt.return_unit + | Some (Backfill {backfill_payload}) -> + (* This case is rarely reached, one instance being at migration time. *) + let* () = + update_cache_backfill_payload state ~delegate_id ~backfill_payload + in + consume_stream state ~delegate_id stream_handle (** [consume_backfill_stream state ~delegate_id stream_handle] consumes the initial [Backfill] event from a freshly opened DAL monitoring stream. This function is meant to be called @@ -233,7 +234,8 @@ let consume_backfill_stream state ~delegate_id stream_handle = Lwt.fail_with "The stream must always start properly with a Backfill event." 
-let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = +let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + ~await_backfill = let open Lwt_syntax in let* new_streams = List.fold_left_s @@ -255,10 +257,12 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = return_unit in let* () = - List.iter_p - (fun (delegate_id, stream_handle) -> - consume_backfill_stream state ~delegate_id stream_handle) - new_streams + if await_backfill then + List.iter_p + (fun (delegate_id, stream_handle) -> + consume_backfill_stream state ~delegate_id stream_handle) + new_streams + else return_unit in List.iter (fun (delegate_id, stream_handle) -> @@ -272,10 +276,8 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = new_streams ; return_unit -(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles - the active set of streams using [delegate_ids] by computing the list of streams - to subscribe to. *) -let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = +let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids + ~await_backfill = let open Lwt_syntax in let* delegate_ids_to_add = Lwt_mutex.with_lock state.lock @@ fun () -> @@ -285,7 +287,11 @@ let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = in return DelegateSet.(elements (diff new_delegate_ids current_delegate_ids)) in - subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + subscribe_to_new_streams + state + dal_node_rpc_ctxt + ~delegate_ids_to_add + ~await_backfill (** [prune_cache_before state ~prune_level] prunes all cache entries before [~prune_level]. 
This is called by the public getter once per level, keeping memory @@ -299,10 +305,8 @@ let prune_cache_before state ~prune_level = state.cache ; Lwt.return_unit -let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level - delegate_ids = +let get_dal_attestable_slots state ~attestation_level delegate_ids = let open Lwt_syntax in - let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in let* dal_attestable_slots = let empty_slots = Stdlib.List.init state.number_of_slots (fun _ -> false) in Lwt_mutex.with_lock state.lock @@ fun () -> diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli index 63160024ff06..8eb64fabe2df 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/dal_attestable_slots_worker.mli @@ -43,18 +43,21 @@ type t (** {1 Accessors} *) -(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles - the active set of streams using [delegate_ids] by computing the list of streams +(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids ~await_backfill] + reconciles the active set of streams using [delegate_ids] by computing the list of streams to subscribe to. *) val update_streams_subscriptions : - t -> Tezos_rpc.Context.generic -> Delegate_id.t list -> unit Lwt.t + t -> + Tezos_rpc.Context.generic -> + Delegate_id.t list -> + await_backfill:bool -> + unit Lwt.t (** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] returns, for each delegate in [delegate_ids], the current bitset for [published_level] derived from [~attestation_level]. 
*) val get_dal_attestable_slots : t -> - Tezos_rpc.Context.generic -> attestation_level:int32 -> Delegate_id.t list -> dal_attestable_slots Lwt.t diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index 0860f9122eb7..b9e3d0aa82a9 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -540,10 +540,9 @@ let may_get_dal_content state consensus_vote = only_if_dal_feature_enabled state ~default_value:[] - (fun dal_node_rpc_ctxt -> + (fun _dal_node_rpc_ctxt -> Dal_attestable_slots_worker.get_dal_attestable_slots state.global_state.dal_attestable_slots_worker - dal_node_rpc_ctxt ~attestation_level:level [delegate_id]) in @@ -1104,14 +1103,29 @@ let update_to_level state level_update = new_level_proposal round_durations [@profiler.record_f {verbosity = Debug} "compute round"]) in - let*! new_state = + let*! new_state, new_action = (compute_new_state ~current_round ~delegate_infos ~next_level_delegate_infos [@profiler.record_s {verbosity = Debug} "compute new state"]) in - return new_state + (* Proactively ensure subscriptions of DAL streams for all our delegates *) + let*! 
() = + only_if_dal_feature_enabled + new_state + ~default_value:() + (fun dal_node_rpc_ctxt -> + let next_level_delegate_ids = + Baking_state.Delegate_infos.own_delegate_ids next_level_delegate_infos + in + Dal_attestable_slots_worker.update_streams_subscriptions + state.global_state.dal_attestable_slots_worker + dal_node_rpc_ctxt + next_level_delegate_ids + ~await_backfill:false) + in + return (new_state, new_action) let synchronize_round state {new_round_proposal; handle_proposal} = let open Lwt_result_syntax in diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index 56c77c164b3c..0650c3c76543 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -71,18 +71,28 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~attestation_lag:constants.parametric.dal.attestation_lag ~number_of_slots:constants.parametric.dal.number_of_slots in - Baking_scheduling.create_initial_state - cctxt - ?dal_node_rpc_ctxt - ?synchronize - ~chain - config - operation_worker - round_durations - dal_attestable_slots_worker - ~current_proposal - ~constants - delegates + let* state = + Baking_scheduling.create_initial_state + cctxt + ?dal_node_rpc_ctxt + ?synchronize + ~chain + config + operation_worker + round_durations + dal_attestable_slots_worker + ~current_proposal + ~constants + delegates + in + let* () = + Baking_scheduling.register_dal_profiles + cctxt + state.global_state.dal_node_rpc_ctxt + dal_attestable_slots_worker + delegates + in + return state let get_current_proposal cctxt ?cache () = let open Lwt_result_syntax in diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 6ba8cbd3da6e..0410757e5950 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -909,7 +909,10 @@ let try_resolve_consensus_keys cctxt key = 
~consensus_keys:[pkh] in match attesting_rights with - | Error _ | Ok [Plugin.RPC.Validators.{delegates = []; _}] -> + | Error _ + | Ok [Plugin.RPC.Validators.{delegates = []; _}] + | Ok (_ :: _ :: _) + | Ok [] -> try_find_delegate_key (head_offset - 1) | Ok Plugin.RPC.Validators. @@ -930,7 +933,6 @@ let try_resolve_consensus_keys cctxt key = ] -> (* The primary registered key as delegate found. Return it. *) return delegate - | Ok (_ :: _ :: _) | Ok [] -> assert false (* we query only one level, the returned list length must be 1 *) in try_find_delegate_key levels_to_inspect @@ -963,6 +965,7 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt dal_attestable_slots_worker dal_attestable_slots_worker dal_ctxt delegate_ids + ~await_backfill:true in return_unit in diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.mli b/src/proto_alpha/lib_delegate/baking_scheduling.mli index e3fc81c04a69..465956d5a594 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.mli +++ b/src/proto_alpha/lib_delegate/baking_scheduling.mli @@ -51,6 +51,17 @@ val retry : 'a -> 'b tzresult Lwt.t +(** [register_dal_profiles cctxt dal_node_rpc_ctxt dal_attestable_slots_worker + delegates] registers the baker’s [delegates] as DAL attesters and warms up the + attestable-slots cache from [dal_attestable_slots_worker]. The DAL RPCs are retried + until they succeed. *) +val register_dal_profiles : + #Protocol_client_context.full -> + Tezos_rpc.Context.generic option -> + Dal_attestable_slots_worker.t -> + Baking_state_types.Key.t list -> + unit tzresult Lwt.t + (** [run context ?canceler ?stop_on_event ?on_error ?constants chain baking_configuration consensus_keys] is the entry point of the baker automaton. 
This function performs the following tasks: diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 64d4b5a8de86..eb5c46cb41f2 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -160,6 +160,20 @@ let update_cache_no_shards_assigned state ~delegate_id ~attestation_level = let slots_by_delegate = get_slots_by_delegate state ~attestation_level in Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee +let update_cache_backfill_payload state ~delegate_id ~backfill_payload = + let module E = Types.Attestable_event in + let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in + Lwt_mutex.with_lock state.lock @@ fun () -> + List.iter + (fun slot_id -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id) + slot_ids ; + List.iter + (fun attestation_level -> + update_cache_no_shards_assigned state ~delegate_id ~attestation_level) + no_shards_attestation_levels ; + Lwt.return_unit + (** [consume_stream state ~delegate_id stream_handler] consumes [~delegate_id]'s stream forever, updating the cache accordingly . *) let rec consume_stream state ~delegate_id stream_handle = @@ -189,25 +203,12 @@ let rec consume_stream state ~delegate_id stream_handle = | Some (Slot_has_trap _slot_id) -> (* In case of a trap, we do not need to update the cache with any information. *) consume_stream state ~delegate_id stream_handle - | Some (Backfill _backfill_payload) -> - (* This case should never be reachable, since we only backfill once, in a synchronous call. *) - Lwt.fail_with - "Backfill should never be done in consume_streams, as that is an \ - asynchronous function." 
- -let update_cache_backfill_payload state ~delegate_id ~backfill_payload = - let module E = Types.Attestable_event in - let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in - Lwt_mutex.with_lock state.lock @@ fun () -> - List.iter - (fun slot_id -> - update_cache_with_attestable_slot state ~delegate_id ~slot_id) - slot_ids ; - List.iter - (fun attestation_level -> - update_cache_no_shards_assigned state ~delegate_id ~attestation_level) - no_shards_attestation_levels ; - Lwt.return_unit + | Some (Backfill {backfill_payload}) -> + (* This case is rarely reached, one instance being at migration time. *) + let* () = + update_cache_backfill_payload state ~delegate_id ~backfill_payload + in + consume_stream state ~delegate_id stream_handle (** [consume_backfill_stream state ~delegate_id stream_handle] consumes the initial [Backfill] event from a freshly opened DAL monitoring stream. This function is meant to be called @@ -233,7 +234,8 @@ let consume_backfill_stream state ~delegate_id stream_handle = Lwt.fail_with "The stream must always start properly with a Backfill event." 
-let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = +let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + ~await_backfill = let open Lwt_syntax in let* new_streams = List.fold_left_s @@ -255,10 +257,12 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = return_unit in let* () = - List.iter_p - (fun (delegate_id, stream_handle) -> - consume_backfill_stream state ~delegate_id stream_handle) - new_streams + if await_backfill then + List.iter_p + (fun (delegate_id, stream_handle) -> + consume_backfill_stream state ~delegate_id stream_handle) + new_streams + else return_unit in List.iter (fun (delegate_id, stream_handle) -> @@ -272,10 +276,8 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = new_streams ; return_unit -(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles - the active set of streams using [delegate_ids] by computing the list of streams - to subscribe to. *) -let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = +let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids + ~await_backfill = let open Lwt_syntax in let* delegate_ids_to_add = Lwt_mutex.with_lock state.lock @@ fun () -> @@ -285,7 +287,11 @@ let update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids = in return DelegateSet.(elements (diff new_delegate_ids current_delegate_ids)) in - subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + subscribe_to_new_streams + state + dal_node_rpc_ctxt + ~delegate_ids_to_add + ~await_backfill (** [prune_cache_before state ~prune_level] prunes all cache entries before [~prune_level]. 
This is called by the public getter once per level, keeping memory @@ -299,10 +305,8 @@ let prune_cache_before state ~prune_level = state.cache ; Lwt.return_unit -let get_dal_attestable_slots state dal_node_rpc_ctxt ~attestation_level - delegate_ids = +let get_dal_attestable_slots state ~attestation_level delegate_ids = let open Lwt_syntax in - let* () = update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids in let* dal_attestable_slots = let empty_slots = Stdlib.List.init state.number_of_slots (fun _ -> false) in Lwt_mutex.with_lock state.lock @@ fun () -> diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index 63160024ff06..8eb64fabe2df 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -43,18 +43,21 @@ type t (** {1 Accessors} *) -(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids] reconciles - the active set of streams using [delegate_ids] by computing the list of streams +(** [update_streams_subscriptions state dal_node_rpc_ctxt delegate_ids ~await_backfill] + reconciles the active set of streams using [delegate_ids] by computing the list of streams to subscribe to. *) val update_streams_subscriptions : - t -> Tezos_rpc.Context.generic -> Delegate_id.t list -> unit Lwt.t + t -> + Tezos_rpc.Context.generic -> + Delegate_id.t list -> + await_backfill:bool -> + unit Lwt.t (** [get_dal_attestable_slots t ctxt ~attestation_level delegate_ids] returns, for each delegate in [delegate_ids], the current bitset for [published_level] derived from [~attestation_level]. 
*) val get_dal_attestable_slots : t -> - Tezos_rpc.Context.generic -> attestation_level:int32 -> Delegate_id.t list -> dal_attestable_slots Lwt.t -- GitLab From e370c549b638a0dc262bdd77cacb6ec2f2951026 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Wed, 12 Nov 2025 09:40:49 +0100 Subject: [PATCH 18/19] DAL/Tests: start waiting for event sooner in 'new attester attests' test --- tezt/tests/dal.ml | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index 8bed84044753..7506bca3e762 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -9350,12 +9350,10 @@ let test_new_attester_attests protocol dal_parameters _cryptobox node client "first_level_in_committee = %d; published_level = %d" first_level_in_committee published_level ; - Log.info "Bake blocks up to level %d" (published_level - 1) ; - let* () = bake_for ~count:(published_level - 1 - level) client in - let* id_attester = peer_id attester in - let* id_producer = peer_id producer in let check_graft_promises = + let* id_attester = peer_id attester in + let* id_producer = peer_id producer in Lwt.pick @@ check_grafts ~number_of_slots @@ -9364,14 +9362,18 @@ let test_new_attester_attests protocol dal_parameters _cryptobox node client (producer, id_producer) new_account.public_key_hash in - let* assigned_shard_indexes = - Dal_RPC.( - call attester - @@ get_assigned_shard_indices - ~level:first_level_in_committee - ~pkh:new_account.public_key_hash) - in + + Log.info "Bake blocks up to level %d" (published_level - 1) ; + let* () = bake_for ~count:(published_level - 1 - level) client in + let wait_for_shards_promises = + let* assigned_shard_indexes = + Dal_RPC.( + call attester + @@ get_assigned_shard_indices + ~level:first_level_in_committee + ~pkh:new_account.public_key_hash) + in wait_for_shards_promises ~dal_node:attester ~storage_profile:`Cache_only -- GitLab From 34a021a2823e9ea2531f299a370914aea70dc269 Mon Sep 17 00:00:00 2001 
From: Eugen Zalinescu Date: Wed, 12 Nov 2025 10:19:53 +0100 Subject: [PATCH 19/19] DAL/Tests: wait for injection event in rollup slot injection test --- tezt/tests/dal.ml | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index 7506bca3e762..52e4b51adf34 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -9001,24 +9001,19 @@ let rollup_node_injects_dal_slots _protocol parameters dal_node sc_node Sc_rollup_node.RPC.call sc_node @@ Sc_rollup_rpc.post_dal_slot_indices ~slot_indices:[0] in + let wait_injected = + Node.wait_for node "operation_injected.v0" (fun _ -> Some ()) + in let* () = Sc_rollup_node.RPC.call sc_node @@ Sc_rollup_rpc.post_local_dal_batcher_injection ~messages:["Hello DAL from a Smart Rollup"] in - (* We need to bake once to get the commitment injected and once more to have it - included in a block. *) - let* () = - repeat 2 (fun () -> - let* () = bake_for client in - let* level = Client.level client in - let* _level = - Sc_rollup_node.wait_for_level ~timeout:10. sc_node level - in - unit) - in + let* () = wait_injected in + (* We need to bake once to have the commitment included in a block, and then + [attestation_lag] more times to have it attested. *) let* () = - repeat parameters.Dal.Parameters.attestation_lag (fun () -> + repeat (1 + parameters.Dal.Parameters.attestation_lag) (fun () -> let* () = bake_for client in let* level = Client.level client in let* _level = -- GitLab