From 05087d65d70c53addeea21094f0e7d2f6a670f3b Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 13:29:07 +0000 Subject: [PATCH 1/3] Dal_node: Backfill case for elements in the attestable slots stream --- src/lib_dal_node_services/types.ml | 33 +++++++++++++++++++++++++++++ src/lib_dal_node_services/types.mli | 17 +++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index c377443acad1..89c11e0659a1 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -790,10 +790,27 @@ module SlotIdSet = module Attestable_slots_watcher_table = struct module Attestable_event = struct + type backfill_payload = { + slot_ids : slot_id list; + no_shards_attestation_levels : level list; + } + type t = | Attestable_slot of {slot_id : slot_id} | No_shards_assigned of {attestation_level : level} | Slot_has_trap of {slot_id : slot_id} + | Backfill of {backfill_payload : backfill_payload} + + let backfill_payload_encoding = + let open Data_encoding in + conv + (fun {slot_ids; no_shards_attestation_levels} -> + (slot_ids, no_shards_attestation_levels)) + (fun (slot_ids, no_shards_attestation_levels) -> + {slot_ids; no_shards_attestation_levels}) + (obj2 + (req "slot_ids" (list slot_id_encoding)) + (req "no_shards_attestation_levels" (list int32))) let encoding = let open Data_encoding in @@ -829,6 +846,16 @@ module Attestable_slots_watcher_table = struct (function | Slot_has_trap {slot_id} -> Some ((), slot_id) | _ -> None) (fun ((), slot_id) -> Slot_has_trap {slot_id}); + case + ~title:"backfill" + (Tag 3) + (obj2 + (req "kind" (constant "backfill")) + (req "backfill_payload" backfill_payload_encoding)) + (function + | Backfill {backfill_payload} -> Some ((), backfill_payload) + | _ -> None) + (fun ((), backfill_payload) -> Backfill {backfill_payload}); ] end @@ -905,6 +932,12 @@ module Attestable_slots_watcher_table = struct SlotIdSet.add 
watcher.notified_slots slot_id ; Lwt_watcher.notify watcher.stream (Slot_has_trap {slot_id})) + let notify_backfill_payload t pkh ~backfill_payload = + match Signature.Public_key_hash.Table.find t pkh with + | None -> () + | Some watcher -> + Lwt_watcher.notify watcher.stream (Backfill {backfill_payload}) + let remove = Signature.Public_key_hash.Table.remove let elements = Signature.Public_key_hash.Table.to_seq_keys diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index c2d8919df90d..e0f5dec1e198 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -413,6 +413,13 @@ end module Attestable_slots_watcher_table : sig module Attestable_event : sig + type backfill_payload = { + slot_ids : slot_id list; + (** All slots that should be marked attestable for this delegate *) + no_shards_attestation_levels : level list; + (** All attestation levels where this delegate has no shards *) + } + (** DAL attestability items emitted on a per-delegate stream. The delegate is implicit, as each stream is bound to a single delegate. *) type t = @@ -422,6 +429,8 @@ module Attestable_slots_watcher_table : sig (** the delegate has no assigned shards at [attestation_level] *) | Slot_has_trap of {slot_id : slot_id} (** the [slot_id] is a trap for the delegate *) + | Backfill of {backfill_payload : backfill_payload} + (** information about the delegate attestation status from the past *) val encoding : t Data_encoding.t end @@ -465,6 +474,14 @@ module Attestable_slots_watcher_table : sig val notify_slot_has_trap : t -> Signature.public_key_hash -> slot_id:slot_id -> unit + (** [notify_backfill_payload t pkh ~backfill_payload] pushes a [Backfill] event for + [~backfill_payload] to the stream for [pkh], if present. 
*) + val notify_backfill_payload : + t -> + Signature.public_key_hash -> + backfill_payload:Attestable_event.backfill_payload -> + unit + (** [remove t pkh] removes the watcher entry for [pkh] from [t] if present. *) val remove : t -> Signature.public_key_hash -> unit -- GitLab From e691f881d98fbbf631b06a7d566c5f56fe6b5afe Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 15:10:18 +0000 Subject: [PATCH 2/3] Dal_node: Move subscribe function for Attestable_slots after notify This is for ease of reviewing, as in the next commit(s) we will need the notify functionality in subscribe. --- src/lib_dal_node/attestable_slots.ml | 44 +++++++++++++-------------- src/lib_dal_node/attestable_slots.mli | 18 +++++------ 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index a2650c801f1d..f02799f5210e 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -39,28 +39,6 @@ let is_attestable_slot_with_traps shards_store traps_fraction pkh return_false) assigned_shard_indexes -let subscribe ctxt ~pkh = - let open Node_context in - let module T = Types.Attestable_slots_watcher_table in - let proto_params = - Result.to_option - @@ get_proto_parameters ctxt ~level:(`Level (get_last_finalized_level ctxt)) - in - let attestable_slots_watcher_table = - get_attestable_slots_watcher_table ctxt - in - let watcher = T.get_or_init attestable_slots_watcher_table pkh proto_params in - let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in - let next () = Lwt_stream.get stream in - let shutdown () = - (* stop this stream, then possibly remove the whole watcher if last subscriber *) - Lwt_watcher.shutdown stopper ; - T.set_num_subscribers watcher (T.get_num_subscribers watcher - 1) ; - if T.get_num_subscribers watcher <= 0 then - T.remove attestable_slots_watcher_table pkh - in - Resto_directory.Answer.{next; shutdown} - let 
published_just_before_migration ctxt ~published_level = let open Result_syntax in let open Node_context in @@ -188,3 +166,25 @@ let may_notify_not_in_committee ctxt committee ~attestation_level = pkh ~attestation_level) subscribers + +let subscribe ctxt ~pkh = + let open Node_context in + let module T = Types.Attestable_slots_watcher_table in + let proto_params = + Result.to_option + @@ get_proto_parameters ctxt ~level:(`Level (get_last_finalized_level ctxt)) + in + let attestable_slots_watcher_table = + get_attestable_slots_watcher_table ctxt + in + let watcher = T.get_or_init attestable_slots_watcher_table pkh proto_params in + let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in + let next () = Lwt_stream.get stream in + let shutdown () = + (* stop this stream, then possibly remove the whole watcher if last subscriber *) + Lwt_watcher.shutdown stopper ; + T.set_num_subscribers watcher (T.get_num_subscribers watcher - 1) ; + if T.get_num_subscribers watcher <= 0 then + T.remove attestable_slots_watcher_table pkh + in + Resto_directory.Answer.{next; shutdown} diff --git a/src/lib_dal_node/attestable_slots.mli b/src/lib_dal_node/attestable_slots.mli index 3e0f75caa0ad..4fe57c1ba282 100644 --- a/src/lib_dal_node/attestable_slots.mli +++ b/src/lib_dal_node/attestable_slots.mli @@ -20,15 +20,6 @@ val is_attestable_slot_with_traps : Types.slot_id -> (bool, [> Errors.not_found | Errors.other]) result Lwt.t -(** [subscribe ctxt ~pkh] opens a [Resto_directory.Answer] stream that yields - [Types.Attestable_slots_watcher_table.Attestable_event.t] values. The stream - only emits items produced after subscription. *) -val subscribe : - Node_context.t -> - pkh:Signature.public_key_hash -> - Types.Attestable_slots_watcher_table.Attestable_event.t - Resto_directory.Answer.stream - (** Let M = migration level (last block of the old protocol). This function attempts to determine whether [~published_level] is included in `[M - lag + 1 .. 
M]` (inclusively), where [lag] is the lag at [~published_level]. @@ -54,3 +45,12 @@ val may_notify_attestable_slot_or_trap : If so, it emits an event to that [pkh]'s stream. *) val may_notify_not_in_committee : Node_context.t -> Committee_cache.committee -> attestation_level:int32 -> unit + +(** [subscribe ctxt ~pkh] opens a [Resto_directory.Answer] stream that yields + [Types.Attestable_slots_watcher_table.Attestable_event.t] values. The stream + only emits items produced after subscription. *) +val subscribe : + Node_context.t -> + pkh:Signature.public_key_hash -> + Types.Attestable_slots_watcher_table.Attestable_event.t + Resto_directory.Answer.stream -- GitLab From 3fbd8a5b904f1097fc9493e5c1b58a0a549ffa7d Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 15:19:02 +0000 Subject: [PATCH 3/3] Dal_node: Plug backfill in subscribing mechanism --- src/lib_dal_node/RPC_server.ml | 3 +- src/lib_dal_node/attestable_slots.ml | 86 ++++++++++++++++++++++++++- src/lib_dal_node/attestable_slots.mli | 1 + src/lib_dal_node/event.ml | 11 ++++ 4 files changed, 99 insertions(+), 2 deletions(-) diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml index 675c12b758be..d43ce3309b6d 100644 --- a/src/lib_dal_node/RPC_server.ml +++ b/src/lib_dal_node/RPC_server.ml @@ -540,7 +540,8 @@ module Profile_handlers = struct ~attested_level) let monitor_attestable_slots ctxt pkh () () = - let Resto_directory.Answer.{next; shutdown} = + let open Lwt_syntax in + let* Resto_directory.Answer.{next; shutdown} = Attestable_slots.subscribe ctxt ~pkh in Tezos_rpc.Answer.return_stream {next; shutdown} diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index f02799f5210e..fc3cbdbf5317 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -167,7 +167,80 @@ let may_notify_not_in_committee ctxt committee ~attestation_level = ~attestation_level) subscribers +(** [get_backfill_payload ctxt 
~pkh] computes a compact "backfill" payload for + the freshly subscribed delegate [~pkh]. + + The payload can be used to pre-populate a structure that needs information + predating the creation of the stream corresponding to [~pkh], namely: + - all attestable [slot_id]s observed in a recent window; + - the attestation levels at which [pkh] has no assigned shards. + + The "backfill" window is computed as follows: + - let [L] be the last finalized level at subscription time; + - `start = max (1, L - attestation_lag + 1)`, as this is the oldest level whose + published slots can still be attested in the near future; + - `stop = L`, as this is the newest level for which we did not have time to obtain + the information about the published slots. + + For each level in [start .. stop] (inclusive), we accumulate the attestation status + information about each slot id. *) +let get_backfill_payload ctxt ~pkh = + let open Lwt_result_syntax in + let open Node_context in + let module E = Types.Attestable_slots_watcher_table.Attestable_event in + let last_finalized_level = get_last_finalized_level ctxt in + let*? attestation_lag = + get_attestation_lag ctxt ~level:last_finalized_level + in + let published_levels = + let count = Int32.(to_int @@ min last_finalized_level attestation_lag) in + Stdlib.List.init count (fun i -> + Int32.(sub last_finalized_level (of_int i))) + |> List.rev + in + List.fold_left_es + (fun acc published_level -> + let attestation_level = + Int32.(pred (add published_level attestation_lag)) + in + let* committee = + Node_context.fetch_committees ctxt ~level:attestation_level + in + if is_not_in_committee committee ~pkh then + (* If not in committee, record and skip per-slot checks. *) + return + E. + { + acc with + no_shards_attestation_levels = + attestation_level :: acc.no_shards_attestation_levels; + } + else + let*? proto_params = + get_proto_parameters ctxt ~level:(`Level published_level) + in + (* Check each slot for attestability.
*) + let number_of_slots = proto_params.number_of_slots in + let slot_indices = Stdlib.List.init number_of_slots Fun.id in + List.fold_left_es + (fun acc slot_index -> + let slot_id = + Types.Slot_id.{slot_level = published_level; slot_index} + in + let* is_attestable_slot_or_trap = + is_attestable_slot_or_trap ctxt ~pkh ~slot_id + in + match is_attestable_slot_or_trap with + | Some `Attestable_slot -> + return E.{acc with slot_ids = slot_id :: acc.slot_ids} + | Some `Trap | None -> return acc) + acc + slot_indices) + {slot_ids = []; no_shards_attestation_levels = []} + published_levels + let subscribe ctxt ~pkh = + let open Lwt_syntax in let open Node_context in let module T = Types.Attestable_slots_watcher_table in let proto_params = @@ -179,6 +252,17 @@ let subscribe ctxt ~pkh = in let watcher = T.get_or_init attestable_slots_watcher_table pkh proto_params in let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in + let* () = + let* backfill_payload = get_backfill_payload ctxt ~pkh in + match backfill_payload with + | Ok backfill_payload -> + T.notify_backfill_payload + attestable_slots_watcher_table + pkh + ~backfill_payload ; + return_unit + | Error error -> Event.emit_backfill_error ~error + in let next () = Lwt_stream.get stream in let shutdown () = (* stop this stream, then possibly remove the whole watcher if last subscriber *) @@ -187,4 +271,4 @@ let subscribe ctxt ~pkh = if T.get_num_subscribers watcher <= 0 then T.remove attestable_slots_watcher_table pkh in - Resto_directory.Answer.{next; shutdown} + return Resto_directory.Answer.{next; shutdown} diff --git a/src/lib_dal_node/attestable_slots.mli b/src/lib_dal_node/attestable_slots.mli index 4fe57c1ba282..12af2bb57db4 100644 --- a/src/lib_dal_node/attestable_slots.mli +++ b/src/lib_dal_node/attestable_slots.mli @@ -54,3 +54,4 @@ val subscribe : pkh:Signature.public_key_hash -> Types.Attestable_slots_watcher_table.Attestable_event.t Resto_directory.Answer.stream + Lwt.t diff --git 
a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index 3ed0128bf97a..69013d6daf2f 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -1263,6 +1263,15 @@ open struct migration level." ~level:Warning ("level", Data_encoding.int32) + + let backfill_error = + declare_1 + ~section + ~name:"backfill_error" + ~msg:"Backfill failed with error: {error}" + ~level:Error + ~pp1:Error_monad.pp_print_trace + ("error", Error_monad.trace_encoding) end (* DAL node event emission functions *) @@ -1623,3 +1632,5 @@ let emit_reception_of_shard_detailed ~level ~slot_index ~shard_index ~sender = emit reception_of_shard_detailed (level, slot_index, shard_index, sender) let emit_skip_attesting_shards ~level = emit skip_attesting_shards level + +let emit_backfill_error ~error = emit backfill_error error -- GitLab