From c597017433f6cd25cccafe4d2325642b26a47f14 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 13:59:10 +0000 Subject: [PATCH 1/8] Alpha/Baker: Node_rpc: Change dal_attestable_slots signature Simplifies the RPC logic and is more natural way to call the RPC. --- src/proto_alpha/lib_delegate/baking_actions.ml | 4 ++-- src/proto_alpha/lib_delegate/baking_scheduling.ml | 4 ++-- src/proto_alpha/lib_delegate/baking_state.ml | 6 ++++++ src/proto_alpha/lib_delegate/baking_state.mli | 3 +++ src/proto_alpha/lib_delegate/node_rpc.ml | 9 ++------- src/proto_alpha/lib_delegate/node_rpc.mli | 6 +++--- 6 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index 702acfa83285..72d8b0009800 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -1138,13 +1138,13 @@ let update_to_level state level_update = Node_rpc.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:new_level - (Delegate_infos.own_delegates delegate_infos) + (Delegate_infos.own_delegate_ids delegate_infos) in let next_level_dal_attestable_slots = Node_rpc.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:(Int32.succ new_level) - (Delegate_infos.own_delegates next_level_delegate_infos) + (Delegate_infos.own_delegate_ids next_level_delegate_infos) in Lwt.return (dal_attestable_slots, next_level_dal_attestable_slots)) in diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 2e740609c2e9..79711e76ad70 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -700,7 +700,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain Node_rpc.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:current_level - (Delegate_infos.own_delegates delegate_infos)) + 
(Delegate_infos.own_delegate_ids delegate_infos)) dal_node_rpc_ctxt in let next_level_dal_attestable_slots = @@ -710,7 +710,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain Node_rpc.dal_attestable_slots dal_node_rpc_ctxt ~attestation_level:(Int32.succ current_level) - (Delegate_infos.own_delegates next_level_delegate_infos)) + (Delegate_infos.own_delegate_ids next_level_delegate_infos)) dal_node_rpc_ctxt in let level_state = diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index 8853d5c2a969..e79478c7e6b7 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -133,6 +133,12 @@ module Delegate_infos = struct let own_delegates t = t.own_delegates + let own_delegate_ids t = + List.map + (fun delegate_info -> + Baking_state_types.Delegate.delegate_id delegate_info.delegate) + t.own_delegates + let own_round_owner t ~committee_size ~round = let open Result_syntax in let* round_int = Round.to_int round |> Environment.wrap_tzresult in diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index db8899d6ce76..3ea4c80cca20 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -37,6 +37,9 @@ module Delegate_infos : sig no duplicates, the associated slot is the first one. *) val own_delegates : t -> delegate_info list + (** Returns the list of the delegate ids from [own_delegates]. *) + val own_delegate_ids : t -> Delegate_id.t list + (** Returns, among our *own* delegates, the delegate (together with its first attesting slot) that owns the given round, if any. 
*) val own_round_owner : diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index 1172824ee05f..51116626f03f 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -476,16 +476,11 @@ let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level = () let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) - ~attestation_level delegate_infos = + ~attestation_level = let attested_level = Int32.succ attestation_level in - List.map - (fun (delegate_info : delegate_info) -> - let delegate_id = - Baking_state_types.Delegate.delegate_id delegate_info.delegate - in + List.map (fun delegate_id -> ( delegate_id, get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) - delegate_infos let get_dal_profiles dal_node_rpc_ctxt = Tezos_rpc.Context.make_call diff --git a/src/proto_alpha/lib_delegate/node_rpc.mli b/src/proto_alpha/lib_delegate/node_rpc.mli index 505959ad9817..46b325c9ad78 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.mli +++ b/src/proto_alpha/lib_delegate/node_rpc.mli @@ -149,14 +149,14 @@ val forge_double_baking_evidence : bh2:block_header -> (bytes, Error_monad.tztrace) result Lwt.t -(** [dal_attestable_slots ctxt ~attestation_level delegates_slots] calls the DAL +(** [dal_attestable_slots ctxt ~attestation_level delegate_ids] calls the DAL node RPC GET /profiles//attested_levels//attestable_slots/ - for each of the delegates in [delegate_infos] and returns the corresponding + for each of the delegates in [delegate_ids] and returns the corresponding promises. 
*) val dal_attestable_slots : Tezos_rpc.Context.generic -> attestation_level:int32 -> - Baking_state_types.delegate_info list -> + Baking_state_types.Delegate_id.t list -> Baking_state_types.dal_attestable_slots (** [get_dal_profiles ctxt delegates] calls the DAL node RPC GET -- GitLab From 833858bcf7f363a32c9f74a513c322dba5d86272 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 14:52:33 +0000 Subject: [PATCH 2/8] Alpha/Baker: Dal_attestable_slots_worker: Add constructors and include in global_state --- .../testnet_experiment_tools/tool_alpha.ml | 6 +++ src/proto_alpha/lib_delegate/baking_lib.ml | 6 +++ .../lib_delegate/baking_scheduling.ml | 15 +++++- .../lib_delegate/baking_scheduling.mli | 5 +- src/proto_alpha/lib_delegate/baking_state.ml | 4 +- src/proto_alpha/lib_delegate/baking_state.mli | 1 + .../dal_attestable_slots_worker.ml | 51 +++++++++++++++++++ .../dal_attestable_slots_worker.mli | 16 ++++++ 8 files changed, 99 insertions(+), 5 deletions(-) create mode 100644 src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml create mode 100644 src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli diff --git a/devtools/testnet_experiment_tools/tool_alpha.ml b/devtools/testnet_experiment_tools/tool_alpha.ml index 693e485be87c..cde7852d3ba2 100644 --- a/devtools/testnet_experiment_tools/tool_alpha.ml +++ b/devtools/testnet_experiment_tools/tool_alpha.ml @@ -261,6 +261,11 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config let*! 
operation_worker = Operation_worker.run ?monitor_node_operations ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Baking_scheduling.create_initial_state cctxt ?synchronize @@ -268,6 +273,7 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config ~chain config operation_worker + dal_attestable_slots_worker ~current_proposal round_durations delegates diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index 5a5b4d097455..ef3cbd809b42 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -67,6 +67,11 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let*! operation_worker = Operation_worker.run ?monitor_node_operations ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Baking_scheduling.create_initial_state cctxt ?dal_node_rpc_ctxt @@ -74,6 +79,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~chain config operation_worker + dal_attestable_slots_worker round_durations ~current_proposal ~constants diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 79711e76ad70..d3b342044bf1 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -616,8 +616,8 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker round_durations ~(current_proposal : proposal) - ?constants 
delegates = + config operation_worker dal_attestable_slots_worker round_durations + ~(current_proposal : proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 consider saved attestable value *) @@ -646,6 +646,7 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain constants; round_durations; operation_worker; + dal_attestable_slots_worker; forge_worker_hooks = { push_request = (fun _ -> assert false); @@ -1017,10 +1018,19 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) in let*? round_durations = create_round_durations constants in let*! operation_worker = Operation_worker.run ~round_durations cctxt in + let dal_attestable_slots_worker = + Dal_attestable_slots_worker.create + ~attestation_lag:constants.parametric.dal.attestation_lag + ~number_of_slots:constants.parametric.dal.number_of_slots + in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> let*! _ = Operation_worker.shutdown_worker operation_worker in + let*! 
_ = + Dal_attestable_slots_worker.shutdown_worker + dal_attestable_slots_worker + in Lwt.return_unit)) canceler ; let* initial_state = @@ -1030,6 +1040,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) ~chain config operation_worker + dal_attestable_slots_worker round_durations ~current_proposal ~constants diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.mli b/src/proto_alpha/lib_delegate/baking_scheduling.mli index 88127b792294..81d2ace576c2 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.mli +++ b/src/proto_alpha/lib_delegate/baking_scheduling.mli @@ -132,8 +132,8 @@ val create_loop_state : loop_state (** [create_initial_state context ?synchronize chain baking_configuration - operation_worker current_proposal ?constants consensus_keys] creates an - initial {!Baking_state.t} by initializing a + operation_worker dal_attestable_slots_worker current_proposal ?constants consensus_keys] + creates an initial {!Baking_state.t} by initializing a {!type-Baking_state.global_state}, a {!type-Baking_state.level_state} and a {!type-Baking_state.round_state}. 
@@ -155,6 +155,7 @@ val create_initial_state : chain:Chain_services.chain -> Baking_configuration.t -> Operation_worker.t -> + Dal_attestable_slots_worker.t -> Round.round_durations -> current_proposal:proposal -> ?constants:Constants.t -> diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index e79478c7e6b7..f3ea633a5acf 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -499,8 +499,10 @@ type global_state = { constants : Constants.t; (* round durations *) round_durations : Round.round_durations; - (* worker that monitor and aggregates new operations *) + (* worker that monitors and aggregates new operations *) operation_worker : Operation_worker.t; + (* worker that retrieves DAL attestable slots from the DAL node *) + dal_attestable_slots_worker : Dal_attestable_slots_worker.t; (* hooks to the consensus and block forge worker *) mutable forge_worker_hooks : forge_worker_hooks; (* the validation mode used by the baker*) diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index 3ea4c80cca20..5cfc915a02a8 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -434,6 +434,7 @@ type global_state = { constants : Constants.t; round_durations : Round.round_durations; operation_worker : Operation_worker.t; + dal_attestable_slots_worker : Dal_attestable_slots_worker.t; mutable forge_worker_hooks : forge_worker_hooks; validation_mode : validation_mode; delegates : Key.t list; diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml new file mode 100644 index 000000000000..9a4db35d6436 --- /dev/null +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -0,0 +1,51 @@ +(*****************************************************************************) +(* *) +(* 
SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +open Baking_state_types +open Tezos_dal_node_services + +(** A handle to a single delegate’s DAL monitoring subscription. *) +type stream_handle = { + stream : Types.Attestable_event.t Lwt_stream.t; + stopper : Tezos_rpc.Context.stopper; +} + +type slots_by_delegate = Types.attestable_slots Delegate_id.Table.t + +type t = { + attestation_lag : int; + number_of_slots : int; + streams : stream_handle Delegate_id.Table.t; + (** Active per-delegate subscriptions. *) + cache : (int32, slots_by_delegate) Stdlib.Hashtbl.t; + (** Cache of attestable slots, keyed by attestation levels. *) +} + +let create_delegate_table () = Delegate_id.Table.create 10 + +let create ~attestation_lag ~number_of_slots = + { + attestation_lag; + number_of_slots; + streams = create_delegate_table (); + cache = Stdlib.Hashtbl.create 16; + } + +let shutdown_worker state = + let open Lwt_syntax in + let* stoppers = + let stoppers = + Delegate_id.Table.to_seq state.streams + |> Seq.map (fun (_delegate_id, {stopper; _}) -> stopper) + |> List.of_seq + in + Delegate_id.Table.clear state.streams ; + Stdlib.Hashtbl.reset state.cache ; + return stoppers + in + List.iter (fun stopper -> stopper ()) stoppers ; + return_unit diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli new file mode 100644 index 000000000000..0c49f2d006ba --- /dev/null +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -0,0 +1,16 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +type t + +(** [create ~attestation_lag ~number_of_slots] creates a new worker state. 
*) +val create : attestation_lag:int -> number_of_slots:int -> t + +(** [shutdown_worker state] stops all active delegate subscriptions and clears + the worker’s in-memory state. The worker will no longer hold any references + to live streams and the cache will become empty. *) +val shutdown_worker : t -> unit Lwt.t -- GitLab From e1973390deb6c544c935aad84825485e4aa5398b Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 14:55:12 +0000 Subject: [PATCH 3/8] Alpha/Baker: Node_rpc: Small refactoring --- src/proto_alpha/lib_delegate/node_rpc.ml | 6 +++--- src/proto_alpha/lib_delegate/node_rpc.mli | 13 +++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index 51116626f03f..7dc4e1061a3e 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -466,8 +466,9 @@ let fetch_dal_config cctxt = | Error e -> return_error e | Ok dal_config -> return_ok dal_config -let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level = +let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attestation_level = let pkh = Delegate_id.to_pkh delegate_id in + let attested_level = Int32.succ attestation_level in Tezos_rpc.Context.make_call Tezos_dal_node_services.Services.get_attestable_slots dal_node_rpc_ctxt @@ -477,10 +478,9 @@ let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level = let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) ~attestation_level = - let attested_level = Int32.succ attestation_level in List.map (fun delegate_id -> ( delegate_id, - get_attestable_slots dal_node_rpc_ctxt delegate_id ~attested_level )) + get_attestable_slots dal_node_rpc_ctxt delegate_id ~attestation_level )) let get_dal_profiles dal_node_rpc_ctxt = Tezos_rpc.Context.make_call diff --git a/src/proto_alpha/lib_delegate/node_rpc.mli b/src/proto_alpha/lib_delegate/node_rpc.mli index 
46b325c9ad78..b4adf827c8d8 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.mli +++ b/src/proto_alpha/lib_delegate/node_rpc.mli @@ -25,6 +25,7 @@ open Protocol open Alpha_context +open Baking_state_types (** Inject a block. @@ -64,17 +65,17 @@ val preapply_block : val monitor_valid_proposals : #Protocol_client_context.rpc_context -> chain:Shell_services.chain -> - ?cache:Baking_state_types.block_info Baking_cache.Block_cache.t -> + ?cache:block_info Baking_cache.Block_cache.t -> unit -> - (Baking_state_types.proposal Lwt_stream.t * (unit -> unit)) tzresult Lwt.t + (proposal Lwt_stream.t * (unit -> unit)) tzresult Lwt.t (** Monitor heads from the node. *) val monitor_heads : #Protocol_client_context.rpc_context -> chain:Shell_services.chain -> - ?cache:Baking_state_types.block_info Baking_cache.Block_cache.t -> + ?cache:block_info Baking_cache.Block_cache.t -> unit -> - (Baking_state_types.proposal Lwt_stream.t * (unit -> unit)) tzresult Lwt.t + (proposal Lwt_stream.t * (unit -> unit)) tzresult Lwt.t (** Await the current protocol to be activated. *) val await_protocol_activation : @@ -156,8 +157,8 @@ val forge_double_baking_evidence : val dal_attestable_slots : Tezos_rpc.Context.generic -> attestation_level:int32 -> - Baking_state_types.Delegate_id.t list -> - Baking_state_types.dal_attestable_slots + Delegate_id.t list -> + dal_attestable_slots (** [get_dal_profiles ctxt delegates] calls the DAL node RPC GET /profiles/ to retrieve the DAL node's profiles. *) -- GitLab From 86561a5ea433e5371461c1b946fd76b5eeb20c84 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 15:10:49 +0000 Subject: [PATCH 4/8] Alpha/Baker: Dal_attestable_slots_worker: Add streams management mechanism Introduce the capability to update the subscribed streams managed in the worker's cache. This update function is to be used by the baker, currently it is a no-op. 
--- .../dal_attestable_slots_worker.ml | 215 ++++++++++++++++++ .../dal_attestable_slots_worker.mli | 13 +- src/proto_alpha/lib_delegate/node_rpc.ml | 16 ++ src/proto_alpha/lib_delegate/node_rpc.mli | 12 + 4 files changed, 255 insertions(+), 1 deletion(-) diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 9a4db35d6436..f69f273244c7 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -8,6 +8,50 @@ open Baking_state_types open Tezos_dal_node_services +module Events = struct + include Internal_event.Simple + + let section = [Protocol.name; "baker"; "dal_attestable_slots_worker"] + + let consumed_backfill_stream = + declare_1 + ~section + ~name:"consumed_backfill_stream" + ~level:Info + ~msg:"Consumed backfill stream for {delegate_id}" + ("delegate_id", Delegate_id.encoding) + + let monitor_attestable_slots_failed = + declare_2 + ~section + ~name:"monitor_attestable_slots_failed" + ~level:Error + ~msg: + "Unable to subscribe to monitor attestable slots stream for {delegate} \ + -- {trace}" + ("delegate", Delegate_id.encoding) + ~pp2:Error_monad.pp_print_trace + ("trace", Error_monad.trace_encoding) + + let stream_ended = + declare_1 + ~section + ~name:"dal_stream_ended" + ~level:Notice + ~msg:"DAL monitor stream ended for {delegate_id}" + ("delegate_id", Delegate_id.encoding) + + let consume_streams_ended = + declare_1 + ~section + ~name:"consume_streams_ended" + ~level:Error + ~msg:"consume_streams ended with error {stacktrace}" + ("stacktrace", Data_encoding.string) +end + +module DelegateSet = Set.Make (Delegate_id) + (** A handle to a single delegate’s DAL monitoring subscription. 
*) type stream_handle = { stream : Types.Attestable_event.t Lwt_stream.t; @@ -27,6 +71,177 @@ type t = { let create_delegate_table () = Delegate_id.Table.create 10 +(** [get_slots_by_delegate state ~attestation_level] returns the per-delegate + cache bucket for the given [~attestation_level]. If none exists yet, it + creates an empty one, stores it in [state.cache], and returns it. *) +let get_slots_by_delegate state ~attestation_level = + match Stdlib.Hashtbl.find_opt state.cache attestation_level with + | Some slots_by_delegate -> slots_by_delegate + | None -> + let slots_by_delegate = create_delegate_table () in + Stdlib.Hashtbl.add state.cache attestation_level slots_by_delegate ; + slots_by_delegate + +(** [update_cache_with_attestable_slot state ?is_trap ~delegate_id ~slot_id] adds [~slot_id] + to the cache, using the keys [attestation_level] = [slot_level] + [attestation_lag] - 1 + and [~delegate_id]. The bit associated to [~slot_id] and [~delegate_id] is set to the + opposite of [?is_trap]. *) +let update_cache_with_attestable_slot ?(is_trap = false) state ~delegate_id + ~slot_id = + let Types.Slot_id.{slot_level; slot_index} = slot_id in + let attestation_level = + Int32.(pred @@ add slot_level (of_int state.attestation_lag)) + in + let slots_by_delegate = get_slots_by_delegate state ~attestation_level in + let value = not is_trap in + let attestable_slots = + match Delegate_id.Table.find_opt slots_by_delegate delegate_id with + | Some (Types.Attestable_slots {slots; published_level}) -> + let slots_array = Array.of_list slots in + slots_array.(slot_index) <- value ; + let slots = Array.to_list slots_array in + Types.Attestable_slots {slots; published_level} + | Some Not_in_committee -> + (* We should never reach this point, as the delegate should not have any attestable + slot, as they are not in the committee. 
*) + Types.Not_in_committee + | None -> + let slots = Array.make state.number_of_slots false in + slots.(slot_index) <- value ; + Types.Attestable_slots + {slots = Array.to_list slots; published_level = slot_level} + in + Delegate_id.Table.replace slots_by_delegate delegate_id attestable_slots + +(** [update_cache_no_shards_assigned state ~delegate_id ~attestation_level] adds a + [Not_in_committee] element into the cache, using [~delegate_id] and [~attestation_level] + as keys. *) +let update_cache_no_shards_assigned state ~delegate_id ~attestation_level = + let slots_by_delegate = get_slots_by_delegate state ~attestation_level in + Delegate_id.Table.replace slots_by_delegate delegate_id Types.Not_in_committee + +(** [update_cache_backfill_payload state ~delegate_id ~backfill_payload] + merges a DAL [Backfill] event for [~delegate_id] into the in-memory cache. *) +let update_cache_backfill_payload state ~delegate_id ~backfill_payload = + let module E = Types.Attestable_event in + let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in + List.iter + (fun slot_id -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id) + slot_ids ; + List.iter + (fun attestation_level -> + update_cache_no_shards_assigned state ~delegate_id ~attestation_level) + no_shards_attestation_levels + +(** [consume_backfill_stream state stream_handle ~delegate_id] consumes the initial [Backfill] + event from a freshly opened DAL monitoring stream. This function is meant to be called + immediately after subscribing to the DAL node stream for a [~delegate_id] + and before spawning the asynchronous consumer that handles live events.
*) +let consume_backfill_stream state stream_handle ~delegate_id = + let open Lwt_syntax in + let module E = Types.Attestable_event in + let* attestable_event_opt = Lwt_stream.get stream_handle.stream in + match attestable_event_opt with + | Some (E.Backfill {backfill_payload}) -> + update_cache_backfill_payload state ~delegate_id ~backfill_payload ; + let* () = Events.(emit consumed_backfill_stream delegate_id) in + return_unit + | None -> + let* () = Events.(emit stream_ended delegate_id) in + Delegate_id.Table.remove state.streams delegate_id ; + return_unit + | _ -> + Lwt.fail_with + (Format.asprintf + "The stream must always start properly with a Backfill event for \ + delegate id: %a." + Delegate_id.pp + delegate_id) + +(** [consume_stream state stream_handle ~delegate_id] consumes [~delegate_id]'s stream + continuously, updating the [state] cache accordingly. *) +let rec consume_stream state stream_handle ~delegate_id = + let open Lwt_syntax in + let module E = Types.Attestable_event in + let* attestable_event_opt = Lwt_stream.get stream_handle.stream in + match attestable_event_opt with + | None -> + (* Stream gets closed, as we deliberately drop the subscription here; the consumer + will detect (e.g. via [update_streams_subscriptions]) the missing entry and + re-subscribe with backfill on the next level. *) + let* () = Events.(emit stream_ended delegate_id) in + Delegate_id.Table.remove state.streams delegate_id ; + return_unit + | Some (E.Attestable_slot {slot_id}) -> + update_cache_with_attestable_slot state ~delegate_id ~slot_id ; + consume_stream state ~delegate_id stream_handle + | Some (No_shards_assigned {attestation_level}) -> + update_cache_no_shards_assigned state ~delegate_id ~attestation_level ; + consume_stream state ~delegate_id stream_handle + | Some (Slot_has_trap {slot_id}) -> + (* In case of a trap, we know the slot is not attestable, so we record an explicit [false] bit.
*) + update_cache_with_attestable_slot + state + ~is_trap:true + ~delegate_id + ~slot_id ; + consume_stream state ~delegate_id stream_handle + | Some (Backfill _backfill_payload) -> + (* This case should never be reached as the [Backfill] is always the first element of the + stream, and is to be consumed in [consume_backfill_stream]. *) + Lwt.fail_with + "Backfill events should always be consumed synchronously at the \ + beginning of a subscription." + +(** [subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add] + opens new monitoring DAL streams for each entry in [~delegate_ids_to_add], + and starts consuming from them, to populate the internal [state] cache. *) +let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = + let open Lwt_syntax in + let* new_streams = + List.filter_map_p + (fun delegate_id -> + let* res = + Node_rpc.monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id + in + match res with + | Ok (stream, stopper) -> return_some (delegate_id, {stream; stopper}) + | Error trace -> + let* () = + Events.(emit monitor_attestable_slots_failed (delegate_id, trace)) + in + return_none) + delegate_ids_to_add + in + List.iter + (fun (delegate_id, stream_handle) -> + Delegate_id.Table.add state.streams delegate_id stream_handle) + new_streams ; + List.iter + (fun (delegate_id, stream_handle) -> + Lwt.dont_wait + (fun () -> + let* () = consume_backfill_stream state ~delegate_id stream_handle in + consume_stream state ~delegate_id stream_handle) + (fun exn -> + Events.( + emit__dont_wait__use_with_care + consume_streams_ended + (Printexc.to_string exn)))) + new_streams ; + return_unit + +let update_streams_subscriptions state dal_node_rpc_ctxt ~delegate_ids = + let delegate_ids_to_add = + let new_delegate_ids = DelegateSet.of_list delegate_ids in + let current_delegate_ids = + DelegateSet.of_seq @@ Delegate_id.Table.to_seq_keys state.streams + in + DelegateSet.(elements (diff new_delegate_ids current_delegate_ids)) + in + 
subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add + let create ~attestation_lag ~number_of_slots = { attestation_lag; diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index 0c49f2d006ba..246e30f0f2d6 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -7,7 +7,18 @@ type t -(** [create ~attestation_lag ~number_of_slots] creates a new worker state. *) +(** [update_streams_subscriptions state dal_node_rpc_ctxt ~delegate_ids] reconciles + the active set of streams using [~delegate_ids] by computing the list of streams + to subscribe to. *) +val update_streams_subscriptions : + t -> + Tezos_rpc.Context.generic -> + delegate_ids:Baking_state_types.Delegate_id.t list -> + unit Lwt.t + +(** [create ~attestation_lag ~number_of_slots] creates a new worker state. This + does not start any background thread, as streams are opened via + [update_streams_subscriptions]. 
*) val create : attestation_lag:int -> number_of_slots:int -> t (** [shutdown_worker state] stops all active delegate subscriptions and clears diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index 7dc4e1061a3e..e593013bd309 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -482,6 +482,22 @@ let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) ( delegate_id, get_attestable_slots dal_node_rpc_ctxt delegate_id ~attestation_level )) +let monitor_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) + ~delegate_id = + let open Lwt_syntax in + let pkh = Delegate_id.to_pkh delegate_id in + let* result = + Tezos_rpc.Context.make_streamed_call + Tezos_dal_node_services.Services.monitor_attestable_slots + dal_node_rpc_ctxt + ((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh) + () + () + in + match result with + | Ok result -> return_ok result + | Error trace -> return_error trace + let get_dal_profiles dal_node_rpc_ctxt = Tezos_rpc.Context.make_call Tezos_dal_node_services.Services.get_profiles diff --git a/src/proto_alpha/lib_delegate/node_rpc.mli b/src/proto_alpha/lib_delegate/node_rpc.mli index b4adf827c8d8..fe6b8f3774c1 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.mli +++ b/src/proto_alpha/lib_delegate/node_rpc.mli @@ -160,6 +160,18 @@ val dal_attestable_slots : Delegate_id.t list -> dal_attestable_slots +(** [monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id] opens a streamed RPC + to the DAL node for the given [~delegate_id]. Each item emitted on the stream + contains DAL attestable information for this delegate. 
*) +val monitor_attestable_slots : + Tezos_rpc.Context.generic -> + delegate_id:Delegate_id.t -> + ( Tezos_dal_node_services.Types.Attestable_event.t Lwt_stream.t + * Tezos_rpc.Context.stopper, + Error_monad.tztrace ) + result + Lwt.t + (** [get_dal_profiles ctxt delegates] calls the DAL node RPC GET /profiles/ to retrieve the DAL node's profiles. *) val get_dal_profiles : -- GitLab From d44add3765f7584a0853621b2da031cf998cb266 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 15:12:53 +0000 Subject: [PATCH 5/8] Alpha/Baker: Add mutex for streams subscription We need this because the streams can be updated at different places in the baker code, for instance at DAL profiles registration, or when we update to a new level. Therefore, we need to avoid a scenario where these calls are intertwined. --- .../dal_attestable_slots_worker.ml | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index f69f273244c7..966524ff64ac 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -67,6 +67,7 @@ type t = { (** Active per-delegate subscriptions. *) cache : (int32, slots_by_delegate) Stdlib.Hashtbl.t; (** Cache of attestable slots, keyed by attestation levels. *) + subscriptions_lock : Lwt_mutex.t; (** Lock for streams subscriptions. *) } let create_delegate_table () = Delegate_id.Table.create 10 @@ -149,6 +150,7 @@ let consume_backfill_stream state stream_handle ~delegate_id = return_unit | None -> let* () = Events.(emit stream_ended delegate_id) in + Lwt_mutex.with_lock state.subscriptions_lock @@ fun () -> Delegate_id.Table.remove state.streams delegate_id ; return_unit | _ -> @@ -171,6 +173,7 @@ let rec consume_stream state stream_handle ~delegate_id = will detect (e.g. 
via [update_streams_subscriptions]) the missing entry and re-subscribe with backfill on the next level. *) let* () = Events.(emit stream_ended delegate_id) in + Lwt_mutex.with_lock state.subscriptions_lock @@ fun () -> Delegate_id.Table.remove state.streams delegate_id ; return_unit | Some (E.Attestable_slot {slot_id}) -> @@ -214,10 +217,14 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = return_none) delegate_ids_to_add in - List.iter - (fun (delegate_id, stream_handle) -> - Delegate_id.Table.add state.streams delegate_id stream_handle) - new_streams ; + let* () = + Lwt_mutex.with_lock state.subscriptions_lock @@ fun () -> + List.iter + (fun (delegate_id, stream_handle) -> + Delegate_id.Table.replace state.streams delegate_id stream_handle) + new_streams ; + return_unit + in List.iter (fun (delegate_id, stream_handle) -> Lwt.dont_wait @@ -233,12 +240,15 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = return_unit let update_streams_subscriptions state dal_node_rpc_ctxt ~delegate_ids = - let delegate_ids_to_add = + let open Lwt_syntax in + let* delegate_ids_to_add = + Lwt_mutex.with_lock state.subscriptions_lock @@ fun () -> let new_delegate_ids = DelegateSet.of_list delegate_ids in let current_delegate_ids = DelegateSet.of_seq @@ Delegate_id.Table.to_seq_keys state.streams in - DelegateSet.(elements (diff new_delegate_ids current_delegate_ids)) + return + @@ DelegateSet.(elements (diff new_delegate_ids current_delegate_ids)) in subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add @@ -248,6 +258,7 @@ let create ~attestation_lag ~number_of_slots = number_of_slots; streams = create_delegate_table (); cache = Stdlib.Hashtbl.create 16; + subscriptions_lock = Lwt_mutex.create (); } let shutdown_worker state = -- GitLab From 926212ed09c2f1f2255d2ca65722f9e72c6b21dc Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 15:14:26 +0000 Subject: [PATCH 6/8] Alpha/Baker: 
Dal_attestable_slots_worker: Add documentation --- .../dal_attestable_slots_worker.mli | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index 246e30f0f2d6..989a67f3ce8b 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -5,6 +5,24 @@ (* *) (*****************************************************************************) +(** The DAL attestable slots worker keeps per-delegate subscriptions to + the DAL node's streaming RPC [GET /profiles/<pkh>/monitor/attestable_slots]. + Each stream emits attestable slots events as soon as they are available from + the DAL node. Check {!Tezos_dal_node_services.Types.Attestable_event.t} for + the possible types of events available. + + Incoming events are folded into an in-memory cache keyed by attestation level + and delegate id. For each (attestation_level, delegate), the worker maintains a + boolean bitset of attestable slots. This cache is filled continuously and independently + of the baker's main loop so that consensus code never waits on the network. There can + be a little overhead for the backfill of the cache, which is done at stream subscription + (usually at startup). + + The worker's purpose is to decouple the critical consensus path from DAL + RPC latency: streams advance in the background, therefore the cache can serve + DAL information instantly. +*) + type t (** [update_streams_subscriptions state dal_node_rpc_ctxt ~delegate_ids] reconciles -- GitLab From 89ea991ddc0001298189b84637ba40086ecd8130 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 15:15:54 +0000 Subject: [PATCH 7/8] Alpha/Baker: Dal_attestable_slots_worker: Add get_attestable_slots This functionality is not yet used, but it will be in the baker in a future MR. 
--- .../dal_attestable_slots_worker.ml | 40 +++++++++++++++++++ .../dal_attestable_slots_worker.mli | 13 ++++++ 2 files changed, 53 insertions(+) diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 966524ff64ac..ec5102895b8c 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -21,6 +21,27 @@ module Events = struct ~msg:"Consumed backfill stream for {delegate_id}" ("delegate_id", Delegate_id.encoding) + let no_attestable_slot_at_level = + declare_1 + ~section + ~name:"no_attestable_slot_at_level" + ~level:Warning + ~msg: + "No attestable slots found for attestation level {attestation_level} \ + in cache" + ("attestation_level", Data_encoding.int32) + + let no_attestable_slot_at_level_for_delegate = + declare_2 + ~section + ~name:"no_attestable_slot_at_level_for_delegate" + ~level:Warning + ~msg: + "No attestable slots found for attestation level {attestation_level} \ + and {delegate_id} in cache" + ("attestation_level", Data_encoding.int32) + ("delegate_id", Delegate_id.encoding) + let monitor_attestable_slots_failed = declare_2 ~section @@ -252,6 +273,25 @@ let update_streams_subscriptions state dal_node_rpc_ctxt ~delegate_ids = in subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add +(* TODO: Use this functionality in the baker instead of the [dal_attestable_slots] static method. 
*) +let get_dal_attestable_slots state ~delegate_id ~attestation_level = + let open Lwt_syntax in + match Stdlib.Hashtbl.find_opt state.cache attestation_level with + | None -> + let* () = Events.(emit no_attestable_slot_at_level attestation_level) in + return_none + | Some slots_by_delegate -> ( + match Delegate_id.Table.find_opt slots_by_delegate delegate_id with + | None -> + let* () = + Events.( + emit + no_attestable_slot_at_level_for_delegate + (attestation_level, delegate_id)) + in + return_none + | Some slots -> return_some slots) + let create ~attestation_lag ~number_of_slots = { attestation_lag; diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli index 989a67f3ce8b..19bf32a15c6d 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.mli @@ -21,6 +21,10 @@ The worker's purpose is to decouple the critical consensus path from DAL RPC latency: streams advance in the background, therefore the cache can serve DAL information instantly. + + When a consumer asks for attestable slots at level L for a delegate, + the worker returns an immediate "snapshot" from the cache (or [None], after + emitting a warning event, if nothing is cached for that level and delegate). *) type t @@ -34,6 +38,15 @@ val update_streams_subscriptions : delegate_ids:Baking_state_types.Delegate_id.t list -> unit Lwt.t +(** [get_dal_attestable_slots t ~delegate_id ~attestation_level] + returns, for [~delegate_id], the attestable slots currently cached for + [~attestation_level], or [None] if the cache has no such entry. *) +val get_dal_attestable_slots : + t -> + delegate_id:Baking_state_types.Delegate_id.t -> + attestation_level:int32 -> + Tezos_dal_node_services.Types.attestable_slots option Lwt.t + (** [create ~attestation_lag ~number_of_slots] creates a new worker state. 
This does not start any background thread, as streams are opened via [update_streams_subscriptions]. *) -- GitLab From 91833249ee874a622e8aa1ed6c1441d1dd4ee3a7 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 15:17:51 +0000 Subject: [PATCH 8/8] Alpha/Baker: Dal_attestable_slots_worker: Add pruning mechanism When adding the FIFO eviction policy in the cache, we rely on the Backfill adding elements in ascending order of attestation_level. --- .../dal_attestable_slots_worker.ml | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index ec5102895b8c..37ebf5c0a5d6 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -81,13 +81,21 @@ type stream_handle = { type slots_by_delegate = Types.attestable_slots Delegate_id.Table.t +module Level_map = + Aches.Vache.Map (Aches.Vache.FIFO_Precise) (Aches.Vache.Strong) + (struct + include Int32 + + let hash = Hashtbl.hash + end) + type t = { attestation_lag : int; number_of_slots : int; streams : stream_handle Delegate_id.Table.t; (** Active per-delegate subscriptions. *) - cache : (int32, slots_by_delegate) Stdlib.Hashtbl.t; - (** Cache of attestable slots, keyed by attestation levels. *) + cache : slots_by_delegate Level_map.t; + (** Bounded FIFO cache of attestable slots, keyed by attestation levels. *) subscriptions_lock : Lwt_mutex.t; (** Lock for streams subscriptions. *) } @@ -97,11 +105,11 @@ let create_delegate_table () = Delegate_id.Table.create 10 cache bucket for the given [~attestation_level]. If none exists yet, it creates an empty one, stores it in [state.cache], and returns it. 
*) let get_slots_by_delegate state ~attestation_level = - match Stdlib.Hashtbl.find_opt state.cache attestation_level with + match Level_map.find_opt state.cache attestation_level with | Some slots_by_delegate -> slots_by_delegate | None -> let slots_by_delegate = create_delegate_table () in - Stdlib.Hashtbl.add state.cache attestation_level slots_by_delegate ; + Level_map.replace state.cache attestation_level slots_by_delegate ; slots_by_delegate (** [update_cache_with_attestable_slot state ?is_trap ~delegate_id ~slot_id] adds [~slot_id] @@ -276,7 +284,7 @@ let update_streams_subscriptions state dal_node_rpc_ctxt ~delegate_ids = (* TODO: Use this functionality in the baker instead of the [dal_attestable_slots] static method. *) let get_dal_attestable_slots state ~delegate_id ~attestation_level = let open Lwt_syntax in - match Stdlib.Hashtbl.find_opt state.cache attestation_level with + match Level_map.find_opt state.cache attestation_level with | None -> let* () = Events.(emit no_attestable_slot_at_level attestation_level) in return_none @@ -297,7 +305,9 @@ let create ~attestation_lag ~number_of_slots = attestation_lag; number_of_slots; streams = create_delegate_table (); - cache = Stdlib.Hashtbl.create 16; + cache = + (* a [2 * lag] size should be enough; we use more for safety *) + Level_map.create (3 * attestation_lag); subscriptions_lock = Lwt_mutex.create (); } @@ -310,7 +320,7 @@ let shutdown_worker state = |> List.of_seq in Delegate_id.Table.clear state.streams ; - Stdlib.Hashtbl.reset state.cache ; + Level_map.clear state.cache ; return stoppers in List.iter (fun stopper -> stopper ()) stoppers ; -- GitLab