From 383e529d7619f8338b43e91fdf11e26f73f9df4c Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 16:01:43 +0000 Subject: [PATCH 1/5] Dal_node: Attestable_slots: Change fold_left_es to a filter_map_ep computation --- src/lib_dal_node/attestable_slots.ml | 29 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index c1f5dd6bbd35..27b432a11678 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -233,20 +233,21 @@ let get_backfill_payload ctxt ~pkh = (* Check each slot for attestability. *) let number_of_slots = proto_params.number_of_slots in let slot_indices = Stdlib.List.init number_of_slots Fun.id in - List.fold_left_es - (fun acc slot_index -> - let slot_id = - Types.Slot_id.{slot_level = published_level; slot_index} - in - let* is_attestable_slot_or_trap = - is_attestable_slot_or_trap ctxt ~pkh ~slot_id - in - match is_attestable_slot_or_trap with - | Some `Attestable_slot -> - return E.{acc with slot_ids = slot_id :: acc.slot_ids} - | Some `Trap | None -> return acc) - acc - slot_indices) + let* new_slot_ids = + List.filter_map_ep + (fun slot_index -> + let slot_id = + Types.Slot_id.{slot_level = published_level; slot_index} + in + let* is_attestable_slot_or_trap = + is_attestable_slot_or_trap ctxt ~pkh ~slot_id + in + match is_attestable_slot_or_trap with + | Some `Attestable_slot -> return_some slot_id + | Some `Trap | None -> return_none) + slot_indices + in + return E.{acc with slot_ids = List.append new_slot_ids acc.slot_ids}) {slot_ids = []; no_shards_attestation_levels = []} published_levels -- GitLab From 30cb400a001aa58532443873153aa69baeea8425 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 16:02:41 +0000 Subject: [PATCH 2/5] Dal_node: Attestable_slots: Remove reverse order for slots published levels backfill result --- src/lib_dal_node/attestable_slots.ml | 
1 - 1 file changed, 1 deletion(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 27b432a11678..7178cc505ac2 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -207,7 +207,6 @@ let get_backfill_payload ctxt ~pkh = in Stdlib.List.init count (fun i -> Int32.(sub (succ last_finalized_level) (of_int i))) - |> List.rev in List.fold_left_es (fun acc published_level -> -- GitLab From 49dd7aeb7d8cae873393796bb0e5871a89a8457d Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Fri, 14 Nov 2025 17:44:56 +0000 Subject: [PATCH 3/5] Dal_node: Attestable_slots: Create a new stream (with backfill) for each subscriber --- src/lib_dal_node/attestable_slots.ml | 34 +++++++++++++++++++--------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 7178cc505ac2..79a0b24a0cd7 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -250,6 +250,25 @@ let get_backfill_payload ctxt ~pkh = {slot_ids = []; no_shards_attestation_levels = []} published_levels +(** [create_new_stream_with_backfill stream backfill_payload_opt] builds a new + event stream that delivers at most one [Backfill] event before any live items, + then transparently forwards all events from the provided live [stream]. 
*) +let create_new_stream_with_backfill stream backfill_payload_opt = + let open Lwt_syntax in + let* backfill_stream = + match backfill_payload_opt with + | Ok backfill_payload -> + let stream, push = Lwt_stream.create () in + (* Start with a [Backfill] event *) + push (Some (Types.Attestable_event.Backfill {backfill_payload})) ; + push None ; + return stream + | Error error -> + let* () = Event.emit_backfill_error ~error in + return (Lwt_stream.of_list []) + in + return @@ Lwt_stream.append backfill_stream stream + let subscribe ctxt ~pkh = let open Lwt_syntax in let open Node_context in @@ -263,18 +282,11 @@ let subscribe ctxt ~pkh = in let watcher = T.get_or_init attestable_slots_watcher_table pkh proto_params in let stream, stopper = Lwt_watcher.create_stream (T.get_stream watcher) in - let* () = - let* backfill_payload = get_backfill_payload ctxt ~pkh in - match backfill_payload with - | Ok backfill_payload -> - T.notify_backfill_payload - attestable_slots_watcher_table - pkh - ~backfill_payload ; - return_unit - | Error error -> Event.emit_backfill_error ~error + let* backfill_stream = + let* backfill_payload_opt = get_backfill_payload ctxt ~pkh in + create_new_stream_with_backfill stream backfill_payload_opt in - let next () = Lwt_stream.get stream in + let next () = Lwt_stream.get backfill_stream in let shutdown () = (* stop this stream, then possibly remove the whole watcher if last subscriber *) Lwt_watcher.shutdown stopper ; -- GitLab From 9322338fdc221e0f71f7a48cc0cd7050258da88e Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 18 Nov 2025 16:24:08 +0000 Subject: [PATCH 4/5] Dal_node: Attestable_slots: Extract common logic from map function optimisation For the same published level, there is information (like the attestation lag, the shards store, etc.) which is common, so we extract it to avoid recomputing it for every slot index. 
--- src/lib_dal_node/attestable_slots.ml | 79 ++++++++++++++++++---------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 79a0b24a0cd7..d729819a723e 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -71,6 +71,34 @@ let attested_just_after_migration ctxt ~attested_level = && migration_level < attested_level && attested_level <= Int32.add migration_level new_lag) +let check_is_attestable_slot_or_trap ~pkh ~(slot_id : Types.slot_id) + ~last_known_parameters ~shard_indices ~shards_store + ~number_of_assigned_shards = + let open Lwt_result_syntax in + if number_of_assigned_shards = 0 || slot_id.slot_level < 1l then return_none + else + let* number_stored_shards = + Store.Shards.number_of_shards_available shards_store slot_id shard_indices + in + let all_stored = number_stored_shards = number_of_assigned_shards in + if not all_stored then return_none + else if not last_known_parameters.Types.incentives_enable then + return_some `Attestable_slot + else + let* is_attestable_slot_with_traps = + is_attestable_slot_with_traps + shards_store + last_known_parameters.traps_fraction + pkh + shard_indices + slot_id + |> Errors.to_option_tzresult + in + match is_attestable_slot_with_traps with + | Some true -> return_some `Attestable_slot + | Some false -> return_some `Trap + | None -> return_none + (** [is_attestable_slot_or_trap ctxt ~pkh ~slot_id] decides whether [~slot_id] is attestable for delegate [pkh] at the time of calling, and when DAL incentives are enabled, whether it should be considered as a trap. 
*) @@ -95,32 +123,13 @@ let is_attestable_slot_or_trap ctxt ~pkh ~(slot_id : Types.slot_id) = fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level in let number_of_assigned_shards = List.length shard_indices in - if number_of_assigned_shards = 0 || published_level < 1l then return_none - else - let* number_stored_shards = - Store.Shards.number_of_shards_available - shards_store - slot_id - shard_indices - in - let all_stored = number_stored_shards = number_of_assigned_shards in - if not all_stored then return_none - else if not last_known_parameters.incentives_enable then - return_some `Attestable_slot - else - let* is_attestable_slot_with_traps = - is_attestable_slot_with_traps - shards_store - last_known_parameters.traps_fraction - pkh - shard_indices - slot_id - |> Errors.to_option_tzresult - in - match is_attestable_slot_with_traps with - | Some true -> return_some `Attestable_slot - | Some false -> return_some `Trap - | None -> return_none + check_is_attestable_slot_or_trap + ~pkh + ~slot_id + ~last_known_parameters + ~shard_indices + ~shards_store + ~number_of_assigned_shards let may_notify_attestable_slot_or_trap ctxt ~(slot_id : Types.slot_id) = let open Lwt_result_syntax in @@ -232,6 +241,16 @@ let get_backfill_payload ctxt ~pkh = (* Check each slot for attestability. *) let number_of_slots = proto_params.number_of_slots in let slot_indices = Stdlib.List.init number_of_slots Fun.id in + let*? lag = get_attestation_lag ctxt ~level:published_level in + let*? 
last_known_parameters = + let attested_level = Int32.(add published_level lag) in + get_proto_parameters ctxt ~level:(`Level attested_level) + in + let shards_store = Store.shards (get_store ctxt) in + let* shard_indices = + fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level + in + let number_of_assigned_shards = List.length shard_indices in let* new_slot_ids = List.filter_map_ep (fun slot_index -> @@ -239,7 +258,13 @@ let get_backfill_payload ctxt ~pkh = Types.Slot_id.{slot_level = published_level; slot_index} in let* is_attestable_slot_or_trap = - is_attestable_slot_or_trap ctxt ~pkh ~slot_id + check_is_attestable_slot_or_trap + ~pkh + ~slot_id + ~last_known_parameters + ~shard_indices + ~shards_store + ~number_of_assigned_shards in match is_attestable_slot_or_trap with | Some `Attestable_slot -> return_some slot_id -- GitLab From 5e9f480c7798e20f8a9b2944b48a4b43b9c0d084 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 19 Nov 2025 11:27:43 +0000 Subject: [PATCH 5/5] Dal_node: optimize get_backfill_payload by only considering published slots Co-authored-by: Eugen Zalinescu Eugen came up with the idea, I merely rebased it on the branch and nitpicked. --- src/lib_dal_node/attestable_slots.ml | 78 +++++++++++++++++----------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index d729819a723e..8a0744b07d01 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -238,40 +238,58 @@ let get_backfill_payload ctxt ~pkh = let*? proto_params = get_proto_parameters ctxt ~level:(`Level published_level) in - (* Check each slot for attestability. *) - let number_of_slots = proto_params.number_of_slots in - let slot_indices = Stdlib.List.init number_of_slots Fun.id in - let*? lag = get_attestation_lag ctxt ~level:published_level in - let*? 
last_known_parameters = - let attested_level = Int32.(add published_level lag) in - get_proto_parameters ctxt ~level:(`Level attested_level) + let slot_indices = + Stdlib.List.init proto_params.number_of_slots Fun.id in - let shards_store = Store.shards (get_store ctxt) in - let* shard_indices = - fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level - in - let number_of_assigned_shards = List.length shard_indices in - let* new_slot_ids = - List.filter_map_ep - (fun slot_index -> - let slot_id = - Types.Slot_id.{slot_level = published_level; slot_index} - in - let* is_attestable_slot_or_trap = - check_is_attestable_slot_or_trap - ~pkh - ~slot_id - ~last_known_parameters - ~shard_indices - ~shards_store - ~number_of_assigned_shards + let store = get_store ctxt in + let candidate_slot_indices = + let statuses_cache = Store.statuses_cache store in + List.filter_map + (fun index -> + let status_opt = + Store.Statuses_cache.get_slot_status + statuses_cache + {slot_level = published_level; slot_index = index} in - match is_attestable_slot_or_trap with - | Some `Attestable_slot -> return_some slot_id - | Some `Trap | None -> return_none) + match status_opt with + | Some `Unpublished -> None + | _ -> Some index) slot_indices in - return E.{acc with slot_ids = List.append new_slot_ids acc.slot_ids}) + if candidate_slot_indices = [] then return acc + else + (* Check each slot for attestability. *) + let*? lag = get_attestation_lag ctxt ~level:published_level in + let*? 
last_known_parameters = + let attested_level = Int32.(add published_level lag) in + get_proto_parameters ctxt ~level:(`Level attested_level) + in + let shards_store = Store.shards store in + let* shard_indices = + fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level + in + let number_of_assigned_shards = List.length shard_indices in + let* new_slot_ids = + List.filter_map_ep + (fun slot_index -> + let slot_id = + Types.Slot_id.{slot_level = published_level; slot_index} + in + let* is_attestable_slot_or_trap = + check_is_attestable_slot_or_trap + ~pkh + ~slot_id + ~last_known_parameters + ~shard_indices + ~shards_store + ~number_of_assigned_shards + in + match is_attestable_slot_or_trap with + | Some `Attestable_slot -> return_some slot_id + | Some `Trap | None -> return_none) + candidate_slot_indices + in + return E.{acc with slot_ids = List.append new_slot_ids acc.slot_ids}) {slot_ids = []; no_shards_attestation_levels = []} published_levels -- GitLab