From c0b4685f701932c30a906d3bcead80fabaa77d6d Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 22 Oct 2025 17:13:16 +0100 Subject: [PATCH 1/7] Dal_node: Add Not_in_committee case for elements in the attestable slots stream --- src/lib_dal_node/attestable_slots.ml | 31 ++++++++++++++-- src/lib_dal_node/attestable_slots.mli | 13 +++++-- src/lib_dal_node/block_handler.ml | 10 +++++- src/lib_dal_node_services/services.ml | 4 +-- src/lib_dal_node_services/services.mli | 5 +-- src/lib_dal_node_services/types.ml | 49 +++++++++++++++++++++++--- src/lib_dal_node_services/types.mli | 25 +++++++++++-- 7 files changed, 120 insertions(+), 17 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 6fe71c51e3bc..c8c38fdafc51 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -133,7 +133,10 @@ let may_notify ctxt ~(slot_id : Types.slot_id) = let all_stored = number_stored_shards = number_of_assigned_shards in if not last_known_parameters.incentives_enable then ( if all_stored then - T.notify attestable_slots_watcher_table pkh ~slot_id ; + T.notify_attestable_slot + attestable_slots_watcher_table + pkh + ~slot_id ; return_unit) else if not all_stored then return_unit else @@ -147,8 +150,32 @@ let may_notify ctxt ~(slot_id : Types.slot_id) = |> Errors.to_option_tzresult in (match is_slot_attestable_with_traps with - | Some true -> T.notify attestable_slots_watcher_table pkh ~slot_id + | Some true -> + T.notify_attestable_slot + attestable_slots_watcher_table + pkh + ~slot_id | _ -> ()) ; return_unit in Seq.iter_ep notify_if_attestable subscribers + +let may_notify_not_in_committee ctxt committee ~attestation_level = + let module T = Types.Attestable_slots_watcher_table in + let attestable_slots_watcher_table = + Node_context.get_attestable_slots_watcher_table ctxt + in + let subscribers = T.elements attestable_slots_watcher_table in + Seq.iter + (fun pkh -> + let 
assigned_shard_indices = + match Signature.Public_key_hash.Map.find pkh committee with + | None -> [] + | Some (indexes, _) -> indexes + in + if List.is_empty assigned_shard_indices then + T.notify_no_shards_assigned + attestable_slots_watcher_table + pkh + ~attestation_level) + subscribers diff --git a/src/lib_dal_node/attestable_slots.mli b/src/lib_dal_node/attestable_slots.mli index fee2b7f6a783..bc59e0296d5e 100644 --- a/src/lib_dal_node/attestable_slots.mli +++ b/src/lib_dal_node/attestable_slots.mli @@ -21,12 +21,13 @@ val is_slot_attestable_with_traps : (bool, [> Errors.not_found | Errors.other]) result Lwt.t (** [subscribe ctxt ~pkh] opens a [Resto_directory.Answer] stream that yields - [Types.slot_id] values whenever a slot becomes attestable for [~pkh]. The stream + [Types.Attestable_slots_watcher_table.Attestable_event.t] values. The stream only emits items produced after subscription. *) val subscribe : Node_context.t -> pkh:Signature.public_key_hash -> - Types.slot_id Resto_directory.Answer.stream + Types.Attestable_slots_watcher_table.Attestable_event.t + Resto_directory.Answer.stream (** Let M = migration level (last block of the old protocol). This function attempts to determine whether [~published_level] is included in `[M - lag + 1 .. M]` @@ -44,5 +45,11 @@ val attested_just_after_migration : (** [may_notify ctxt ~slot_id] checks, for each subscribed [pkh], whether all shards assigned to [pkh] at the attestation level corresponding to [~slot_id] are available; - if so, it emits [~slot_id] to that [pkh]’s stream. *) + if so, it emits an event to that [pkh]’s stream. *) val may_notify : Node_context.t -> slot_id:Types.slot_id -> unit tzresult Lwt.t + +(** [may_notify_not_in_committee ctxt committee ~attestation_level] checks, for each + subscribed [pkh], whether the delegate is in the [committee] for [~attestation_level]. + If so, it emits an event to that [pkh]'s stream. 
*) +val may_notify_not_in_committee : + Node_context.t -> Committee_cache.committee -> attestation_level:int32 -> unit diff --git a/src/lib_dal_node/block_handler.ml b/src/lib_dal_node/block_handler.ml index d0e21aaba0b6..8c2c691cb784 100644 --- a/src/lib_dal_node/block_handler.ml +++ b/src/lib_dal_node/block_handler.ml @@ -632,9 +632,17 @@ let new_finalized_head ctxt cctxt l1_crawler cryptobox finalized_block_hash Int32.( pred @@ add level (of_int proto_parameters.Types.attestation_lag)) in - let* (_committee : (int trace * int) Signature.Public_key_hash.Map.t) = + let* (committee : (int trace * int) Signature.Public_key_hash.Map.t) = Node_context.fetch_committees ctxt ~level:attestation_level in + (* Note that, when the baker and DAL node are synchronized, then if + the baker is at level L, then in this function `block_level = L - 2`. + We therefore need the committee at `block_level + 2`. So, as long as + `attestation_lag > 2`, there should be no issue. *) + Attestable_slots.may_notify_not_in_committee + ctxt + committee + ~attestation_level ; return_unit in Gossipsub.Worker.Validate_message_hook.set_batch diff --git a/src/lib_dal_node_services/services.ml b/src/lib_dal_node_services/services.ml index 5d4628a13510..b66a4052c8a6 100644 --- a/src/lib_dal_node_services/services.ml +++ b/src/lib_dal_node_services/services.ml @@ -406,7 +406,7 @@ let get_attestable_slots : let monitor_attestable_slots : < meth : [`GET] ; input : unit - ; output : Types.slot_id + ; output : Types.Attestable_slots_watcher_table.Attestable_event.t ; prefix : unit ; params : unit * Signature.public_key_hash ; query : unit > @@ -420,7 +420,7 @@ let monitor_attestable_slots : detected as traps for the baker, the slot should not be attested, so \ the id is not sent via the stream." 
~query:Tezos_rpc.Query.empty - ~output:Types.slot_id_encoding + ~output:Types.Attestable_slots_watcher_table.Attestable_event.encoding Tezos_rpc.Path.( open_root / "profiles" /: Signature.Public_key_hash.rpc_arg / "monitor" / "attestable_slots") diff --git a/src/lib_dal_node_services/services.mli b/src/lib_dal_node_services/services.mli index ce960fc2437a..46fe892d726a 100644 --- a/src/lib_dal_node_services/services.mli +++ b/src/lib_dal_node_services/services.mli @@ -223,11 +223,12 @@ val get_attestable_slots : (** Stream attestable slot_ids for the given public key hash [pkh]. A slot is attestable at level [L] if it was published at [L - attestation_lag] and *all* shards assigned at level [L] to [pkh] are available in the DAL - node's store. *) + node's store. Returns a "not in committee" message if that is the case + for [pkh] at level [L]. *) val monitor_attestable_slots : < meth : [`GET] ; input : unit - ; output : Types.slot_id + ; output : Types.Attestable_slots_watcher_table.Attestable_event.t ; prefix : unit ; params : unit * Signature.public_key_hash ; query : unit > diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 7a7b00e5e41d..827db5c3305b 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -780,15 +780,48 @@ module SlotIdSet = end) module Attestable_slots_watcher_table = struct + module Attestable_event = struct + type t = + | Attestable_slot of {slot_id : slot_id} + | No_shards_assigned of {attestation_level : level} + + let encoding = + let open Data_encoding in + union + [ + case + ~title:"attestable_slot" + (Tag 0) + (obj2 + (req "kind" (constant "attestable_slot")) + (req "slot_id" slot_id_encoding)) + (function + | Attestable_slot {slot_id} -> Some ((), slot_id) | _ -> None) + (fun ((), slot_id) -> Attestable_slot {slot_id}); + case + ~title:"no_shards_assigned" + (Tag 1) + (obj2 + (req "kind" (constant "no_shards_assigned")) + (req "attestation_level" int32)) + 
(function + | No_shards_assigned {attestation_level} -> + Some ((), attestation_level) + | _ -> None) + (fun ((), attestation_level) -> + No_shards_assigned {attestation_level}); + ] + end + (** A watcher used to stream newly-attestable slots for a given delegate (pkh). - [stream] is the push endpoint used by the DAL node to notify consumers - (RPC layer / baker) that a specific [slot_id] has become attestable. + (RPC layer / baker) with [attestable_event] information. - [num_subscribers] is the number of active consumers currently subscribed to this pkh’s stream. - [notified_slots] is an LRU set of slot ids already notified, so that we avoid sending duplicates in the stream. *) type watcher = { - stream : slot_id Lwt_watcher.input; + stream : Attestable_event.t Lwt_watcher.input; mutable num_subscribers : int; notified_slots : SlotIdSet.t; } @@ -829,13 +862,21 @@ module Attestable_slots_watcher_table = struct Signature.Public_key_hash.Table.add t pkh watcher ; watcher - let notify t pkh ~slot_id = + let notify_attestable_slot t pkh ~slot_id = match Signature.Public_key_hash.Table.find t pkh with | None -> () | Some watcher -> if not @@ SlotIdSet.mem watcher.notified_slots slot_id then ( SlotIdSet.add watcher.notified_slots slot_id ; - Lwt_watcher.notify watcher.stream slot_id) + Lwt_watcher.notify watcher.stream (Attestable_slot {slot_id})) + + let notify_no_shards_assigned t pkh ~attestation_level = + match Signature.Public_key_hash.Table.find t pkh with + | None -> () + | Some watcher -> + Lwt_watcher.notify + watcher.stream + (No_shards_assigned {attestation_level}) let remove = Signature.Public_key_hash.Table.remove diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index cf89aec1294f..262e20632984 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -411,10 +411,22 @@ module Health : sig end module Attestable_slots_watcher_table : sig + module Attestable_event : sig + (** DAL 
attestability items emitted on a per-delegate stream. + The delegate is implicit, as each stream is bound to a single delegate. *) + type t = + | Attestable_slot of {slot_id : slot_id} + (** the [slot_id] is now attestable for the delegate *) + | No_shards_assigned of {attestation_level : level} + (** the delegate has no assigned shards at [attestation_level] *) + + val encoding : t Data_encoding.t + end + type watcher (** Returns the stream currently stored in the watcher. *) - val get_stream : watcher -> slot_id Lwt_watcher.input + val get_stream : watcher -> Attestable_event.t Lwt_watcher.input (** Returns the number of subscribers to the stream currently stored in the watcher. *) val get_num_subscribers : watcher -> shard_index @@ -435,8 +447,15 @@ module Attestable_slots_watcher_table : sig val get_or_init : t -> Signature.public_key_hash -> proto_parameters option -> watcher - (** [notify t pkh ~slot_id] pushes [slot_id] to the stream for [pkh] if present. *) - val notify : t -> Signature.public_key_hash -> slot_id:slot_id -> unit + (** [notify_attestable_slot t pkh ~slot_id] pushes an [Attestable_slot] event for [~slot_id] + to the stream for [pkh], if present. *) + val notify_attestable_slot : + t -> Signature.public_key_hash -> slot_id:slot_id -> unit + + (** [notify_no_shards_assigned t pkh ~attestation_level] pushes a [No_shards_assigned] event for + [~attestation_level] to the stream for [pkh], if present. *) + val notify_no_shards_assigned : + t -> Signature.public_key_hash -> attestation_level:level -> unit (** [remove t pkh] removes the watcher entry for [pkh] from [t] if present. 
*) val remove : t -> Signature.public_key_hash -> unit -- GitLab From 7bf4a7166efeae55674576539db15a1fe1c110fa Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 14:53:21 +0000 Subject: [PATCH 2/7] Dal_node: Attestable_slots: Refactor is_attestable_slot function --- src/lib_dal_node/attestable_slots.ml | 122 +++++++++++++-------------- 1 file changed, 60 insertions(+), 62 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index c8c38fdafc51..f7c4b4bc1f7e 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -89,76 +89,74 @@ let attested_just_after_migration ctxt ~attested_level = && migration_level < attested_level && attested_level <= Int32.add migration_level new_lag) -let may_notify ctxt ~(slot_id : Types.slot_id) = +(** [is_attestable_slot ctxt ~pkh ~slot_id] decides whether [~slot_id] is + attestable for delegate [pkh] at the time of calling. *) +let is_attestable_slot ctxt ~pkh ~(slot_id : Types.slot_id) = let open Lwt_result_syntax in let open Node_context in + let published_level = slot_id.slot_level in + let*? lag = get_attestation_lag ctxt ~level:published_level in + let attested_level = Int32.(add published_level lag) in + let attestation_level = Int32.pred attested_level in + let*? should_drop = published_just_before_migration ctxt ~published_level in + if should_drop then return_false + else + let*? last_known_parameters = + get_proto_parameters ctxt ~level:(`Level attested_level) + in + let shards_store = Store.shards (get_store ctxt) in + (* For retrieving the assigned shard indexes, we consider the committee + at [attestation_level], because the (DAL) attestations in the blocks + at level [attested_level] refer to the predecessor level. 
*) + let* shard_indices = + fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level + in + let number_of_assigned_shards = List.length shard_indices in + if number_of_assigned_shards = 0 || published_level < 1l then return_false + else + let* number_stored_shards = + Store.Shards.number_of_shards_available + shards_store + slot_id + shard_indices + in + let all_stored = number_stored_shards = number_of_assigned_shards in + if not last_known_parameters.incentives_enable then return all_stored + else if not all_stored then return_false + else + let* is_slot_attestable_with_traps = + is_slot_attestable_with_traps + shards_store + last_known_parameters.traps_fraction + pkh + shard_indices + slot_id + |> Errors.to_option_tzresult + in + match is_slot_attestable_with_traps with + | Some true -> return_true + | _ -> return_false + +let may_notify ctxt ~(slot_id : Types.slot_id) = + let open Lwt_result_syntax in let module T = Types.Attestable_slots_watcher_table in let attestable_slots_watcher_table = - get_attestable_slots_watcher_table ctxt + Node_context.get_attestable_slots_watcher_table ctxt in let subscribers = T.elements attestable_slots_watcher_table in if Seq.is_empty subscribers then return_unit else - let published_level = slot_id.slot_level in - let*? lag = get_attestation_lag ctxt ~level:published_level in - let attested_level = Int32.(add published_level lag) in - let attestation_level = Int32.pred attested_level in - let*? should_drop = published_just_before_migration ctxt ~published_level in - if should_drop then return_unit - else - let*? last_known_parameters = - get_proto_parameters ctxt ~level:(`Level attested_level) - in - let shards_store = Store.shards (get_store ctxt) in - (* For each subscribed pkh, if it has assigned shards for that level, - check if all those shards are available for [slot_id] and notify watcher, - accordingly. 
*) - let notify_if_attestable pkh = - (* For retrieving the assigned shard indexes, we consider the committee - at [attestation_level], because the (DAL) attestations in the blocks - at level [attested_level] refer to the predecessor level. *) - let* shard_indices = - fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level - in - let number_of_assigned_shards = List.length shard_indices in - if number_of_assigned_shards = 0 then return_unit - else if published_level < 1l then return_unit - else - let* number_stored_shards = - Store.Shards.number_of_shards_available - shards_store - slot_id - shard_indices - in - let all_stored = number_stored_shards = number_of_assigned_shards in - if not last_known_parameters.incentives_enable then ( - if all_stored then - T.notify_attestable_slot - attestable_slots_watcher_table - pkh - ~slot_id ; - return_unit) - else if not all_stored then return_unit - else - let* is_slot_attestable_with_traps = - is_slot_attestable_with_traps - shards_store - last_known_parameters.traps_fraction - pkh - shard_indices - slot_id - |> Errors.to_option_tzresult - in - (match is_slot_attestable_with_traps with - | Some true -> - T.notify_attestable_slot - attestable_slots_watcher_table - pkh - ~slot_id - | _ -> ()) ; - return_unit - in - Seq.iter_ep notify_if_attestable subscribers + let notify_attestable_slot pkh = + let* is_attestable_slot = is_attestable_slot ctxt ~pkh ~slot_id in + if is_attestable_slot then + T.notify_attestable_slot attestable_slots_watcher_table pkh ~slot_id + else () ; + return_unit + in + (* For each subscribed pkh, if it has assigned shards for that level, + check if all those shards are available for [slot_id] and notify watcher, + accordingly. 
*) + Seq.iter_ep notify_attestable_slot subscribers let may_notify_not_in_committee ctxt committee ~attestation_level = let module T = Types.Attestable_slots_watcher_table in -- GitLab From c054886bfc890bc396e8ccb34de07af694fe050c Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 14:57:06 +0000 Subject: [PATCH 3/7] Dal_node: Attestable_slots: Refactor is_not_in_committee function --- src/lib_dal_node/attestable_slots.ml | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index f7c4b4bc1f7e..9959e34466ef 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -158,6 +158,14 @@ let may_notify ctxt ~(slot_id : Types.slot_id) = accordingly. *) Seq.iter_ep notify_attestable_slot subscribers +let is_not_in_committee committee ~pkh = + let assigned_shard_indices = + match Signature.Public_key_hash.Map.find pkh committee with + | None -> [] + | Some (indexes, _) -> indexes + in + List.is_empty assigned_shard_indices + let may_notify_not_in_committee ctxt committee ~attestation_level = let module T = Types.Attestable_slots_watcher_table in let attestable_slots_watcher_table = @@ -166,12 +174,7 @@ let may_notify_not_in_committee ctxt committee ~attestation_level = let subscribers = T.elements attestable_slots_watcher_table in Seq.iter (fun pkh -> - let assigned_shard_indices = - match Signature.Public_key_hash.Map.find pkh committee with - | None -> [] - | Some (indexes, _) -> indexes - in - if List.is_empty assigned_shard_indices then + if is_not_in_committee committee ~pkh then T.notify_no_shards_assigned attestable_slots_watcher_table pkh -- GitLab From ca0758aa33fae0ee292a38f73cc38a948a8f51a3 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 14:58:35 +0000 Subject: [PATCH 4/7] Dal_node: Change is_slot_attestable_with_traps function name --- src/lib_dal_node/RPC_server.ml | 2 +- 
src/lib_dal_node/attestable_slots.ml | 10 +++++----- src/lib_dal_node/attestable_slots.mli | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml index e4e9e0f8300e..e50e5e1adf92 100644 --- a/src/lib_dal_node/RPC_server.ml +++ b/src/lib_dal_node/RPC_server.ml @@ -466,7 +466,7 @@ module Profile_handlers = struct return all_stored else if not all_stored then return false else - Attestable_slots.is_slot_attestable_with_traps + Attestable_slots.is_attestable_slot_with_traps shards_store last_known_parameters.traps_fraction pkh diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 9959e34466ef..c33ec9bd7a7f 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -5,7 +5,7 @@ (* *) (*****************************************************************************) -let is_slot_attestable_with_traps shards_store traps_fraction pkh +let is_attestable_slot_with_traps shards_store traps_fraction pkh assigned_shard_indexes slot_id = let open Lwt_result_syntax in List.for_all_es @@ -14,7 +14,7 @@ let is_slot_attestable_with_traps shards_store traps_fraction pkh Store.Shards.read shards_store slot_id shard_index in (* Note: here [pkh] should identify the baker using its delegate key - (not the consensus key) *) + (not the consensus key) *) let trap_res = Trap.share_is_trap pkh share ~traps_fraction in match trap_res with | Ok true -> @@ -124,8 +124,8 @@ let is_attestable_slot ctxt ~pkh ~(slot_id : Types.slot_id) = if not last_known_parameters.incentives_enable then return all_stored else if not all_stored then return_false else - let* is_slot_attestable_with_traps = - is_slot_attestable_with_traps + let* is_attestable_slot_with_traps = + is_attestable_slot_with_traps shards_store last_known_parameters.traps_fraction pkh @@ -133,7 +133,7 @@ let is_attestable_slot ctxt ~pkh ~(slot_id : Types.slot_id) = slot_id |> 
Errors.to_option_tzresult in - match is_slot_attestable_with_traps with + match is_attestable_slot_with_traps with | Some true -> return_true | _ -> return_false diff --git a/src/lib_dal_node/attestable_slots.mli b/src/lib_dal_node/attestable_slots.mli index bc59e0296d5e..b1dd9ddb0a61 100644 --- a/src/lib_dal_node/attestable_slots.mli +++ b/src/lib_dal_node/attestable_slots.mli @@ -5,14 +5,14 @@ (* *) (*****************************************************************************) -(** [is_slot_attestable_with_traps shards_store traps_fraction pkh +(** [is_attestable_slot_with_traps shards_store traps_fraction pkh assigned_shard_indexes slot_id] checks whether the slot identified by [slot_id] is attestable for delegate [pkh] with respect to the traps mechanism. The function iterates over the delegate’s [assigned_shard_indexes], reads each corresponding stored shard share from [shards_store], and evaluates [Trap.share_is_trap] on it using [traps_fraction]. *) -val is_slot_attestable_with_traps : +val is_attestable_slot_with_traps : Store.Shards.t -> Q.t -> Signature.public_key_hash -> -- GitLab From 18a035a78a9ce36f0cb02804539f63b5ec1f900f Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 15:03:58 +0000 Subject: [PATCH 5/7] Dal_node: Add Slot_has_trap case for elements in the attestable slots stream --- src/lib_dal_node/attestable_slots.ml | 38 +++++++++++++++++----------- src/lib_dal_node_services/types.ml | 18 +++++++++++++ src/lib_dal_node_services/types.mli | 7 +++++ 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index c33ec9bd7a7f..3a3bf32c8208 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -89,9 +89,10 @@ let attested_just_after_migration ctxt ~attested_level = && migration_level < attested_level && attested_level <= Int32.add migration_level new_lag) -(** [is_attestable_slot ctxt ~pkh ~slot_id] decides 
whether [~slot_id] is - attestable for delegate [pkh] at the time of calling. *) -let is_attestable_slot ctxt ~pkh ~(slot_id : Types.slot_id) = +(** [is_attestable_slot_or_trap ctxt ~pkh ~slot_id] decides whether [~slot_id] is + attestable for delegate [pkh] at the time of calling, and when DAL incentives + are enabled, whether it should be considered as a trap. *) +let is_attestable_slot_or_trap ctxt ~pkh ~(slot_id : Types.slot_id) = let open Lwt_result_syntax in let open Node_context in let published_level = slot_id.slot_level in @@ -99,7 +100,7 @@ let is_attestable_slot ctxt ~pkh ~(slot_id : Types.slot_id) = let attested_level = Int32.(add published_level lag) in let attestation_level = Int32.pred attested_level in let*? should_drop = published_just_before_migration ctxt ~published_level in - if should_drop then return_false + if should_drop then return_none else let*? last_known_parameters = get_proto_parameters ctxt ~level:(`Level attested_level) @@ -112,7 +113,7 @@ let is_attestable_slot ctxt ~pkh ~(slot_id : Types.slot_id) = fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level in let number_of_assigned_shards = List.length shard_indices in - if number_of_assigned_shards = 0 || published_level < 1l then return_false + if number_of_assigned_shards = 0 || published_level < 1l then return_none else let* number_stored_shards = Store.Shards.number_of_shards_available @@ -121,8 +122,9 @@ let is_attestable_slot ctxt ~pkh ~(slot_id : Types.slot_id) = shard_indices in let all_stored = number_stored_shards = number_of_assigned_shards in - if not last_known_parameters.incentives_enable then return all_stored - else if not all_stored then return_false + if not last_known_parameters.incentives_enable then + if all_stored then return_some `Attestable_slot else return_none + else if not all_stored then return_none else let* is_attestable_slot_with_traps = is_attestable_slot_with_traps @@ -134,8 +136,9 @@ let is_attestable_slot ctxt ~pkh ~(slot_id : Types.slot_id) 
= |> Errors.to_option_tzresult in match is_attestable_slot_with_traps with - | Some true -> return_true - | _ -> return_false + | Some true -> return_some `Attestable_slot + | Some false -> return_some `Trap + | None -> return_none let may_notify ctxt ~(slot_id : Types.slot_id) = let open Lwt_result_syntax in @@ -146,17 +149,22 @@ let may_notify ctxt ~(slot_id : Types.slot_id) = let subscribers = T.elements attestable_slots_watcher_table in if Seq.is_empty subscribers then return_unit else - let notify_attestable_slot pkh = - let* is_attestable_slot = is_attestable_slot ctxt ~pkh ~slot_id in - if is_attestable_slot then - T.notify_attestable_slot attestable_slots_watcher_table pkh ~slot_id - else () ; + let notify_slot_attestable_or_trap pkh = + let* is_attestable_slot_or_trap = + is_attestable_slot_or_trap ctxt ~pkh ~slot_id + in + (match is_attestable_slot_or_trap with + | Some `Attestable_slot -> + T.notify_attestable_slot attestable_slots_watcher_table pkh ~slot_id + | Some `Trap -> + T.notify_slot_has_trap attestable_slots_watcher_table pkh ~slot_id + | None -> ()) ; return_unit in (* For each subscribed pkh, if it has assigned shards for that level, check if all those shards are available for [slot_id] and notify watcher, accordingly. 
*) - Seq.iter_ep notify_attestable_slot subscribers + Seq.iter_ep notify_slot_attestable_or_trap subscribers let is_not_in_committee committee ~pkh = let assigned_shard_indices = diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 827db5c3305b..04837ed544c9 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -784,6 +784,7 @@ module Attestable_slots_watcher_table = struct type t = | Attestable_slot of {slot_id : slot_id} | No_shards_assigned of {attestation_level : level} + | Slot_has_trap of {slot_id : slot_id} let encoding = let open Data_encoding in @@ -810,6 +811,15 @@ module Attestable_slots_watcher_table = struct | _ -> None) (fun ((), attestation_level) -> No_shards_assigned {attestation_level}); + case + ~title:"slot_has_trap" + (Tag 2) + (obj2 + (req "kind" (constant "slot_has_trap")) + (req "slot_id" slot_id_encoding)) + (function + | Slot_has_trap {slot_id} -> Some ((), slot_id) | _ -> None) + (fun ((), slot_id) -> Slot_has_trap {slot_id}); ] end @@ -878,6 +888,14 @@ module Attestable_slots_watcher_table = struct watcher.stream (No_shards_assigned {attestation_level}) + let notify_slot_has_trap t pkh ~slot_id = + match Signature.Public_key_hash.Table.find t pkh with + | None -> () + | Some watcher -> + if not @@ SlotIdSet.mem watcher.notified_slots slot_id then ( + SlotIdSet.add watcher.notified_slots slot_id ; + Lwt_watcher.notify watcher.stream (Slot_has_trap {slot_id})) + let remove = Signature.Public_key_hash.Table.remove let elements = Signature.Public_key_hash.Table.to_seq_keys diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index 262e20632984..3990fe7119f0 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -419,6 +419,8 @@ module Attestable_slots_watcher_table : sig (** the [slot_id] is now attestable for the delegate *) | No_shards_assigned of {attestation_level : level} (** the delegate 
has no assigned shards at [attestation_level] *) + | Slot_has_trap of {slot_id : slot_id} + (** the [slot_id] is a trap for the delegate *) val encoding : t Data_encoding.t end @@ -457,6 +459,11 @@ module Attestable_slots_watcher_table : sig val notify_no_shards_assigned : t -> Signature.public_key_hash -> attestation_level:level -> unit + (** [notify_slot_has_trap t pkh ~slot_id] pushes a [Slot_has_trap] event for [~slot_id] to the + stream for [pkh], if present. *) + val notify_slot_has_trap : + t -> Signature.public_key_hash -> slot_id:slot_id -> unit + (** [remove t pkh] removes the watcher entry for [pkh] from [t] if present. *) val remove : t -> Signature.public_key_hash -> unit -- GitLab From 7707b91a37b2c66f8b42424e60e4981358ec71c0 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 27 Oct 2025 15:05:13 +0000 Subject: [PATCH 6/7] Dal_node: Attestable_slots: Rename may_notify function to be more explicit --- src/lib_dal_node/amplificator.ml | 6 +++++- src/lib_dal_node/attestable_slots.ml | 2 +- src/lib_dal_node/attestable_slots.mli | 9 +++++---- src/lib_dal_node/daemon.ml | 4 +++- src/lib_dal_node/slot_manager.ml | 4 +++- 5 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index 59853a96ffb2..9847cc5914be 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -405,7 +405,11 @@ let reply_receiver_job {process; query_store; _} node_context = Store.Shards.write_all (Store.shards node_store) slot_id shards |> Errors.to_tzresult in - let* () = Attestable_slots.may_notify node_context ~slot_id in + let* () = + Attestable_slots.may_notify_attestable_slot_or_trap + node_context + ~slot_id + in let level_committee ~level = let* res = Node_context.fetch_committees node_context ~level in return (Signature.Public_key_hash.Map.map fst res) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 
3a3bf32c8208..a65569aec2c4 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -140,7 +140,7 @@ let is_attestable_slot_or_trap ctxt ~pkh ~(slot_id : Types.slot_id) = | Some false -> return_some `Trap | None -> return_none -let may_notify ctxt ~(slot_id : Types.slot_id) = +let may_notify_attestable_slot_or_trap ctxt ~(slot_id : Types.slot_id) = let open Lwt_result_syntax in let module T = Types.Attestable_slots_watcher_table in let attestable_slots_watcher_table = diff --git a/src/lib_dal_node/attestable_slots.mli b/src/lib_dal_node/attestable_slots.mli index b1dd9ddb0a61..3e0f75caa0ad 100644 --- a/src/lib_dal_node/attestable_slots.mli +++ b/src/lib_dal_node/attestable_slots.mli @@ -43,10 +43,11 @@ val published_just_before_migration : val attested_just_after_migration : Node_context.t -> attested_level:int32 -> bool tzresult -(** [may_notify ctxt ~slot_id] checks, for each subscribed [pkh], whether all shards - assigned to [pkh] at the attestation level corresponding to [~slot_id] are available; - if so, it emits an event to that [pkh]’s stream. *) -val may_notify : Node_context.t -> slot_id:Types.slot_id -> unit tzresult Lwt.t +(** [may_notify_attestable_slot_or_trap ctxt ~slot_id] checks, for each subscribed [pkh], + whether all shards assigned to [pkh] at the attestation level corresponding to [~slot_id] + are available; if so, it emits an event to that [pkh]’s stream. *) +val may_notify_attestable_slot_or_trap : + Node_context.t -> slot_id:Types.slot_id -> unit tzresult Lwt.t (** [may_notify_not_in_committee ctxt committee ~attestation_level] checks, for each subscribed [pkh], whether the delegate is in the [committee] for [~attestation_level]. 
diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index 61190e375f27..3f051ce87b4e 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -163,7 +163,9 @@ let connect_gossipsub_with_p2p proto_parameters gs_worker transport_layer [@profiler.aggregate_s {verbosity = Notice; profiler_module = Profiler} "save_and_notify"] in - let* () = Attestable_slots.may_notify node_ctxt ~slot_id in + let* () = + Attestable_slots.may_notify_attestable_slot_or_trap node_ctxt ~slot_id + in (* Introduce a new store read at each received shard. Not sure it can be a problem, though *) let* number_of_already_stored_shards = diff --git a/src/lib_dal_node/slot_manager.ml b/src/lib_dal_node/slot_manager.ml index 6661c29cc5c8..5f4029bbddc8 100644 --- a/src/lib_dal_node/slot_manager.ml +++ b/src/lib_dal_node/slot_manager.ml @@ -737,7 +737,9 @@ let publish_slot_data ctxt ~level_committee ~slot_size gs_worker Store.Shards.write_all (Store.shards node_store) slot_id shards |> Errors.to_tzresult in - let* () = Attestable_slots.may_notify ctxt ~slot_id in + let* () = + Attestable_slots.may_notify_attestable_slot_or_trap ctxt ~slot_id + in let* () = Store.Slots.add_slot ~slot_size (Store.slots node_store) slot slot_id |> Errors.to_tzresult -- GitLab From 898299bae8ded25398cc65474f07e0206a678624 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 29 Oct 2025 10:39:26 +0000 Subject: [PATCH 7/7] Dal_node: Simplify if-else logic --- src/lib_dal_node/attestable_slots.ml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index a65569aec2c4..a2650c801f1d 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -122,9 +122,9 @@ let is_attestable_slot_or_trap ctxt ~pkh ~(slot_id : Types.slot_id) = shard_indices in let all_stored = number_stored_shards = number_of_assigned_shards in - if not last_known_parameters.incentives_enable 
then - if all_stored then return_some `Attestable_slot else return_none - else if not all_stored then return_none + if not all_stored then return_none + else if not last_known_parameters.incentives_enable then + return_some `Attestable_slot else let* is_attestable_slot_with_traps = is_attestable_slot_with_traps -- GitLab