From b7c4cb44ee1eba3e3d17c99e3127213c3eaeca29 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 10 Nov 2025 10:38:32 +0000 Subject: [PATCH 1/3] Tezt: Dal: Fix type referenced in documentation --- tezt/lib_tezos/dal_node.ml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index d86e9977e6f0..b0fab86cbef2 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -575,8 +575,7 @@ module Proxy = struct [~published_level]. To do this, we need to add the missing slot indices for each level in the [Backfill] element that is always the first one sent in the monitoring stream. Note: this function heavily depends on the types of elements that are in the returned RPC - stream, information which can be found at - {!Tezos_dal_node_services.Types.Attestable_slots_watcher_table.Attestable_event.t}. *) + stream, information which can be found at {!Tezos_dal_node_services.Types.Attestable_event.t}. *) let rewrite_attestable_stream_line ~number_of_slots ~published_level line = let open Ezjsonm in let make_slot_id slot_level slot_index = -- GitLab From 8878371351107fee07c16ff8d7d132cfc8e4da8b Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 12 Nov 2025 14:06:04 +0000 Subject: [PATCH 2/3] Dal_node: Increase backfill window to get finalized payload level --- src/lib_dal_node/attestable_slots.ml | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 858beb1afbb8..c1f5dd6bbd35 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -185,8 +185,13 @@ let may_notify_not_in_committee ctxt committee ~attestation_level = slots can be published and attested in the near future; - `stop = L`, as this is the newest level where we did not have time to obtain the information about the published slots. + + We include [L+1] in backfill to cover possible races between updating the + last-finalized level and stream subscription. This keeps the + client's cache consistent even if the first slot was published before the stream + was fully established. - For each level in [start .. stop] (inclusively), we accumulate the attestation status + For each level in [start .. stop + 1] (inclusively), we accumulate the attestation status information about each slot id. *) let get_backfill_payload ctxt ~pkh = let open Lwt_result_syntax in @@ -197,9 +202,11 @@ let get_backfill_payload ctxt ~pkh = get_attestation_lag ctxt ~level:last_finalized_level in let published_levels = - let count = Int32.(to_int @@ min last_finalized_level attestation_lag) in + let count = + Int32.(to_int @@ min last_finalized_level attestation_lag) + 1 + in Stdlib.List.init count (fun i -> - Int32.(sub last_finalized_level (of_int i))) + Int32.(sub (succ last_finalized_level) (of_int i))) |> List.rev in List.fold_left_es -- GitLab From 6ba842adb77f20bc292e4e42ca86baa9a283a3c3 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 12 Nov 2025 14:31:02 +0000 Subject: [PATCH 3/3] Dal_node: Fix documentation indentation --- .../attestable_slots_watcher_table.ml | 18 +++++++++--------- .../attestable_slots_watcher_table.mli | 14 +++++++------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/lib_dal_node/attestable_slots_watcher_table.ml b/src/lib_dal_node/attestable_slots_watcher_table.ml index c995df8095ca..a4c78a9f5a94 100644 --- a/src/lib_dal_node/attestable_slots_watcher_table.ml +++ b/src/lib_dal_node/attestable_slots_watcher_table.ml @@ -9,12 +9,12 @@ module SlotIdSet = Aches.Vache.Set (Aches.Vache.LRU_Sloppy) (Aches.Vache.Strong) (Types.Slot_id) (** A watcher used to stream newly-attestable slots for a given delegate (pkh). - - [stream] is the push endpoint used by the DAL node to notify consumers - (RPC layer / baker) with [attestable_event] information. - - [num_subscribers] is the number of active consumers currently subscribed to - this pkh’s stream. - - [notified_slots] is an LRU set of slot ids already notified, so that we avoid - sending duplicates in the stream. *) + - [stream] is the push endpoint used by the DAL node to notify consumers + (RPC layer / baker) with [attestable_event] information. + - [num_subscribers] is the number of active consumers currently subscribed to + this pkh’s stream. + - [notified_slots] is an LRU set of slot ids already notified, so that we avoid + sending duplicates in the stream. *) type watcher = { stream : Types.Attestable_event.t Lwt_watcher.input; mutable num_subscribers : int; @@ -38,9 +38,9 @@ let get_or_init t pkh proto_params = watcher | None -> (* We only need to remember slot ids from their publication level until the - attestation window closes ([attestation_lag] levels later). There can be - at most [number_of_slots] distinct slot ids per level across that window. - We allow a small (2 levels) additional window for safety. *) + attestation window closes ([attestation_lag] levels later). There can be + at most [number_of_slots] distinct slot ids per level across that window. + We allow a small (2 levels) additional window for safety. *) let capacity = match proto_params with | Some Types.{number_of_slots; attestation_lag; _} -> diff --git a/src/lib_dal_node/attestable_slots_watcher_table.mli b/src/lib_dal_node/attestable_slots_watcher_table.mli index 9900f9a3711e..b7a7b8c1a446 100644 --- a/src/lib_dal_node/attestable_slots_watcher_table.mli +++ b/src/lib_dal_node/attestable_slots_watcher_table.mli @@ -19,30 +19,30 @@ val get_num_subscribers : watcher -> shard_index val set_num_subscribers : watcher -> shard_index -> unit (** Table where we lazily create a watcher on first subscription for a pkh. When the last - subscriber calls [shutdown], the watcher is removed from the table to avoid leaks. *) + subscriber calls [shutdown], the watcher is removed from the table to avoid leaks. *) type t = watcher Signature.Public_key_hash.Table.t (** [create t ~initial_size] creates an empty table of watchers with an initial - bucket size of [~initial_size] (which is an initial estimation). *) + bucket size of [~initial_size] (which is an initial estimation). *) val create : initial_size:int -> t (** [get_or_init t pkh proto_params] returns (creating if absent) the watcher entry for - [pkh] in [t].*) + [pkh] in [t].*) val get_or_init : t -> Signature.public_key_hash -> proto_parameters option -> watcher (** [notify_attestable_slot t pkh ~slot_id] pushes an [Attestable_slot] event for [~slot_id] - to the stream for [pkh], if present. *) + to the stream for [pkh], if present. *) val notify_attestable_slot : t -> Signature.public_key_hash -> slot_id:slot_id -> unit (** [notify_no_shards_assigned t pkh ~attestation_level] pushes a [No_shards_assigned] event for - [~attestation_level] to the stream for [pkh], if present. *) + [~attestation_level] to the stream for [pkh], if present. *) val notify_no_shards_assigned : t -> Signature.public_key_hash -> attestation_level:level -> unit (** [notify_slot_has_trap t pkh ~slot_id] pushed a [Slot_has_trap] event for [~slot_id] to the - stream for [pkh], if present. *) + stream for [pkh], if present. *) val notify_slot_has_trap : t -> Signature.public_key_hash -> slot_id:slot_id -> unit @@ -58,5 +58,5 @@ val notify_backfill_payload : val remove : t -> Signature.public_key_hash -> unit (** [elements t] returns the current set of pkhs that have an active monitoring - subscription in [t]. *) + subscription in [t]. *) val elements : t -> Signature.public_key_hash Seq.t -- GitLab