From 248c1143406d870b59cf6bfe4873023d17aa57cd Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 15 Oct 2025 16:26:49 +0100 Subject: [PATCH 1/9] Dal_node: Services: Add monitor_attestable_slots RPC --- src/lib_dal_node_services/services.ml | 22 ++++++++++++++++++++++ src/lib_dal_node_services/services.mli | 13 +++++++++++++ 2 files changed, 35 insertions(+) diff --git a/src/lib_dal_node_services/services.ml b/src/lib_dal_node_services/services.ml index 38c7f76bc84d..2025f3bcb086 100644 --- a/src/lib_dal_node_services/services.ml +++ b/src/lib_dal_node_services/services.ml @@ -399,6 +399,28 @@ let get_attestable_slots : open_root / "profiles" /: Signature.Public_key_hash.rpc_arg / "attested_levels" /: Tezos_rpc.Arg.int32 / "attestable_slots") +let monitor_attestable_slots : + < meth : [`GET] + ; input : unit + ; output : Types.slot_id + ; prefix : unit + ; params : unit * Signature.public_key_hash + ; query : unit > + service = + Tezos_rpc.Service.get_service + ~description: + "Stream attestable slot ids for a given public key hash [pkh]. A slot is \ + attestable for attested level L if it was published at (L - \ + attestation_lag) and *all* shards assigned at level L to [pkh] are \ + available in the DAL node's store. If some shards of the slot are \ + detected as traps for the baker, the slot should not be attested, so \ + the id is not sent via the stream." + ~query:Tezos_rpc.Query.empty + ~output:Types.slot_id_encoding + Tezos_rpc.Path.( + open_root / "profiles" /: Signature.Public_key_hash.rpc_arg / "monitor" + / "attestable_slots") + let get_traps : < meth : [`GET] ; input : unit diff --git a/src/lib_dal_node_services/services.mli b/src/lib_dal_node_services/services.mli index d056be7bd99f..4cd913c2fc26 100644 --- a/src/lib_dal_node_services/services.mli +++ b/src/lib_dal_node_services/services.mli @@ -214,6 +214,19 @@ val get_attestable_slots : ; query : unit > service +(** Stream attestable slot_ids for the given public key hash [pkh]. 
+ A slot is attestable at level [L] if it was published at [L - attestation_lag] + and *all* shards assigned at level [L] to [pkh] are available in the DAL + node's store. *) +val monitor_attestable_slots : + < meth : [`GET] + ; input : unit + ; output : Types.slot_id + ; prefix : unit + ; params : unit * Signature.public_key_hash + ; query : unit > + service + (** For a given published level, return all the traps known by the node. *) val get_traps : < meth : [`GET] -- GitLab From 6ca12f517ed985014ad50d68b81b2708426d64e7 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 15 Oct 2025 18:18:51 +0100 Subject: [PATCH 2/9] Dal_node: Node_context: Move traps check function from RPC_server This will be required in the next commits where we will use this functionality for the monitoring RPC backend logic. --- src/lib_dal_node/RPC_server.ml | 36 +------------------------------ src/lib_dal_node/node_context.ml | 36 +++++++++++++++++++++++++++++++ src/lib_dal_node/node_context.mli | 17 +++++++++++++++ 3 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml index bb0c1ac4b112..29faf55cb6a9 100644 --- a/src/lib_dal_node/RPC_server.ml +++ b/src/lib_dal_node/RPC_server.ml @@ -418,40 +418,6 @@ module Profile_handlers = struct ~current_baker_level else Lwt.return_unit - let is_slot_attestable_with_traps shards_store traps_fraction pkh - assigned_shard_indexes slot_id = - let open Lwt_result_syntax in - List.for_all_es - (fun shard_index -> - let* {index = _; share} = - Store.Shards.read shards_store slot_id shard_index - in - (* Note: here [pkh] should identify the baker using its delegate key - (not the consensus key) *) - let trap_res = Trap.share_is_trap pkh share ~traps_fraction in - match trap_res with - | Ok true -> - let*! 
() = - Event.emit_cannot_attest_slot_because_of_trap - ~pkh - ~published_level:slot_id.slot_level - ~slot_index:slot_id.slot_index - ~shard_index - in - return_false - | Ok false -> return_true - | Error _ -> - (* assume the worst, that it is a trap *) - let*! () = - Event.emit_trap_check_failure - ~published_level:slot_id.Types.Slot_id.slot_level - ~slot_index:slot_id.slot_index - ~shard_index - ~delegate:pkh - in - return false) - assigned_shard_indexes - let get_attestable_slots ctxt pkh attested_level () () = let get_attestable_slots ~shard_indices store last_known_parameters ~attested_level = @@ -507,7 +473,7 @@ module Profile_handlers = struct return all_stored else if not all_stored then return false else - is_slot_attestable_with_traps + Node_context.Attestable_slots.is_slot_attestable_with_traps shards_store last_known_parameters.traps_fraction pkh diff --git a/src/lib_dal_node/node_context.ml b/src/lib_dal_node/node_context.ml index 6ee9e4cfe6ac..4e38a94f4d1a 100644 --- a/src/lib_dal_node/node_context.ml +++ b/src/lib_dal_node/node_context.ml @@ -266,6 +266,42 @@ let get_disable_shard_validation ctxt = ctxt.disable_shard_validation let get_last_migration_level ctxt = ctxt.last_migration_level +module Attestable_slots = struct + let is_slot_attestable_with_traps shards_store traps_fraction pkh + assigned_shard_indexes slot_id = + let open Lwt_result_syntax in + List.for_all_es + (fun shard_index -> + let* {index = _; share} = + Store.Shards.read shards_store slot_id shard_index + in + (* Note: here [pkh] should identify the baker using its delegate key + (not the consensus key) *) + let trap_res = Trap.share_is_trap pkh share ~traps_fraction in + match trap_res with + | Ok true -> + let*! () = + Event.emit_cannot_attest_slot_because_of_trap + ~pkh + ~published_level:slot_id.slot_level + ~slot_index:slot_id.slot_index + ~shard_index + in + return_false + | Ok false -> return_true + | Error _ -> + (* assume the worst, that it is a trap *) + let*! 
() = + Event.emit_trap_check_failure + ~published_level:slot_id.Types.Slot_id.slot_level + ~slot_index:slot_id.slot_index + ~shard_index + ~delegate:pkh + in + return_false) + assigned_shard_indexes +end + module P2P = struct let connect {transport_layer; _} ?timeout point = Gossipsub.Transport_layer.connect transport_layer ?timeout point diff --git a/src/lib_dal_node/node_context.mli b/src/lib_dal_node/node_context.mli index 4baead1e6e8a..a5a9f19b102e 100644 --- a/src/lib_dal_node/node_context.mli +++ b/src/lib_dal_node/node_context.mli @@ -227,6 +227,23 @@ val get_disable_shard_validation : t -> bool [Proto_plugins.get_plugin_and_parameters_for_level] for more clarifications. *) val get_last_migration_level : t -> int32 +module Attestable_slots : sig + (** [is_slot_attestable_with_traps shards_store traps_fraction pkh + assigned_shard_indexes slot_id] checks whether the slot identified by [slot_id] + is attestable for delegate [pkh] with respect to the traps mechanism. + + The function iterates over the delegate’s [assigned_shard_indexes], reads each + corresponding stored shard share from [shards_store], and evaluates + [Trap.share_is_trap] on it using [traps_fraction]. *) + val is_slot_attestable_with_traps : + Store.Shards.t -> + Q.t -> + Signature.public_key_hash -> + int trace -> + Types.slot_id -> + (bool, [> Errors.not_found | Errors.other]) result Lwt.t +end + (** Module for P2P-related accessors. 
*) module P2P : sig (** [connect t ?timeout point] initiates a connection to the point -- GitLab From d25b57272e08878ba2ad8566c6bd255b98d6c1ec Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 15 Oct 2025 17:36:57 +0100 Subject: [PATCH 3/9] Dal_node: Node_context: Add per pkh attestable_slots table --- manifest/product_octez.ml | 1 + opam/tezos-dal-node-services.opam | 1 + src/lib_dal_node/node_context.ml | 98 +++++++++++++++++++++++++++++ src/lib_dal_node/node_context.mli | 19 ++++++ src/lib_dal_node_services/dune | 3 +- src/lib_dal_node_services/types.ml | 43 +++++++++++++ src/lib_dal_node_services/types.mli | 34 ++++++++++ 7 files changed, 198 insertions(+), 1 deletion(-) diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index 7cf3d2d24f4b..38b67a771827 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -5310,6 +5310,7 @@ let octez_dal_node_services = octez_base |> open_ ~m:"TzPervasives" |> open_; octez_rpc; octez_crypto_dal; + lwt_watcher; ] ~linkall:true diff --git a/opam/tezos-dal-node-services.opam b/opam/tezos-dal-node-services.opam index 04b5dd5b82c3..0917f33c47a0 100644 --- a/opam/tezos-dal-node-services.opam +++ b/opam/tezos-dal-node-services.opam @@ -11,6 +11,7 @@ depends: [ "dune" { >= "3.11.1" } "ocaml" { >= "4.14" } "octez-libs" { = version } + "lwt-watcher" { = "0.2" } ] build: [ ["rm" "-rf" "vendors" "contrib"] diff --git a/src/lib_dal_node/node_context.ml b/src/lib_dal_node/node_context.ml index 4e38a94f4d1a..d18503b9224c 100644 --- a/src/lib_dal_node/node_context.ml +++ b/src/lib_dal_node/node_context.ml @@ -47,6 +47,8 @@ type t = { disable_shard_validation : bool; ignore_pkhs : Signature.Public_key_hash.Set.t; mutable last_migration_level : int32; + mutable attestable_slots_watcher_table : + Types.Attestable_slots_watcher_table.t; } let init config ~network_name profile_ctxt cryptobox @@ -74,6 +76,8 @@ let init config ~network_name profile_ctxt cryptobox disable_shard_validation; ignore_pkhs; 
last_migration_level = 0l; + attestable_slots_watcher_table = + Types.Attestable_slots_watcher_table.create ~initial_size:5; } let get_tezos_node_cctxt ctxt = ctxt.tezos_node_cctxt @@ -266,6 +270,11 @@ let get_disable_shard_validation ctxt = ctxt.disable_shard_validation let get_last_migration_level ctxt = ctxt.last_migration_level +let get_attestation_lag ctxt ~level = + let open Result_syntax in + let+ params = get_proto_parameters ctxt ~level:(`Level level) in + Int32.of_int params.attestation_lag + module Attestable_slots = struct let is_slot_attestable_with_traps shards_store traps_fraction pkh assigned_shard_indexes slot_id = @@ -300,6 +309,95 @@ module Attestable_slots = struct in return_false) assigned_shard_indexes + + let subscribe ctxt ~pkh = + let module T = Types.Attestable_slots_watcher_table in + let watcher = T.get_or_init ctxt.attestable_slots_watcher_table pkh in + let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in + let next () = Lwt_stream.get stream in + let shutdown () = + (* stop this stream, then possibly remove the whole watcher if last subscriber *) + Lwt_watcher.shutdown stopper ; + T.set_num_subscribers watcher (T.get_num_subscribers watcher - 1) ; + if T.get_num_subscribers watcher <= 0 then + T.remove ctxt.attestable_slots_watcher_table pkh + in + Resto_directory.Answer.{next; shutdown} + + let drop_published_at_migration ctxt ~published_level = + let open Result_syntax in + let migration_level = ctxt.last_migration_level in + let* old_lag = get_attestation_lag ctxt ~level:published_level in + let* new_lag = + let attested_level = Int32.(add published_level old_lag) in + get_attestation_lag ctxt ~level:attested_level + in + return + @@ (old_lag > new_lag + && Int32.sub migration_level old_lag < published_level + && published_level <= migration_level) + + let may_notify ctxt ~(slot_id : Types.slot_id) = + let open Lwt_result_syntax in + let module T = Types.Attestable_slots_watcher_table in + let 
attestable_slots_watcher_table = ctxt.attestable_slots_watcher_table in + let subscribers = T.elements attestable_slots_watcher_table in + if Seq.is_empty subscribers then return_unit + else + let published_level = slot_id.slot_level in + let*? lag = get_attestation_lag ctxt ~level:published_level in + let attested_level = Int32.(add published_level lag) in + let attestation_level = Int32.pred attested_level in + let*? should_drop = drop_published_at_migration ctxt ~published_level in + if should_drop then return_unit + else + let*? last_known_parameters = + get_proto_parameters ctxt ~level:(`Level attested_level) + in + let shards_store = Store.shards (get_store ctxt) in + (* For each subscribed pkh, if it has assigned shards for that level, + check if all those shards are available for [slot_id] and notify watcher, + accordingly. *) + let notify_if_attestable pkh = + (* For retrieving the assigned shard indexes, we consider the committee + at [attestation_level], because the (DAL) attestations in the blocks + at level [attested_level] refer to the predecessor level. 
*) + let* shard_indices = + fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level + in + let number_of_assigned_shards = List.length shard_indices in + if number_of_assigned_shards = 0 then return_unit + else if published_level < 1l then return_unit + else + let* number_stored_shards = + Store.Shards.number_of_shards_available + shards_store + slot_id + shard_indices + in + let all_stored = number_stored_shards = number_of_assigned_shards in + if not last_known_parameters.incentives_enable then ( + if all_stored then + T.notify attestable_slots_watcher_table pkh ~slot_id ; + return_unit) + else if not all_stored then return_unit + else + let* is_slot_attestable_with_traps = + is_slot_attestable_with_traps + shards_store + last_known_parameters.traps_fraction + pkh + shard_indices + slot_id + |> Errors.to_option_tzresult + in + (match is_slot_attestable_with_traps with + | Some true -> + T.notify attestable_slots_watcher_table pkh ~slot_id + | _ -> ()) ; + return_unit + in + Seq.iter_ep notify_if_attestable subscribers end module P2P = struct diff --git a/src/lib_dal_node/node_context.mli b/src/lib_dal_node/node_context.mli index a5a9f19b102e..19b661c47ef5 100644 --- a/src/lib_dal_node/node_context.mli +++ b/src/lib_dal_node/node_context.mli @@ -242,6 +242,25 @@ module Attestable_slots : sig int trace -> Types.slot_id -> (bool, [> Errors.not_found | Errors.other]) result Lwt.t + + (** [subscribe ctxt ~pkh] opens a [Resto_directory.Answer] stream that yields + [Types.slot_id] values whenever a slot becomes attestable for [~pkh]. The stream + only emits items produced after subscription. *) + val subscribe : + t -> + pkh:Signature.public_key_hash -> + Types.slot_id Resto_directory.Answer.stream + + (** Let M = migration level (last block of the old protocol). Then, for levels + P included in [M - lag + 1 .. M] (inclusively), we do not PUBLISH any slots, + because the corresponding attested levels would fall in the new protocol. 
*) + val drop_published_at_migration : + t -> published_level:int32 -> (bool, tztrace) result + + (** [may_notify ctxt ~slot_id] checks, for each subscribed [pkh], whether all shards + assigned to [pkh] at the attestation level corresponding to [~slot_id] are available; + if so, it emits [~slot_id] to that [pkh]’s stream. *) + val may_notify : t -> slot_id:Types.slot_id -> unit tzresult Lwt.t end (** Module for P2P-related accessors. *) diff --git a/src/lib_dal_node_services/dune b/src/lib_dal_node_services/dune index b6eeda6439b0..379f8f05bbf9 100644 --- a/src/lib_dal_node_services/dune +++ b/src/lib_dal_node_services/dune @@ -8,7 +8,8 @@ (libraries octez-libs.base octez-libs.rpc - octez-libs.crypto-dal) + octez-libs.crypto-dal + lwt-watcher) (library_flags (:standard -linkall)) (flags (:standard) diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index f76114b59f52..172b2d8f920a 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -758,3 +758,46 @@ module Health = struct Format.fprintf fmt "(%s: %a)" name pp_status status)) checks end + +module Attestable_slots_watcher_table = struct + (** A watcher used to stream newly-attestable slots for a given delegate (pkh). + - [stream] is the push endpoint used by the DAL node to notify consumers + (RPC layer / baker) that a specific [slot_id] has become attestable. + - [num_subscribers] is the number of active consumers currently subscribed to + this pkh’s stream. 
*) + type watcher = { + stream : slot_id Lwt_watcher.input; + mutable num_subscribers : int; + } + + let get_stream watcher = watcher.stream + + let get_num_subscribers watcher = watcher.num_subscribers + + let set_num_subscribers watcher value = watcher.num_subscribers <- value + + type t = watcher Signature.Public_key_hash.Table.t + + let create ~initial_size = Signature.Public_key_hash.Table.create initial_size + + let get_or_init t pkh = + match Signature.Public_key_hash.Table.find t pkh with + | Some watcher -> + watcher.num_subscribers <- watcher.num_subscribers + 1 ; + watcher + | None -> + let watcher = + {stream = Lwt_watcher.create_input (); num_subscribers = 1} + in + Signature.Public_key_hash.Table.add t pkh watcher ; + watcher + + let notify t pkh ~slot_id = + match Signature.Public_key_hash.Table.find t pkh with + | None -> () + | Some watcher -> Lwt_watcher.notify watcher.stream slot_id + + let remove = Signature.Public_key_hash.Table.remove + + let elements = Signature.Public_key_hash.Table.to_seq_keys +end diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index 4c668b155f7f..09af4c13f488 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -406,3 +406,37 @@ module Health : sig val pp : Format.formatter -> t -> unit end + +module Attestable_slots_watcher_table : sig + type watcher + + (** Returns the stream currently stored in the watcher. *) + val get_stream : watcher -> slot_id Lwt_watcher.input + + (** Returns the number of subscribers to the stream currently stored in the watcher. *) + val get_num_subscribers : watcher -> shard_index + + (** Sets the value for the number of subscribers to the stream in the watcher. *) + val set_num_subscribers : watcher -> shard_index -> unit + + (** Table where we lazily create a watcher on first subscription for a pkh. When the last + subscriber calls [shutdown], the watcher is removed from the table to avoid leaks. 
*) + type t = watcher Signature.Public_key_hash.Table.t + + (** [create t ~initial_size] creates an empty table of watchers with an initial + bucket size of [~initial_size] (which is an initial estimation). *) + val create : initial_size:int -> t + + (** [get_or_init t pkh] returns (creating if absent) the watcher entry for [pkh] in [t]. *) + val get_or_init : t -> Signature.public_key_hash -> watcher + + (** [notify t pkh ~slot_id] pushes [slot_id] to the stream for [pkh] if present. *) + val notify : t -> Signature.public_key_hash -> slot_id:slot_id -> unit + + (** [remove t pkh] removes the watcher entry for [pkh] from [t] if present. *) + val remove : t -> Signature.public_key_hash -> unit + + (** [elements t] returns the current set of pkhs that have an active monitoring + subscription in [t]. *) + val elements : t -> Signature.public_key_hash Seq.t +end -- GitLab From cac056079bf60e5c304764634c1d968eab7868e1 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 15 Oct 2025 17:49:28 +0100 Subject: [PATCH 4/9] Dal_node: Node_context: Move drop attested levels at migration from RPC_server Done for the symmetry with the published level at migration case, since we should keep both functions in the same place. Note however that it is not trivial to use only one of them, because the condition changes depending on which perspective you see the problem from (i.e. published level vs. attested level case). 
--- src/lib_dal_node/RPC_server.ml | 37 +++++-------------------------- src/lib_dal_node/node_context.ml | 13 +++++++++++ src/lib_dal_node/node_context.mli | 5 +++++ 3 files changed, 24 insertions(+), 31 deletions(-) diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml index 29faf55cb6a9..8d4ae5454c19 100644 --- a/src/lib_dal_node/RPC_server.ml +++ b/src/lib_dal_node/RPC_server.ml @@ -493,47 +493,22 @@ module Profile_handlers = struct return (Types.Attestable_slots {slots = flags; published_level}) in - (* Decide whether to short-circuit attestation computation at attested level [L] - because of a protocol migration that decreased [attestation_lag]. - - - M = migration level (first level of the new protocol); - - new_lag = attestation_lag at attested level L (new protocol); - - published_level = L - new_lag - - If published_level is still in the old protocol and the old lag > new lag, - then for L in [M .. M + new_lag - 1] the corresponding published data belongs - to the old protocol and should be ignored for attestation purposes. 
*) - let should_drop_due_to_migration last_known_parameters = - let open Lwt_result_syntax in - let migration_level = Node_context.get_last_migration_level ctxt in - let new_lag = Int32.of_int last_known_parameters.Types.attestation_lag in - let published_level = Int32.(sub attested_level new_lag) in - let* published_level_parameters = - Node_context.get_proto_parameters ctxt ~level:(`Level published_level) - |> Lwt.return - |> lwt_map_error (fun e -> `Other e) - in - let old_lag = - Int32.of_int published_level_parameters.Types.attestation_lag - in - return - (old_lag > new_lag - && migration_level <= attested_level - && attested_level < Int32.add migration_level new_lag) - in - (* TODO: https://gitlab.com/tezos/tezos/-/issues/8064 *) let get_attestable_slots ~shard_indices store last_known_parameters ~attested_level = let open Lwt_result_syntax in let* should_drop_due_to_migration = - should_drop_due_to_migration last_known_parameters + Node_context.Attestable_slots.drop_attested_at_migration + ctxt + ~attested_level + |> Lwt.return + |> lwt_map_error (fun e -> `Other e) in if should_drop_due_to_migration then let*! 
() = Event.emit_skip_attesting_shards ~level:attested_level in let slots = - Stdlib.List.init last_known_parameters.number_of_slots (fun _ -> + Stdlib.List.init last_known_parameters.Types.number_of_slots (fun _ -> false) in return (Types.Attestable_slots {slots; published_level = 0l}) diff --git a/src/lib_dal_node/node_context.ml b/src/lib_dal_node/node_context.ml index d18503b9224c..78fea5b7d5d2 100644 --- a/src/lib_dal_node/node_context.ml +++ b/src/lib_dal_node/node_context.ml @@ -337,6 +337,19 @@ module Attestable_slots = struct && Int32.sub migration_level old_lag < published_level && published_level <= migration_level) + let drop_attested_at_migration ctxt ~attested_level = + let open Result_syntax in + let migration_level = get_last_migration_level ctxt in + let* new_lag = get_attestation_lag ctxt ~level:attested_level in + let* old_lag = + let published_level = Int32.(sub attested_level new_lag) in + get_attestation_lag ctxt ~level:published_level + in + return + (old_lag > new_lag + && migration_level < attested_level + && attested_level <= Int32.add migration_level new_lag) + let may_notify ctxt ~(slot_id : Types.slot_id) = let open Lwt_result_syntax in let module T = Types.Attestable_slots_watcher_table in diff --git a/src/lib_dal_node/node_context.mli b/src/lib_dal_node/node_context.mli index 19b661c47ef5..012e4f277d1b 100644 --- a/src/lib_dal_node/node_context.mli +++ b/src/lib_dal_node/node_context.mli @@ -257,6 +257,11 @@ module Attestable_slots : sig val drop_published_at_migration : t -> published_level:int32 -> (bool, tztrace) result + (** Let M = migration level (last block of the old protocol). Then, for levels + A included in [M + 1 .. M + lag] (inclusively), we do not ATTEST any slots, + because the corresponding published levels would fall in the old protocol. 
*) + val drop_attested_at_migration : t -> attested_level:int32 -> bool tzresult + (** [may_notify ctxt ~slot_id] checks, for each subscribed [pkh], whether all shards assigned to [pkh] at the attestation level corresponding to [~slot_id] are available; if so, it emits [~slot_id] to that [pkh]’s stream. *) -- GitLab From b7be126b7cd0a004d6ea67110e073c0ab340d476 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Thu, 2 Oct 2025 12:19:24 +0100 Subject: [PATCH 5/9] Dal_node: RPC_server: Register attestable_slots streaming RPC --- src/lib_dal_node/RPC_server.ml | 10 +++++++ ... Testing DAL node (dal node list RPCs).out | 26 ++++++++++++------- ... Testing DAL node (dal node list RPCs).out | 26 ++++++++++++------- ... Testing DAL node (dal node list RPCs).out | 26 ++++++++++++------- 4 files changed, 61 insertions(+), 27 deletions(-) diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml index 8d4ae5454c19..c9e29b577253 100644 --- a/src/lib_dal_node/RPC_server.ml +++ b/src/lib_dal_node/RPC_server.ml @@ -546,6 +546,12 @@ module Profile_handlers = struct store last_known_parameters ~attested_level) + + let monitor_attestable_slots ctxt pkh () () = + let Resto_directory.Answer.{next; shutdown} = + Node_context.Attestable_slots.subscribe ctxt ~pkh + in + Tezos_rpc.Answer.return_stream {next; shutdown} end let version ctxt () () = @@ -751,6 +757,10 @@ let register : Tezos_rpc.Directory.opt_register2 Services.get_attestable_slots (Profile_handlers.get_attestable_slots ctxt) + |> add_service + Tezos_rpc.Directory.gen_register1 + Services.monitor_attestable_slots + (Profile_handlers.monitor_attestable_slots ctxt) |> add_service Tezos_rpc.Directory.register1 Services.get_traps diff --git a/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out index b10db53d9fe6..afece719412c 100644 --- a/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list 
RPCs).out +++ b/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out @@ -102,15 +102,23 @@ Available services: Update the list of profiles tracked by the DAL node. Note that it does not take the bootstrap profile as it is incompatible with other profiles. - - GET /profiles//attested_levels//assigned_shard_indices - Return the shard indexes assigned to the given public key hash at the - given level. - - GET /profiles//attested_levels//attestable_slots - Return the currently attestable slots at the given attested level by - the given public key hash. A slot is attestable at level [l] if it is - published at level [l - attestation_lag] and *all* the shards assigned - at level [l] to the given public key hash are available in the DAL - node's store. + + profiles// + - GET /profiles//attested_levels//assigned_shard_indices + Return the shard indexes assigned to the given public key hash at the + given level. + - GET /profiles//attested_levels//attestable_slots + Return the currently attestable slots at the given attested level by + the given public key hash. A slot is attestable at level [l] if it is + published at level [l - attestation_lag] and *all* the shards + assigned at level [l] to the given public key hash are available in + the DAL node's store. + - GET /profiles//monitor/attestable_slots + Stream attestable slot ids for a given public key hash [pkh]. A slot + is attestable for attested level L if it was published at (L - + attestation_lag) and *all* shards assigned at level L to [pkh] are + available in the DAL node's store. If some shards of the slot are + detected as traps for the baker, the slot should not be attested, so + the id is not sent via the stream. - GET /protocol_parameters Returns the protocol parameters as known by the DAL node. An optional 'level' argument can specify for which level to retrieve them. 
diff --git a/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out index b10db53d9fe6..afece719412c 100644 --- a/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out @@ -102,15 +102,23 @@ Available services: Update the list of profiles tracked by the DAL node. Note that it does not take the bootstrap profile as it is incompatible with other profiles. - - GET /profiles//attested_levels//assigned_shard_indices - Return the shard indexes assigned to the given public key hash at the - given level. - - GET /profiles//attested_levels//attestable_slots - Return the currently attestable slots at the given attested level by - the given public key hash. A slot is attestable at level [l] if it is - published at level [l - attestation_lag] and *all* the shards assigned - at level [l] to the given public key hash are available in the DAL - node's store. + + profiles// + - GET /profiles//attested_levels//assigned_shard_indices + Return the shard indexes assigned to the given public key hash at the + given level. + - GET /profiles//attested_levels//attestable_slots + Return the currently attestable slots at the given attested level by + the given public key hash. A slot is attestable at level [l] if it is + published at level [l - attestation_lag] and *all* the shards + assigned at level [l] to the given public key hash are available in + the DAL node's store. + - GET /profiles//monitor/attestable_slots + Stream attestable slot ids for a given public key hash [pkh]. A slot + is attestable for attested level L if it was published at (L - + attestation_lag) and *all* shards assigned at level L to [pkh] are + available in the DAL node's store. If some shards of the slot are + detected as traps for the baker, the slot should not be attested, so + the id is not sent via the stream. 
- GET /protocol_parameters Returns the protocol parameters as known by the DAL node. An optional 'level' argument can specify for which level to retrieve them. diff --git a/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out index b10db53d9fe6..afece719412c 100644 --- a/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out @@ -102,15 +102,23 @@ Available services: Update the list of profiles tracked by the DAL node. Note that it does not take the bootstrap profile as it is incompatible with other profiles. - - GET /profiles//attested_levels//assigned_shard_indices - Return the shard indexes assigned to the given public key hash at the - given level. - - GET /profiles//attested_levels//attestable_slots - Return the currently attestable slots at the given attested level by - the given public key hash. A slot is attestable at level [l] if it is - published at level [l - attestation_lag] and *all* the shards assigned - at level [l] to the given public key hash are available in the DAL - node's store. + + profiles// + - GET /profiles//attested_levels//assigned_shard_indices + Return the shard indexes assigned to the given public key hash at the + given level. + - GET /profiles//attested_levels//attestable_slots + Return the currently attestable slots at the given attested level by + the given public key hash. A slot is attestable at level [l] if it is + published at level [l - attestation_lag] and *all* the shards + assigned at level [l] to the given public key hash are available in + the DAL node's store. + - GET /profiles//monitor/attestable_slots + Stream attestable slot ids for a given public key hash [pkh]. 
A slot + is attestable for attested level L if it was published at (L - + attestation_lag) and *all* shards assigned at level L to [pkh] are + available in the DAL node's store. If some shards of the slot are + detected as traps for the baker, the slot should not be attested, so + the id is not sent via the stream. - GET /protocol_parameters Returns the protocol parameters as known by the DAL node. An optional 'level' argument can specify for which level to retrieve them. -- GitLab From e8c281c2a2dae965baabeb64d201cf955128e526 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Thu, 2 Oct 2025 14:15:25 +0100 Subject: [PATCH 6/9] Dal_node: Notify subscribed delegates when shards are stored --- src/lib_dal_node/amplificator.ml | 3 +++ src/lib_dal_node/daemon.ml | 1 + src/lib_dal_node/slot_manager.ml | 1 + 3 files changed, 5 insertions(+) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index a91e5930a706..b08690b904ea 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -391,6 +391,9 @@ let reply_receiver_job {process; query_store; _} node_context = Store.Shards.write_all (Store.shards node_store) slot_id shards |> Errors.to_tzresult in + let* () = + Node_context.Attestable_slots.may_notify node_context ~slot_id + in let level_committee ~level = let* res = Node_context.fetch_committees node_context ~level in return (Signature.Public_key_hash.Map.map fst res) diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index b3e2a991b698..b336270e2a3b 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -163,6 +163,7 @@ let connect_gossipsub_with_p2p proto_parameters gs_worker transport_layer [@profiler.aggregate_s {verbosity = Notice; profiler_module = Profiler} "save_and_notify"] in + let* () = Node_context.Attestable_slots.may_notify node_ctxt ~slot_id in (* Introduce a new store read at each received shard. 
Not sure it can be a problem, though *) let* number_of_already_stored_shards = diff --git a/src/lib_dal_node/slot_manager.ml b/src/lib_dal_node/slot_manager.ml index d9c382ff319c..3fd317668a16 100644 --- a/src/lib_dal_node/slot_manager.ml +++ b/src/lib_dal_node/slot_manager.ml @@ -751,6 +751,7 @@ let publish_slot_data ctxt ~level_committee ~slot_size gs_worker Store.Shards.write_all (Store.shards node_store) slot_id shards |> Errors.to_tzresult in + let* () = Node_context.Attestable_slots.may_notify ctxt ~slot_id in let* () = Store.Slots.add_slot ~slot_size (Store.slots node_store) slot slot_id |> Errors.to_tzresult -- GitLab From 51acbfc9f18ae34aae26daca46bff8728b3d1fbc Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 15 Oct 2025 17:55:11 +0100 Subject: [PATCH 7/9] Dal_node: Node_context: Add deduplication mechanism for notified slots --- src/lib_dal_node/node_context.ml | 8 +++++- src/lib_dal_node_services/types.ml | 38 ++++++++++++++++++++++++++--- src/lib_dal_node_services/types.mli | 6 +++-- 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/src/lib_dal_node/node_context.ml b/src/lib_dal_node/node_context.ml index 78fea5b7d5d2..3cd48915d964 100644 --- a/src/lib_dal_node/node_context.ml +++ b/src/lib_dal_node/node_context.ml @@ -312,7 +312,13 @@ module Attestable_slots = struct let subscribe ctxt ~pkh = let module T = Types.Attestable_slots_watcher_table in - let watcher = T.get_or_init ctxt.attestable_slots_watcher_table pkh in + let proto_params = + Result.to_option + @@ get_proto_parameters ctxt ~level:(`Level ctxt.last_finalized_level) + in + let watcher = + T.get_or_init ctxt.attestable_slots_watcher_table pkh proto_params + in let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in let next () = Lwt_stream.get stream in let shutdown () = diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 172b2d8f920a..74bb9478f56c 100644 --- a/src/lib_dal_node_services/types.ml +++ 
b/src/lib_dal_node_services/types.ml @@ -759,15 +759,28 @@ module Health = struct checks end +module SlotIdSet = + Aches.Vache.Set (Aches.Vache.LRU_Sloppy) (Aches.Vache.Strong) + (struct + type t = Slot_id.t + + let equal = Slot_id.equal + + let hash = Slot_id.hash + end) + module Attestable_slots_watcher_table = struct (** A watcher used to stream newly-attestable slots for a given delegate (pkh). - [stream] is the push endpoint used by the DAL node to notify consumers (RPC layer / baker) that a specific [slot_id] has become attestable. - [num_subscribers] is the number of active consumers currently subscribed to - this pkh’s stream. *) + this pkh’s stream. + - [notified_slots] is an LRU set of slot ids already notified, so that we avoid + sending duplicates in the stream. *) type watcher = { stream : slot_id Lwt_watcher.input; mutable num_subscribers : int; + notified_slots : SlotIdSet.t; } let get_stream watcher = watcher.stream @@ -780,14 +793,28 @@ module Attestable_slots_watcher_table = struct let create ~initial_size = Signature.Public_key_hash.Table.create initial_size - let get_or_init t pkh = + let get_or_init t pkh proto_params = match Signature.Public_key_hash.Table.find t pkh with | Some watcher -> watcher.num_subscribers <- watcher.num_subscribers + 1 ; watcher | None -> + (* We only need to remember slot ids from their publication level until the + attestation window closes ([attestation_lag] levels later). There can be + at most [number_of_slots] distinct slot ids per level across that window. + We allow a small (2 levels) additional window for safety. 
*) + let capacity = + match proto_params with + | Some {number_of_slots; attestation_lag; _} -> + number_of_slots * (attestation_lag + 2) + | None -> 320 (* 32 * (8 + 2) - hardcoded values as of protocol T *) + in let watcher = - {stream = Lwt_watcher.create_input (); num_subscribers = 1} + { + stream = Lwt_watcher.create_input (); + num_subscribers = 1; + notified_slots = SlotIdSet.create capacity; + } in Signature.Public_key_hash.Table.add t pkh watcher ; watcher @@ -795,7 +822,10 @@ module Attestable_slots_watcher_table = struct let notify t pkh ~slot_id = match Signature.Public_key_hash.Table.find t pkh with | None -> () - | Some watcher -> Lwt_watcher.notify watcher.stream slot_id + | Some watcher -> + if not @@ SlotIdSet.mem watcher.notified_slots slot_id then ( + SlotIdSet.add watcher.notified_slots slot_id ; + Lwt_watcher.notify watcher.stream slot_id) let remove = Signature.Public_key_hash.Table.remove diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index 09af4c13f488..916aad70c067 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -427,8 +427,10 @@ module Attestable_slots_watcher_table : sig bucket size of [~initial_size] (which is an initial estimation). *) val create : initial_size:int -> t - (** [get_or_init t pkh] returns (creating if absent) the watcher entry for [pkh] in [t]. *) - val get_or_init : t -> Signature.public_key_hash -> watcher + (** [get_or_init t pkh proto_params] returns (creating if absent) the watcher entry for + [pkh] in [t].*) + val get_or_init : + t -> Signature.public_key_hash -> proto_parameters option -> watcher (** [notify t pkh ~slot_id] pushes [slot_id] to the stream for [pkh] if present. 
*) val notify : t -> Signature.public_key_hash -> slot_id:slot_id -> unit -- GitLab From f0f874f2622bc3ae8d94480e4d30d252eedaba1d Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 14 Oct 2025 12:39:32 +0100 Subject: [PATCH 8/9] Dal_node: Update CHANGELOG --- CHANGES.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index f3cda53bdf8a..6d1c3ed1acc8 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -115,6 +115,13 @@ Data Availability Layer (DAL) DAL node ~~~~~~~~ +- Added RPC ``GET /profiles/{pkh}/monitor/attestable_slots`` to open a monitoring + stream that emits a JSON ``slot_id`` each time a slot becomes attestable for the + given public key hash (``pkh``). A slot id is emitted when all shards assigned to + that ``pkh`` at the corresponding attestation level are available in the DAL + node's store. If traps are detected within the slot, then it should not be attested, + so the id is not sent via the stream. (MR :gl:`!19459`) + - The DAL node now starts propagating shards one level after the inclusion of the corresponding published slot header operation (i.e., when the operation is finalized), instead of two levels after, when the block is finalized. 
(MR :gl:`!19366`) -- GitLab From 79b18a9c00c148b8ee4c1b5b8c48f9a696b10f4b Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Thu, 16 Oct 2025 10:29:16 +0100 Subject: [PATCH 9/9] Dal_node: Move Attestable_slots module to its own file --- src/lib_dal_node/RPC_server.ml | 8 +- src/lib_dal_node/amplificator.ml | 4 +- src/lib_dal_node/attestable_slots.ml | 154 ++++++++++++++++++++++++++ src/lib_dal_node/attestable_slots.mli | 48 ++++++++ src/lib_dal_node/daemon.ml | 2 +- src/lib_dal_node/node_context.ml | 147 +----------------------- src/lib_dal_node/node_context.mli | 48 ++------ src/lib_dal_node/slot_manager.ml | 2 +- 8 files changed, 219 insertions(+), 194 deletions(-) create mode 100644 src/lib_dal_node/attestable_slots.ml create mode 100644 src/lib_dal_node/attestable_slots.mli diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml index c9e29b577253..3abc918b5391 100644 --- a/src/lib_dal_node/RPC_server.ml +++ b/src/lib_dal_node/RPC_server.ml @@ -473,7 +473,7 @@ module Profile_handlers = struct return all_stored else if not all_stored then return false else - Node_context.Attestable_slots.is_slot_attestable_with_traps + Attestable_slots.is_slot_attestable_with_traps shards_store last_known_parameters.traps_fraction pkh @@ -498,9 +498,7 @@ module Profile_handlers = struct ~attested_level = let open Lwt_result_syntax in let* should_drop_due_to_migration = - Node_context.Attestable_slots.drop_attested_at_migration - ctxt - ~attested_level + Attestable_slots.attested_just_after_migration ctxt ~attested_level |> Lwt.return |> lwt_map_error (fun e -> `Other e) in @@ -549,7 +547,7 @@ module Profile_handlers = struct let monitor_attestable_slots ctxt pkh () () = let Resto_directory.Answer.{next; shutdown} = - Node_context.Attestable_slots.subscribe ctxt ~pkh + Attestable_slots.subscribe ctxt ~pkh in Tezos_rpc.Answer.return_stream {next; shutdown} end diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index 
b08690b904ea..8b5f80e33a0c 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -391,9 +391,7 @@ let reply_receiver_job {process; query_store; _} node_context = Store.Shards.write_all (Store.shards node_store) slot_id shards |> Errors.to_tzresult in - let* () = - Node_context.Attestable_slots.may_notify node_context ~slot_id - in + let* () = Attestable_slots.may_notify node_context ~slot_id in let level_committee ~level = let* res = Node_context.fetch_committees node_context ~level in return (Signature.Public_key_hash.Map.map fst res) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml new file mode 100644 index 000000000000..6fe71c51e3bc --- /dev/null +++ b/src/lib_dal_node/attestable_slots.ml @@ -0,0 +1,154 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +let is_slot_attestable_with_traps shards_store traps_fraction pkh + assigned_shard_indexes slot_id = + let open Lwt_result_syntax in + List.for_all_es + (fun shard_index -> + let* {index = _; share} = + Store.Shards.read shards_store slot_id shard_index + in + (* Note: here [pkh] should identify the baker using its delegate key + (not the consensus key) *) + let trap_res = Trap.share_is_trap pkh share ~traps_fraction in + match trap_res with + | Ok true -> + let*! () = + Event.emit_cannot_attest_slot_because_of_trap + ~pkh + ~published_level:slot_id.slot_level + ~slot_index:slot_id.slot_index + ~shard_index + in + return_false + | Ok false -> return_true + | Error _ -> + (* assume the worst, that it is a trap *) + let*! 
() = + Event.emit_trap_check_failure + ~published_level:slot_id.Types.Slot_id.slot_level + ~slot_index:slot_id.slot_index + ~shard_index + ~delegate:pkh + in + return_false) + assigned_shard_indexes + +let subscribe ctxt ~pkh = + let open Node_context in + let module T = Types.Attestable_slots_watcher_table in + let proto_params = + Result.to_option + @@ get_proto_parameters ctxt ~level:(`Level (get_last_finalized_level ctxt)) + in + let attestable_slots_watcher_table = + get_attestable_slots_watcher_table ctxt + in + let watcher = T.get_or_init attestable_slots_watcher_table pkh proto_params in + let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in + let next () = Lwt_stream.get stream in + let shutdown () = + (* stop this stream, then possibly remove the whole watcher if last subscriber *) + Lwt_watcher.shutdown stopper ; + T.set_num_subscribers watcher (T.get_num_subscribers watcher - 1) ; + if T.get_num_subscribers watcher <= 0 then + T.remove attestable_slots_watcher_table pkh + in + Resto_directory.Answer.{next; shutdown} + +let published_just_before_migration ctxt ~published_level = + let open Result_syntax in + let open Node_context in + let migration_level = get_last_migration_level ctxt in + let* old_lag = get_attestation_lag ctxt ~level:published_level in + let* new_lag = + let attested_level = Int32.(add published_level old_lag) in + get_attestation_lag ctxt ~level:attested_level + in + return + @@ (old_lag > new_lag + && Int32.sub migration_level old_lag < published_level + && published_level <= migration_level) + +let attested_just_after_migration ctxt ~attested_level = + let open Result_syntax in + let open Node_context in + let migration_level = get_last_migration_level ctxt in + let* new_lag = get_attestation_lag ctxt ~level:attested_level in + let* old_lag = + let published_level = Int32.(sub attested_level new_lag) in + get_attestation_lag ctxt ~level:published_level + in + return + (old_lag > new_lag + && migration_level < 
attested_level + && attested_level <= Int32.add migration_level new_lag) + +let may_notify ctxt ~(slot_id : Types.slot_id) = + let open Lwt_result_syntax in + let open Node_context in + let module T = Types.Attestable_slots_watcher_table in + let attestable_slots_watcher_table = + get_attestable_slots_watcher_table ctxt + in + let subscribers = T.elements attestable_slots_watcher_table in + if Seq.is_empty subscribers then return_unit + else + let published_level = slot_id.slot_level in + let*? lag = get_attestation_lag ctxt ~level:published_level in + let attested_level = Int32.(add published_level lag) in + let attestation_level = Int32.pred attested_level in + let*? should_drop = published_just_before_migration ctxt ~published_level in + if should_drop then return_unit + else + let*? last_known_parameters = + get_proto_parameters ctxt ~level:(`Level attested_level) + in + let shards_store = Store.shards (get_store ctxt) in + (* For each subscribed pkh, if it has assigned shards for that level, + check if all those shards are available for [slot_id] and notify watcher, + accordingly. *) + let notify_if_attestable pkh = + (* For retrieving the assigned shard indexes, we consider the committee + at [attestation_level], because the (DAL) attestations in the blocks + at level [attested_level] refer to the predecessor level. 
*) + let* shard_indices = + fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level + in + let number_of_assigned_shards = List.length shard_indices in + if number_of_assigned_shards = 0 then return_unit + else if published_level < 1l then return_unit + else + let* number_stored_shards = + Store.Shards.number_of_shards_available + shards_store + slot_id + shard_indices + in + let all_stored = number_stored_shards = number_of_assigned_shards in + if not last_known_parameters.incentives_enable then ( + if all_stored then + T.notify attestable_slots_watcher_table pkh ~slot_id ; + return_unit) + else if not all_stored then return_unit + else + let* is_slot_attestable_with_traps = + is_slot_attestable_with_traps + shards_store + last_known_parameters.traps_fraction + pkh + shard_indices + slot_id + |> Errors.to_option_tzresult + in + (match is_slot_attestable_with_traps with + | Some true -> T.notify attestable_slots_watcher_table pkh ~slot_id + | _ -> ()) ; + return_unit + in + Seq.iter_ep notify_if_attestable subscribers diff --git a/src/lib_dal_node/attestable_slots.mli b/src/lib_dal_node/attestable_slots.mli new file mode 100644 index 000000000000..fee2b7f6a783 --- /dev/null +++ b/src/lib_dal_node/attestable_slots.mli @@ -0,0 +1,48 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +(** [is_slot_attestable_with_traps shards_store traps_fraction pkh + assigned_shard_indexes slot_id] checks whether the slot identified by [slot_id] + is attestable for delegate [pkh] with respect to the traps mechanism. + + The function iterates over the delegate’s [assigned_shard_indexes], reads each + corresponding stored shard share from [shards_store], and evaluates + [Trap.share_is_trap] on it using [traps_fraction]. 
*)
+val is_slot_attestable_with_traps :
+  Store.Shards.t ->
+  Q.t ->
+  Signature.public_key_hash ->
+  int trace ->
+  Types.slot_id ->
+  (bool, [> Errors.not_found | Errors.other]) result Lwt.t
+
+(** [subscribe ctxt ~pkh] opens a [Resto_directory.Answer] stream that yields
+    [Types.slot_id] values whenever a slot becomes attestable for [~pkh]. The stream
+    only emits items produced after subscription. *)
+val subscribe :
+  Node_context.t ->
+  pkh:Signature.public_key_hash ->
+  Types.slot_id Resto_directory.Answer.stream
+
+(** Let M = migration level (last block of the old protocol). This function
+    attempts to determine whether [~published_level] is included in `[M - lag + 1 .. M]`
+    (inclusively), where [lag] is the lag at [~published_level].
+    In this case, the corresponding attested levels would fall in the new protocol. *)
+val published_just_before_migration :
+  Node_context.t -> published_level:int32 -> bool tzresult
+
+(** Let M = migration level (last block of the old protocol). This function
+    attempts to determine whether [~attested_level] is included in `[M + 1 .. M + lag]`
+    (inclusively), where [lag] is the lag at [~attested_level].
+    In this case, the corresponding published levels would fall in the old protocol. *)
+val attested_just_after_migration :
+  Node_context.t -> attested_level:int32 -> bool tzresult
+
+(** [may_notify ctxt ~slot_id] checks, for each subscribed [pkh], whether all shards
+    assigned to [pkh] at the attestation level corresponding to [~slot_id] are available;
+    if so, it emits [~slot_id] to that [pkh]’s stream. 
*) +val may_notify : Node_context.t -> slot_id:Types.slot_id -> unit tzresult Lwt.t diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index b336270e2a3b..ba4e96e2f8c8 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -163,7 +163,7 @@ let connect_gossipsub_with_p2p proto_parameters gs_worker transport_layer [@profiler.aggregate_s {verbosity = Notice; profiler_module = Profiler} "save_and_notify"] in - let* () = Node_context.Attestable_slots.may_notify node_ctxt ~slot_id in + let* () = Attestable_slots.may_notify node_ctxt ~slot_id in (* Introduce a new store read at each received shard. Not sure it can be a problem, though *) let* number_of_already_stored_shards = diff --git a/src/lib_dal_node/node_context.ml b/src/lib_dal_node/node_context.ml index 3cd48915d964..3a0ba5c6a364 100644 --- a/src/lib_dal_node/node_context.ml +++ b/src/lib_dal_node/node_context.ml @@ -270,155 +270,14 @@ let get_disable_shard_validation ctxt = ctxt.disable_shard_validation let get_last_migration_level ctxt = ctxt.last_migration_level +let get_attestable_slots_watcher_table ctxt = + ctxt.attestable_slots_watcher_table + let get_attestation_lag ctxt ~level = let open Result_syntax in let+ params = get_proto_parameters ctxt ~level:(`Level level) in Int32.of_int params.attestation_lag -module Attestable_slots = struct - let is_slot_attestable_with_traps shards_store traps_fraction pkh - assigned_shard_indexes slot_id = - let open Lwt_result_syntax in - List.for_all_es - (fun shard_index -> - let* {index = _; share} = - Store.Shards.read shards_store slot_id shard_index - in - (* Note: here [pkh] should identify the baker using its delegate key - (not the consensus key) *) - let trap_res = Trap.share_is_trap pkh share ~traps_fraction in - match trap_res with - | Ok true -> - let*! 
() = - Event.emit_cannot_attest_slot_because_of_trap - ~pkh - ~published_level:slot_id.slot_level - ~slot_index:slot_id.slot_index - ~shard_index - in - return_false - | Ok false -> return_true - | Error _ -> - (* assume the worst, that it is a trap *) - let*! () = - Event.emit_trap_check_failure - ~published_level:slot_id.Types.Slot_id.slot_level - ~slot_index:slot_id.slot_index - ~shard_index - ~delegate:pkh - in - return_false) - assigned_shard_indexes - - let subscribe ctxt ~pkh = - let module T = Types.Attestable_slots_watcher_table in - let proto_params = - Result.to_option - @@ get_proto_parameters ctxt ~level:(`Level ctxt.last_finalized_level) - in - let watcher = - T.get_or_init ctxt.attestable_slots_watcher_table pkh proto_params - in - let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in - let next () = Lwt_stream.get stream in - let shutdown () = - (* stop this stream, then possibly remove the whole watcher if last subscriber *) - Lwt_watcher.shutdown stopper ; - T.set_num_subscribers watcher (T.get_num_subscribers watcher - 1) ; - if T.get_num_subscribers watcher <= 0 then - T.remove ctxt.attestable_slots_watcher_table pkh - in - Resto_directory.Answer.{next; shutdown} - - let drop_published_at_migration ctxt ~published_level = - let open Result_syntax in - let migration_level = ctxt.last_migration_level in - let* old_lag = get_attestation_lag ctxt ~level:published_level in - let* new_lag = - let attested_level = Int32.(add published_level old_lag) in - get_attestation_lag ctxt ~level:attested_level - in - return - @@ (old_lag > new_lag - && Int32.sub migration_level old_lag < published_level - && published_level <= migration_level) - - let drop_attested_at_migration ctxt ~attested_level = - let open Result_syntax in - let migration_level = get_last_migration_level ctxt in - let* new_lag = get_attestation_lag ctxt ~level:attested_level in - let* old_lag = - let published_level = Int32.(sub attested_level new_lag) in - 
get_attestation_lag ctxt ~level:published_level - in - return - (old_lag > new_lag - && migration_level < attested_level - && attested_level <= Int32.add migration_level new_lag) - - let may_notify ctxt ~(slot_id : Types.slot_id) = - let open Lwt_result_syntax in - let module T = Types.Attestable_slots_watcher_table in - let attestable_slots_watcher_table = ctxt.attestable_slots_watcher_table in - let subscribers = T.elements attestable_slots_watcher_table in - if Seq.is_empty subscribers then return_unit - else - let published_level = slot_id.slot_level in - let*? lag = get_attestation_lag ctxt ~level:published_level in - let attested_level = Int32.(add published_level lag) in - let attestation_level = Int32.pred attested_level in - let*? should_drop = drop_published_at_migration ctxt ~published_level in - if should_drop then return_unit - else - let*? last_known_parameters = - get_proto_parameters ctxt ~level:(`Level attested_level) - in - let shards_store = Store.shards (get_store ctxt) in - (* For each subscribed pkh, if it has assigned shards for that level, - check if all those shards are available for [slot_id] and notify watcher, - accordingly. *) - let notify_if_attestable pkh = - (* For retrieving the assigned shard indexes, we consider the committee - at [attestation_level], because the (DAL) attestations in the blocks - at level [attested_level] refer to the predecessor level. 
*) - let* shard_indices = - fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level - in - let number_of_assigned_shards = List.length shard_indices in - if number_of_assigned_shards = 0 then return_unit - else if published_level < 1l then return_unit - else - let* number_stored_shards = - Store.Shards.number_of_shards_available - shards_store - slot_id - shard_indices - in - let all_stored = number_stored_shards = number_of_assigned_shards in - if not last_known_parameters.incentives_enable then ( - if all_stored then - T.notify attestable_slots_watcher_table pkh ~slot_id ; - return_unit) - else if not all_stored then return_unit - else - let* is_slot_attestable_with_traps = - is_slot_attestable_with_traps - shards_store - last_known_parameters.traps_fraction - pkh - shard_indices - slot_id - |> Errors.to_option_tzresult - in - (match is_slot_attestable_with_traps with - | Some true -> - T.notify attestable_slots_watcher_table pkh ~slot_id - | _ -> ()) ; - return_unit - in - Seq.iter_ep notify_if_attestable subscribers -end - module P2P = struct let connect {transport_layer; _} ?timeout point = Gossipsub.Transport_layer.connect transport_layer ?timeout point diff --git a/src/lib_dal_node/node_context.mli b/src/lib_dal_node/node_context.mli index 012e4f277d1b..a84933cf5206 100644 --- a/src/lib_dal_node/node_context.mli +++ b/src/lib_dal_node/node_context.mli @@ -227,46 +227,14 @@ val get_disable_shard_validation : t -> bool [Proto_plugins.get_plugin_and_parameters_for_level] for more clarifications. *) val get_last_migration_level : t -> int32 -module Attestable_slots : sig - (** [is_slot_attestable_with_traps shards_store traps_fraction pkh - assigned_shard_indexes slot_id] checks whether the slot identified by [slot_id] - is attestable for delegate [pkh] with respect to the traps mechanism. 
- - The function iterates over the delegate’s [assigned_shard_indexes], reads each - corresponding stored shard share from [shards_store], and evaluates - [Trap.share_is_trap] on it using [traps_fraction]. *) - val is_slot_attestable_with_traps : - Store.Shards.t -> - Q.t -> - Signature.public_key_hash -> - int trace -> - Types.slot_id -> - (bool, [> Errors.not_found | Errors.other]) result Lwt.t - - (** [subscribe ctxt ~pkh] opens a [Resto_directory.Answer] stream that yields - [Types.slot_id] values whenever a slot becomes attestable for [~pkh]. The stream - only emits items produced after subscription. *) - val subscribe : - t -> - pkh:Signature.public_key_hash -> - Types.slot_id Resto_directory.Answer.stream - - (** Let M = migration level (last block of the old protocol). Then, for levels - P included in [M - lag + 1 .. M] (inclusively), we do not PUBLISH any slots, - because the corresponding attested levels would fall in the new protocol. *) - val drop_published_at_migration : - t -> published_level:int32 -> (bool, tztrace) result - - (** Let M = migration level (last block of the old protocol). Then, for levels - A included in [M + 1 .. M + lag] (inclusively), we do not ATTEST any slots, - because the corresponding published levels would fall in the old protocol. *) - val drop_attested_at_migration : t -> attested_level:int32 -> bool tzresult - - (** [may_notify ctxt ~slot_id] checks, for each subscribed [pkh], whether all shards - assigned to [pkh] at the attestation level corresponding to [~slot_id] are available; - if so, it emits [~slot_id] to that [pkh]’s stream. *) - val may_notify : t -> slot_id:Types.slot_id -> unit tzresult Lwt.t -end +(** [get_attestable_slots_watcher_table ctxt] return the table of streams containing + attestable slots per pkh. 
*) +val get_attestable_slots_watcher_table : + t -> Types.Attestable_slots_watcher_table.t + +(** [get_attestation_lag ctxt ~level] returns the attestation lag found at [~level] + using protocol parameters obtained using [ctxt]. *) +val get_attestation_lag : t -> level:int32 -> int32 tzresult (** Module for P2P-related accessors. *) module P2P : sig diff --git a/src/lib_dal_node/slot_manager.ml b/src/lib_dal_node/slot_manager.ml index 3fd317668a16..ce8c506ff469 100644 --- a/src/lib_dal_node/slot_manager.ml +++ b/src/lib_dal_node/slot_manager.ml @@ -751,7 +751,7 @@ let publish_slot_data ctxt ~level_committee ~slot_size gs_worker Store.Shards.write_all (Store.shards node_store) slot_id shards |> Errors.to_tzresult in - let* () = Node_context.Attestable_slots.may_notify ctxt ~slot_id in + let* () = Attestable_slots.may_notify ctxt ~slot_id in let* () = Store.Slots.add_slot ~slot_size (Store.slots node_store) slot slot_id |> Errors.to_tzresult -- GitLab