From 1af7502707e47ee168bbef7c2e19a0855ca49590 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 17:10:08 +0000 Subject: [PATCH 1/3] Alpha/Baker: Update DAL attestable slots worker streams --- .../lib_delegate/baking_actions.ml | 129 ++++++------------ .../lib_delegate/baking_actions.mli | 6 +- src/proto_alpha/lib_delegate/baking_events.ml | 21 --- src/proto_alpha/lib_delegate/baking_lib.ml | 13 +- .../lib_delegate/baking_scheduling.ml | 22 --- src/proto_alpha/lib_delegate/baking_state.ml | 11 +- src/proto_alpha/lib_delegate/baking_state.mli | 5 - .../lib_delegate/baking_state_types.ml | 5 - .../lib_delegate/baking_state_types.mli | 8 -- .../dal_attestable_slots_worker.ml | 1 - src/proto_alpha/lib_delegate/node_rpc.ml | 17 --- src/proto_alpha/lib_delegate/node_rpc.mli | 10 -- .../lib_delegate/state_transitions.ml | 5 +- 13 files changed, 55 insertions(+), 198 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index 215791718e03..f1e8140cc916 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -148,8 +148,6 @@ and level_update = { current_round:Round.t -> delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> - dal_attestable_slots:dal_attestable_slots -> - next_level_dal_attestable_slots:dal_attestable_slots -> (state * action) Lwt.t; } @@ -491,13 +489,13 @@ let dal_checks_and_warnings state = else return_unit let only_if_dal_feature_enabled state ~default_value f = - let open Lwt_syntax in + let open Lwt_result_syntax in let open Constants in let Parametric.{dal = {feature_enable; _}; _} = state.global_state.constants.parametric in if feature_enable then - let* () = dal_checks_and_warnings state in + let*! 
() = dal_checks_and_warnings state in Option.fold ~none:(return default_value) ~some:f @@ -507,42 +505,31 @@ let only_if_dal_feature_enabled state ~default_value f = let process_dal_rpc_result state delegate level round = let open Lwt_result_syntax in function - | `RPC_timeout -> - let*! () = - Events.(emit failed_to_get_dal_attestations_in_time (delegate, level)) - in + | None -> return_none + | Some Tezos_dal_node_services.Types.Not_in_committee -> + let*! () = Events.(emit not_in_dal_committee (delegate, level)) in return_none - | `RPC_result (Error errs) -> + | Some (Attestable_slots {slots; published_level}) -> + let number_of_slots = + state.global_state.constants.parametric.dal.number_of_slots + in + let dal_attestation = + List.fold_left_i + (fun i acc flag -> + match Dal.Slot_index.of_int_opt ~number_of_slots i with + | Some index when flag -> Dal.Attestation.commit acc index + | None | Some _ -> acc) + Dal.Attestation.empty + slots + in + let dal_content = {attestation = dal_attestation} in let*! () = - Events.(emit failed_to_get_dal_attestations (delegate, errs)) + Events.( + emit + attach_dal_attestation + (delegate, dal_content, published_level, level, round)) in - return_none - | `RPC_result (Ok res) -> ( - match res with - | Tezos_dal_node_services.Types.Not_in_committee -> - let*! () = Events.(emit not_in_dal_committee (delegate, level)) in - return_none - | Attestable_slots {slots; published_level} -> - let number_of_slots = - state.global_state.constants.parametric.dal.number_of_slots - in - let dal_attestation = - List.fold_left_i - (fun i acc flag -> - match Dal.Slot_index.of_int_opt ~number_of_slots i with - | Some index when flag -> Dal.Attestation.commit acc index - | None | Some _ -> acc) - Dal.Attestation.empty - slots - in - let dal_content = {attestation = dal_attestation} in - let*! 
() = - Events.( - emit - attach_dal_attestation - (delegate, dal_content, published_level, level, round)) - in - return_some dal_content) + return_some dal_content let may_get_dal_content state consensus_vote = let open Lwt_result_syntax in @@ -552,30 +539,17 @@ let may_get_dal_content state consensus_vote = vote_consensus_content.round ) in let delegate_id = Delegate.delegate_id delegate in - let promise_opt = - List.assoc_opt - ~equal:Delegate_id.equal - delegate_id - state.level_state.dal_attestable_slots - in - match promise_opt with - | None -> return_none - | Some promise -> - let*! res = - (* Normally we'd just check the state of the promise and return the - resolved value or an error if the promise is still pending. However, - tests that bake in the past would fail, because there would not be - sufficient time to get the answer from the DAL node. Therefore, we - wait for a bit for the DAL node to provide an answer. *) - Lwt.pick - [ - (let*! () = Lwt_unix.sleep 0.75 in - Lwt.return `RPC_timeout); - (let*! tz_res = promise in - Lwt.return (`RPC_result tz_res)); - ] + only_if_dal_feature_enabled + state + ~default_value:None + (fun _dal_node_rpc_ctxt -> + let*! dal_attestable_slots = + Dal_attestable_slots_worker.get_dal_attestable_slots + state.global_state.dal_attestable_slots_worker + ~delegate_id + ~attestation_level:level in - process_dal_rpc_result state delegate_id level round res + process_dal_rpc_result state delegate_id level round dal_attestable_slots) let is_authorized (global_state : global_state) highwatermarks consensus_vote = let {delegate; vote_consensus_content; _} = consensus_vote in @@ -1126,35 +1100,11 @@ let update_to_level state level_update = new_level_proposal round_durations [@profiler.record_f {verbosity = Debug} "compute round"]) in - let*! 
dal_attestable_slots, next_level_dal_attestable_slots = - only_if_dal_feature_enabled - state - ~default_value:([], []) - (fun dal_node_rpc_ctxt -> - let dal_attestable_slots = - if Int32.(new_level = succ state.level_state.current_level) then - state.level_state.next_level_dal_attestable_slots - else - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:new_level - (Delegate_infos.own_delegate_ids delegate_infos) - in - let next_level_dal_attestable_slots = - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:(Int32.succ new_level) - (Delegate_infos.own_delegate_ids next_level_delegate_infos) - in - Lwt.return (dal_attestable_slots, next_level_dal_attestable_slots)) - in let*! new_state, new_action = (compute_new_state ~current_round ~delegate_infos ~next_level_delegate_infos - ~dal_attestable_slots - ~next_level_dal_attestable_slots [@profiler.record_s {verbosity = Debug} "compute new state"]) in let _promise = @@ -1169,10 +1119,13 @@ let update_to_level state level_update = can change at level boundaries (e.g. migrations, reorganisations, key/profile updates). Doing it here makes the streams ready before we build next-level attestations. *) - Dal_attestable_slots_worker.update_streams_subscriptions - state.global_state.dal_attestable_slots_worker - dal_node_rpc_ctxt - ~delegate_ids:next_level_delegate_ids) + let*! 
() = + Dal_attestable_slots_worker.update_streams_subscriptions + state.global_state.dal_attestable_slots_worker + dal_node_rpc_ctxt + ~delegate_ids:next_level_delegate_ids + in + return_unit) in return (new_state, new_action) diff --git a/src/proto_alpha/lib_delegate/baking_actions.mli b/src/proto_alpha/lib_delegate/baking_actions.mli index 67d32b2d79e2..a90e04b7c247 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.mli +++ b/src/proto_alpha/lib_delegate/baking_actions.mli @@ -57,8 +57,6 @@ and level_update = { current_round:Round.t -> delegate_infos:delegate_infos -> next_level_delegate_infos:delegate_infos -> - dal_attestable_slots:dal_attestable_slots -> - next_level_dal_attestable_slots:dal_attestable_slots -> (state * action) Lwt.t; } @@ -144,8 +142,8 @@ val update_to_level : state -> level_update -> (state * t) tzresult Lwt.t val only_if_dal_feature_enabled : state -> default_value:'a -> - (Tezos_rpc.Context.generic -> 'a Lwt.t) -> - 'a Lwt.t + (Tezos_rpc.Context.generic -> 'a tzresult Lwt.t) -> + 'a tzresult Lwt.t (** [may_get_dal_content state unsigned_consensus_vote], if the DAL feature is enabled, recovers the attestable slots by calling diff --git a/src/proto_alpha/lib_delegate/baking_events.ml b/src/proto_alpha/lib_delegate/baking_events.ml index 1b917a6708e3..82d34e3e3ff3 100644 --- a/src/proto_alpha/lib_delegate/baking_events.ml +++ b/src/proto_alpha/lib_delegate/baking_events.ml @@ -803,27 +803,6 @@ module Actions = struct ~pp2:Error_monad.pp_print_trace ("trace", Error_monad.trace_encoding) - let failed_to_get_dal_attestations = - declare_2 - ~section - ~name:"failed_to_get_attestations" - ~level:Error - ~msg:"unable to get DAL attestation for {delegate} -- {trace}" - ("delegate", Delegate_id.encoding) - ~pp2:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) - - let failed_to_get_dal_attestations_in_time = - declare_2 - ~section - ~name:"failed_to_get_attestations_in_time" - ~level:Error - ~msg: - "unable to get DAL 
attestation for {delegate} in time for attestation \ - level {level}" - ("delegate", Delegate_id.encoding) - ("level", Data_encoding.int32) - let failed_to_inject_consensus_vote = declare_2 ~section diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index 47283cc4a62c..d76806e7e6d6 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -86,7 +86,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~constants delegates in - let*! () = + let* () = Baking_actions.only_if_dal_feature_enabled state ~default_value:() @@ -101,10 +101,13 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool in (* Ensures the DAL attestable slots cache is populated in time for the first block’s attestation. *) - Dal_attestable_slots_worker.update_streams_subscriptions - state.global_state.dal_attestable_slots_worker - dal_node_rpc_ctxt - ~delegate_ids) + let*! 
() = + Dal_attestable_slots_worker.update_streams_subscriptions + state.global_state.dal_attestable_slots_worker + dal_node_rpc_ctxt + ~delegate_ids + in + return_unit) in return state diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index d943c919259b..3f8d15818aef 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -694,26 +694,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain else None in let current_level = current_proposal.block.shell.level in - let dal_attestable_slots = - Option.fold - ~none:[] - ~some:(fun dal_node_rpc_ctxt -> - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:current_level - (Delegate_infos.own_delegate_ids delegate_infos)) - dal_node_rpc_ctxt - in - let next_level_dal_attestable_slots = - Option.fold - ~none:[] - ~some:(fun dal_node_rpc_ctxt -> - Node_rpc.dal_attestable_slots - dal_node_rpc_ctxt - ~attestation_level:(Int32.succ current_level) - (Delegate_infos.own_delegate_ids next_level_delegate_infos)) - dal_node_rpc_ctxt - in let level_state = { current_level; @@ -726,8 +706,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain delegate_infos; next_level_delegate_infos; next_level_latest_forge_request = None; - dal_attestable_slots; - next_level_dal_attestable_slots; } in let* round_state = diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index f3ea633a5acf..be03052310e5 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -215,10 +215,9 @@ type prepared_block = { } (* The fields {current_level}, {delegate_infos}, {next_level_delegate_infos}, - {next_level_latest_forge_request}, {dal_attestable_slots}, - {next_level_dal_attestable_slots} are updated only when we receive a block at - a different level than 
{current_level}. Note that this means that there is - always a {latest_proposal}, which may be our own baked block. *) + {next_level_latest_forge_request} are updated only when we receive a block at a different + level than {current_level}. Note that this means that there is always a {latest_proposal}, + which may be our own baked block. *) type level_state = { current_level : int32; latest_proposal : proposal; @@ -233,8 +232,6 @@ type level_state = { delegate_infos : delegate_infos; next_level_delegate_infos : delegate_infos; next_level_latest_forge_request : Round.t option; - dal_attestable_slots : dal_attestable_slots; - next_level_dal_attestable_slots : dal_attestable_slots; } type phase = @@ -1228,8 +1225,6 @@ let pp_level_state fmt delegate_infos; next_level_delegate_infos; next_level_latest_forge_request; - dal_attestable_slots = _; - next_level_dal_attestable_slots = _; } = Format.fprintf fmt diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index 5cfc915a02a8..dd657b1a0758 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -283,11 +283,6 @@ type level_state = { next_level_latest_forge_request : Round.t option; (** Some if a forge request has been sent for the next level on the given round *) - dal_attestable_slots : dal_attestable_slots; - (** For each (own) delegate having a DAL slot at the current level, store - a promise to obtain the attestable slots for that level. 
*) - next_level_dal_attestable_slots : dal_attestable_slots; - (** and similarly for the next level *) } val pp_level_state : Format.formatter -> level_state -> unit diff --git a/src/proto_alpha/lib_delegate/baking_state_types.ml b/src/proto_alpha/lib_delegate/baking_state_types.ml index 3feb824ccc69..c6cce19dd51c 100644 --- a/src/proto_alpha/lib_delegate/baking_state_types.ml +++ b/src/proto_alpha/lib_delegate/baking_state_types.ml @@ -246,8 +246,3 @@ type delegate_info = { attestation_slot : Slot.t; attesting_power : int64; } - -type dal_attestable_slots = - (Delegate_id.t - * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) - list diff --git a/src/proto_alpha/lib_delegate/baking_state_types.mli b/src/proto_alpha/lib_delegate/baking_state_types.mli index 7ecc41bb7326..edf0b9e9350c 100644 --- a/src/proto_alpha/lib_delegate/baking_state_types.mli +++ b/src/proto_alpha/lib_delegate/baking_state_types.mli @@ -163,11 +163,3 @@ type delegate_info = { attestation_slot : Slot.t; attesting_power : int64; } - -(** An association list between delegates and promises for their DAL - attestations at some level (as obtained through the [get_attestable_slots] - RPC). See usage in {!level_state}. *) -type dal_attestable_slots = - (Delegate_id.t - * Tezos_dal_node_services.Types.attestable_slots tzresult Lwt.t) - list diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 37ebf5c0a5d6..c19136dac688 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -281,7 +281,6 @@ let update_streams_subscriptions state dal_node_rpc_ctxt ~delegate_ids = in subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add -(* TODO: Use this functionality in the baker instead of the [dal_attestable_slots] static method. 
*) let get_dal_attestable_slots state ~delegate_id ~attestation_level = let open Lwt_syntax in match Level_map.find_opt state.cache attestation_level with diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index 93d5e8723340..a80f8733ab29 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -488,23 +488,6 @@ let fetch_dal_config cctxt = | Error e -> return_error e | Ok dal_config -> return_ok dal_config -let get_attestable_slots dal_node_rpc_ctxt delegate_id ~attestation_level = - let pkh = Delegate_id.to_pkh delegate_id in - let attested_level = Int32.succ attestation_level in - warn_on_stalling_rpc ~rpc_name:"get_attestable_slots" - @@ Tezos_rpc.Context.make_call - Tezos_dal_node_services.Services.get_attestable_slots - dal_node_rpc_ctxt - (((), Tezos_crypto.Signature.Of_V2.public_key_hash pkh), attested_level) - () - () - -let dal_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) - ~attestation_level = - List.map (fun delegate_id -> - ( delegate_id, - get_attestable_slots dal_node_rpc_ctxt delegate_id ~attestation_level )) - let monitor_attestable_slots (dal_node_rpc_ctxt : Tezos_rpc.Context.generic) ~delegate_id = let open Lwt_syntax in diff --git a/src/proto_alpha/lib_delegate/node_rpc.mli b/src/proto_alpha/lib_delegate/node_rpc.mli index 742bdbea1514..3cfd8e497f66 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.mli +++ b/src/proto_alpha/lib_delegate/node_rpc.mli @@ -150,16 +150,6 @@ val forge_double_baking_evidence : bh2:block_header -> (bytes, Error_monad.tztrace) result Lwt.t -(** [dal_attestable_slots ctxt ~attestation_level delegate_ids] calls the DAL - node RPC GET /profiles//attested_levels//attestable_slots/ - for each of the delegates in [delegate_ids] and returns the corresponding - promises. 
*) -val dal_attestable_slots : - Tezos_rpc.Context.generic -> - attestation_level:int32 -> - Delegate_id.t list -> - dal_attestable_slots - (** [monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id] opens a streamed RPC to the DAL node for the given [~delegate_id]. Each item emitted on the stream contains DAL attestable information for this delegate. *) diff --git a/src/proto_alpha/lib_delegate/state_transitions.ml b/src/proto_alpha/lib_delegate/state_transitions.ml index 150888f677c1..dc9dfed774f9 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.ml +++ b/src/proto_alpha/lib_delegate/state_transitions.ml @@ -427,8 +427,7 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = let* () = Events.(emit new_head_with_increasing_level ()) in let new_level = new_proposal.block.shell.level in let compute_new_state ~current_round ~delegate_infos - ~next_level_delegate_infos ~dal_attestable_slots - ~next_level_dal_attestable_slots = + ~next_level_delegate_infos = let round_state = { current_round; @@ -450,8 +449,6 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = delegate_infos; next_level_delegate_infos; next_level_latest_forge_request = None; - dal_attestable_slots; - next_level_dal_attestable_slots; } in (* recursive call with the up-to-date state to handle the new -- GitLab From 2e0a4e97761017e6ff706c945d3d360ea222ba2d Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Fri, 14 Nov 2025 10:23:13 +0000 Subject: [PATCH 2/3] Tezt: Dal: Fix attestation event in baker registers profiles test --- tezt/tests/dal.ml | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index cec6835e6087..2508b935968b 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -4238,8 +4238,8 @@ let test_dal_node_gs_invalid_messages_exchange ?batching_time_interval _protocol * the baker does not crash when there's a DAL node specified, but it is not running * the 
baker register profiles when the DAL node restarts. *) -let test_baker_registers_profiles _protocol _parameters _cryptobox l1_node - client dal_node = +let test_baker_registers_profiles protocol _parameters _cryptobox l1_node client + dal_node = let delegates = List.to_seq Constant.all_secret_keys |> Seq.take 3 |> List.of_seq in @@ -4257,8 +4257,11 @@ let test_baker_registers_profiles _protocol _parameters _cryptobox l1_node Agnostic_baker.create ~dal_node_rpc_endpoint l1_node client ~delegates in let wait_for_attestation_event = - Agnostic_baker.wait_for baker "failed_to_get_attestations.v0" (fun _json -> - Some ()) + let attestation_event = + if Protocol.number protocol >= 025 then "no_attestable_slot_at_level.v0" + else "failed_to_get_attestations.v0" + in + Agnostic_baker.wait_for baker attestation_event (fun _json -> Some ()) in let* () = Agnostic_baker.run baker in let* () = wait_for_attestation_event in -- GitLab From 9b646302046f299801bc36957d5b6b02bd357cfb Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Fri, 14 Nov 2025 18:10:29 +0000 Subject: [PATCH 3/3] Tezt: Dal_node: Fix monitor route published levels Due to the eviction policy of the DAL attestable_slots worker, the mocked monitor route must not emit too many published levels: it now emits (attestation_lag + 1) published levels instead of 20, mirroring the DAL node's backfill window on subscribe. --- tezt/lib_tezos/dal_node.ml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index b0fab86cbef2..0b8adc800fa1 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -904,7 +904,9 @@ module Mockup_for_baker = struct let routes ~attestation_lag ~attesters ~attestable_slots = [ - monitor_route ~published_levels:20; + (* Mirrors the DAL node’s backfill window on subscribe, where (attestation_lag + 1) + published levels are included. *) + monitor_route ~published_levels:(attestation_lag + 1); (let path_pattern = Format.sprintf "/profiles/(tz[1234][a-zA-Z0-9]+)/attested_levels/([1-9][0-9]*)/attestable_slots" -- GitLab