diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml
index 675c12b758be6546cdf80adf63ae6d2855b22819..d43ce3309b6d0bfc2a7a010841db1f8c5ff83b0b 100644
--- a/src/lib_dal_node/RPC_server.ml
+++ b/src/lib_dal_node/RPC_server.ml
@@ -540,7 +540,8 @@ module Profile_handlers = struct
         ~attested_level)
 
   let monitor_attestable_slots ctxt pkh () () =
-    let Resto_directory.Answer.{next; shutdown} =
+    let open Lwt_syntax in
+    let* Resto_directory.Answer.{next; shutdown} =
       Attestable_slots.subscribe ctxt ~pkh
     in
     Tezos_rpc.Answer.return_stream {next; shutdown}
diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml
index a2650c801f1dba6fd1c04b104b72a8faea8741f6..fc3cbdbf53171aa6cb20797ce87adf07cf3d7cd9 100644
--- a/src/lib_dal_node/attestable_slots.ml
+++ b/src/lib_dal_node/attestable_slots.ml
@@ -39,28 +39,6 @@ let is_attestable_slot_with_traps shards_store traps_fraction pkh
         return_false)
     assigned_shard_indexes
 
-let subscribe ctxt ~pkh =
-  let open Node_context in
-  let module T = Types.Attestable_slots_watcher_table in
-  let proto_params =
-    Result.to_option
-    @@ get_proto_parameters ctxt ~level:(`Level (get_last_finalized_level ctxt))
-  in
-  let attestable_slots_watcher_table =
-    get_attestable_slots_watcher_table ctxt
-  in
-  let watcher = T.get_or_init attestable_slots_watcher_table pkh proto_params in
-  let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in
-  let next () = Lwt_stream.get stream in
-  let shutdown () =
-    (* stop this stream, then possibly remove the whole watcher if last subscriber *)
-    Lwt_watcher.shutdown stopper ;
-    T.set_num_subscribers watcher (T.get_num_subscribers watcher - 1) ;
-    if T.get_num_subscribers watcher <= 0 then
-      T.remove attestable_slots_watcher_table pkh
-  in
-  Resto_directory.Answer.{next; shutdown}
-
 let published_just_before_migration ctxt ~published_level =
   let open Result_syntax in
   let open Node_context in
@@ -188,3 +166,109 @@ let may_notify_not_in_committee ctxt committee ~attestation_level =
           pkh
           ~attestation_level)
     subscribers
+
+(** [get_backfill_payload ctxt ~pkh] computes a compact "backfill" payload for
+    the freshly subscribed delegate [pkh].
+
+    The payload lets a consumer pre-populate state with information that
+    predates the creation of the stream corresponding to [pkh]:
+    - all attestable [slot_id]s observed in a recent window (traps excluded);
+    - attestation levels where [pkh] has no assigned shards.
+
+    The "backfill" window is calculated in the following way:
+    - Let [L] be the current last-finalized level at subscription time;
+    - [start = max (1, L - attestation_lag + 1)], as this is the oldest level
+      where slots can be published and attested in the near future;
+    - [stop = L], as this is the newest level for which we did not have time
+      to obtain the information about the published slots.
+
+    Each published level in [start .. stop] (inclusive) is checked against the
+    committee fetched at level [published_level + attestation_lag - 1]. *)
+let get_backfill_payload ctxt ~pkh =
+  let open Lwt_result_syntax in
+  let open Node_context in
+  let module E = Types.Attestable_slots_watcher_table.Attestable_event in
+  let last_finalized_level = get_last_finalized_level ctxt in
+  let*? attestation_lag =
+    get_attestation_lag ctxt ~level:last_finalized_level
+  in
+  let published_levels =
+    let count = Int32.(to_int @@ min last_finalized_level attestation_lag) in
+    Stdlib.List.init count (fun i ->
+        Int32.(sub last_finalized_level (of_int i)))
+    |> List.rev
+  in
+  List.fold_left_es
+    (fun acc published_level ->
+      let attestation_level =
+        Int32.(pred (add published_level attestation_lag))
+      in
+      let* committee =
+        Node_context.fetch_committees ctxt ~level:attestation_level
+      in
+      if is_not_in_committee committee ~pkh then
+        (* If not in committee, record and skip per-slot checks. *)
+        return
+          E.
+            {
+              acc with
+              no_shards_attestation_levels =
+                attestation_level :: acc.no_shards_attestation_levels;
+            }
+      else
+        let*? proto_params =
+          get_proto_parameters ctxt ~level:(`Level published_level)
+        in
+        (* Check each slot for attestability. *)
+        let number_of_slots = proto_params.number_of_slots in
+        let slot_indices = Stdlib.List.init number_of_slots Fun.id in
+        List.fold_left_es
+          (fun acc slot_index ->
+            let slot_id =
+              Types.Slot_id.{slot_level = published_level; slot_index}
+            in
+            let* is_attestable_slot_or_trap =
+              is_attestable_slot_or_trap ctxt ~pkh ~slot_id
+            in
+            match is_attestable_slot_or_trap with
+            | Some `Attestable_slot ->
+                return E.{acc with slot_ids = slot_id :: acc.slot_ids}
+            | Some `Trap | None -> return acc)
+          acc
+          slot_indices)
+    {slot_ids = []; no_shards_attestation_levels = []}
+    published_levels
+
+let subscribe ctxt ~pkh =
+  let open Lwt_syntax in
+  let open Node_context in
+  let module T = Types.Attestable_slots_watcher_table in
+  let proto_params =
+    Result.to_option
+    @@ get_proto_parameters ctxt ~level:(`Level (get_last_finalized_level ctxt))
+  in
+  let attestable_slots_watcher_table =
+    get_attestable_slots_watcher_table ctxt
+  in
+  let watcher = T.get_or_init attestable_slots_watcher_table pkh proto_params in
+  let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in
+  let* () =
+    let* backfill_payload = get_backfill_payload ctxt ~pkh in
+    match backfill_payload with
+    | Ok backfill_payload ->
+        T.notify_backfill_payload
+          attestable_slots_watcher_table
+          pkh
+          ~backfill_payload ;
+        return_unit
+    | Error error -> Event.emit_backfill_error ~error
+  in
+  let next () = Lwt_stream.get stream in
+  let shutdown () =
+    (* stop this stream, then possibly remove the whole watcher if last subscriber *)
+    Lwt_watcher.shutdown stopper ;
+    T.set_num_subscribers watcher (T.get_num_subscribers watcher - 1) ;
+    if T.get_num_subscribers watcher <= 0 then
+      T.remove attestable_slots_watcher_table pkh
+  in
+  return Resto_directory.Answer.{next; shutdown}
diff --git a/src/lib_dal_node/attestable_slots.mli b/src/lib_dal_node/attestable_slots.mli
index 3e0f75caa0ad5213607e2bdc0b45742646a22fba..12af2bb57db4ac6f3f34627fa0c11d29b8212eeb 100644
--- a/src/lib_dal_node/attestable_slots.mli
+++ b/src/lib_dal_node/attestable_slots.mli
@@ -20,15 +20,6 @@ val is_attestable_slot_with_traps :
   Types.slot_id ->
   (bool, [> Errors.not_found | Errors.other]) result Lwt.t
 
-(** [subscribe ctxt ~pkh] opens a [Resto_directory.Answer] stream that yields
-    [Types.Attestable_slots_watcher_table.Attestable_event.t] values. The stream
-    only emits items produced after subscription. *)
-val subscribe :
-  Node_context.t ->
-  pkh:Signature.public_key_hash ->
-  Types.Attestable_slots_watcher_table.Attestable_event.t
-  Resto_directory.Answer.stream
-
 (** Let M = migration level (last block of the old protocol). This function
     attempts to determine whether [~published_level] is included in `[M - lag + 1 .. M]`
     (inclusively), where [lag] is the lag at [~published_level].
@@ -54,3 +45,13 @@ val may_notify_attestable_slot_or_trap :
     If so, it emits an event to that [pkh]'s stream. *)
 val may_notify_not_in_committee :
   Node_context.t -> Committee_cache.committee -> attestation_level:int32 -> unit
+
+(** [subscribe ctxt ~pkh] opens a [Resto_directory.Answer] stream that yields
+    [Types.Attestable_slots_watcher_table.Attestable_event.t] values. On
+    subscription, a [Backfill] event is also pushed if it can be computed. *)
+val subscribe :
+  Node_context.t ->
+  pkh:Signature.public_key_hash ->
+  Types.Attestable_slots_watcher_table.Attestable_event.t
+  Resto_directory.Answer.stream
+  Lwt.t
diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml
index 3ed0128bf97a54fde8e7e27d39c078e13b9ed0e2..69013d6daf2f09fadfcaa000f99aeec4acd794f4 100644
--- a/src/lib_dal_node/event.ml
+++ b/src/lib_dal_node/event.ml
@@ -1263,6 +1263,15 @@ open struct
          migration level."
       ~level:Warning
       ("level", Data_encoding.int32)
+
+  let backfill_error =
+    declare_1
+      ~section
+      ~name:"backfill_error"
+      ~msg:"Backfill failed with error: {error}"
+      ~level:Error
+      ~pp1:Error_monad.pp_print_trace
+      ("error", Error_monad.trace_encoding)
 end
 
 (* DAL node event emission functions *)
@@ -1623,3 +1632,5 @@ let emit_reception_of_shard_detailed ~level ~slot_index ~shard_index ~sender =
   emit reception_of_shard_detailed (level, slot_index, shard_index, sender)
 
 let emit_skip_attesting_shards ~level = emit skip_attesting_shards level
+
+let emit_backfill_error ~error = emit backfill_error error
diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml
index c377443acad1eb4811d189a83613ca692458833b..89c11e0659a1d9508fa113f260f340cebbeb764a 100644
--- a/src/lib_dal_node_services/types.ml
+++ b/src/lib_dal_node_services/types.ml
@@ -790,10 +790,27 @@ module SlotIdSet =
 
 module Attestable_slots_watcher_table = struct
   module Attestable_event = struct
+    type backfill_payload = {
+      slot_ids : slot_id list;
+      no_shards_attestation_levels : level list;
+    }
+
     type t =
       | Attestable_slot of {slot_id : slot_id}
       | No_shards_assigned of {attestation_level : level}
       | Slot_has_trap of {slot_id : slot_id}
+      | Backfill of {backfill_payload : backfill_payload}
+
+    let backfill_payload_encoding =
+      let open Data_encoding in
+      conv
+        (fun {slot_ids; no_shards_attestation_levels} ->
+          (slot_ids, no_shards_attestation_levels))
+        (fun (slot_ids, no_shards_attestation_levels) ->
+          {slot_ids; no_shards_attestation_levels})
+        (obj2
+           (req "slot_ids" (list slot_id_encoding))
+           (req "no_shards_attestation_levels" (list int32)))
 
     let encoding =
       let open Data_encoding in
@@ -829,6 +846,16 @@ module Attestable_slots_watcher_table = struct
             (function
               | Slot_has_trap {slot_id} -> Some ((), slot_id) | _ -> None)
             (fun ((), slot_id) -> Slot_has_trap {slot_id});
+          case
+            ~title:"backfill"
+            (Tag 3)
+            (obj2
+               (req "kind" (constant "backfill"))
+               (req "backfill_payload" backfill_payload_encoding))
+            (function
+              | Backfill {backfill_payload} -> Some ((), backfill_payload)
+              | _ -> None)
+            (fun ((), backfill_payload) -> Backfill {backfill_payload});
         ]
   end
 
@@ -905,6 +932,12 @@ module Attestable_slots_watcher_table = struct
         SlotIdSet.add watcher.notified_slots slot_id ;
         Lwt_watcher.notify watcher.stream (Slot_has_trap {slot_id}))
 
+  let notify_backfill_payload t pkh ~backfill_payload =
+    match Signature.Public_key_hash.Table.find t pkh with
+    | None -> ()
+    | Some watcher ->
+        Lwt_watcher.notify watcher.stream (Backfill {backfill_payload})
+
   let remove = Signature.Public_key_hash.Table.remove
 
   let elements = Signature.Public_key_hash.Table.to_seq_keys
diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli
index c2d8919df90d30f006ba6bdc6da09cc151f450dc..e0f5dec1e198d7f6fda028f163ced8344cb01f4c 100644
--- a/src/lib_dal_node_services/types.mli
+++ b/src/lib_dal_node_services/types.mli
@@ -413,6 +413,13 @@ end
 
 module Attestable_slots_watcher_table : sig
   module Attestable_event : sig
+    type backfill_payload = {
+      slot_ids : slot_id list;
+          (** All slots that should be marked attestable for this delegate *)
+      no_shards_attestation_levels : level list;
+          (** All attestation levels where this delegate has no shards *)
+    }
+
     (** DAL attestability items emitted on a per-delegate stream.
         The delegate is implicit, as each stream is bound to a single delegate. *)
     type t =
@@ -422,6 +429,8 @@ module Attestable_slots_watcher_table : sig
           (** the delegate has no assigned shards at [attestation_level] *)
       | Slot_has_trap of {slot_id : slot_id}
           (** the [slot_id] is a trap for the delegate *)
+      | Backfill of {backfill_payload : backfill_payload}
+          (** information about the delegate attestation status from the past *)
 
     val encoding : t Data_encoding.t
   end
@@ -465,6 +474,14 @@ module Attestable_slots_watcher_table : sig
   val notify_slot_has_trap :
     t -> Signature.public_key_hash -> slot_id:slot_id -> unit
 
+  (** [notify_backfill_payload t pkh ~backfill_payload] pushes a [Backfill] event
+      carrying [backfill_payload] to the stream for [pkh], if present. *)
+  val notify_backfill_payload :
+    t ->
+    Signature.public_key_hash ->
+    backfill_payload:Attestable_event.backfill_payload ->
+    unit
+
   (** [remove t pkh] removes the watcher entry for [pkh] from [t] if present. *)
   val remove : t -> Signature.public_key_hash -> unit
 