From 534f8c3c5d5d33a959d8231a4c5c24be3fb9acc9 Mon Sep 17 00:00:00 2001
From: Gabriel Moise
Date: Thu, 25 Sep 2025 10:06:58 +0100
Subject: [PATCH 1/4] Dal_node: Refactor commitments processing code in
 Block_handler

This will be helpful for processing DAL commitments one level earlier
than the Tenderbake finalisation point.
---
 src/lib_dal_node/block_handler.ml | 92 +++++++++++++++++--------------
 1 file changed, 52 insertions(+), 40 deletions(-)

diff --git a/src/lib_dal_node/block_handler.ml b/src/lib_dal_node/block_handler.ml
index 415ae47224b2..d4f311bb50a8 100644
--- a/src/lib_dal_node/block_handler.ml
+++ b/src/lib_dal_node/block_handler.ml
@@ -364,6 +364,50 @@ let check_attesters_attested node_ctxt committee slot_to_committee
     parameters
   in
   return_unit
 
+let process_commitments ctxt cctxt store proto_parameters block_level
+    (module Plugin : Dal_plugin.T) =
+  let open Lwt_result_syntax in
+  let* slot_headers =
+    (Plugin.get_published_slot_headers
+       ~block_level
+       cctxt [@profiler.record_s {verbosity = Notice} "slot_headers"])
+  in
+  let* () =
+    (Slot_manager.store_slot_headers
+       ~number_of_slots:proto_parameters.Types.number_of_slots
+       ~block_level
+       slot_headers
+       store [@profiler.record_s {verbosity = Notice} "store_slot_headers"])
+  in
+  (* If a slot header was posted to the L1 and we have the corresponding
+     data, post it to gossipsub. Note that this is done independently
+     of the profile. *)
+  let level_committee ~level =
+    let* res =
+      (Node_context.fetch_committees
+         ctxt
+         ~level [@profiler.record_f {verbosity = Notice} "fetch_committee"])
+    in
+    return (Signature.Public_key_hash.Map.map fst res)
+  in
+  let slot_size = proto_parameters.cryptobox_parameters.slot_size in
+  let gs_worker = Node_context.get_gs_worker ctxt in
+  List.iter_es
+    (fun Dal_plugin.{slot_index; commitment; published_level} ->
+      let slot_id : Types.slot_id =
+        {slot_level = published_level; slot_index}
+      in
+      (Slot_manager.publish_slot_data
+         ctxt
+         ~level_committee
+         ~slot_size
+         gs_worker
+         proto_parameters
+         commitment
+         slot_id
+       [@profiler.aggregate_s {verbosity = Notice} "publish_slot_data"]))
+    slot_headers
+
 let process_block_data ctxt cctxt store proto_parameters block_level
     (module Plugin : Dal_plugin.T) =
   let open Lwt_result_syntax in
   let* block_info =
@@ -385,47 +429,15 @@ let process_block_data ctxt cctxt store proto_parameters block_level
       [@profiler.record_s {verbosity = Notice} "store_skip_list_cells"]
     else return_unit
   in
-  let* slot_headers =
-    (Plugin.get_published_slot_headers
-       ~block_level
-       cctxt [@profiler.record_s {verbosity = Notice} "slot_headers"])
-  in
-  let* () =
-    (Slot_manager.store_slot_headers
-       ~number_of_slots:proto_parameters.Types.number_of_slots
-       ~block_level
-       slot_headers
-       store [@profiler.record_s {verbosity = Notice} "store_slot_headers"])
-  in
   let* () =
-    (* If a slot header was posted to the L1 and we have the corresponding
-       data, post it to gossipsub. Note that this is done independently
-       of the profile. *)
-    let level_committee ~level =
-      let* res =
-        (Node_context.fetch_committees
-           ctxt
-           ~level [@profiler.record_f {verbosity = Notice} "fetch_committee"])
-      in
-      return (Signature.Public_key_hash.Map.map fst res)
-    in
-    let slot_size = proto_parameters.cryptobox_parameters.slot_size in
-    let gs_worker = Node_context.get_gs_worker ctxt in
-    List.iter_es
-      (fun Dal_plugin.{slot_index; commitment; published_level} ->
-        let slot_id : Types.slot_id =
-          {slot_level = published_level; slot_index}
-        in
-        (Slot_manager.publish_slot_data
-           ctxt
-           ~level_committee
-           ~slot_size
-           gs_worker
-           proto_parameters
-           commitment
-           slot_id
-         [@profiler.aggregate_s {verbosity = Notice} "publish_slot_data"]))
-      slot_headers
+    (process_commitments
+       ctxt
+       cctxt
+       store
+       proto_parameters
+       block_level
+       (module Plugin)
+     [@profiler.record_s {verbosity = Notice} "process_commitments"])
   in
   let*? dal_attestation =
     (Plugin.dal_attestation
-- 
GitLab
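The refactor is behaviour-preserving: [process_commitments] now packages the
fetch, store and publish steps behind a single entry point, which is what lets
a later patch in this series invoke it from a second, earlier hook. The
following is a minimal sketch of that pipeline's shape, not the real API:
[fetch_headers], [store_headers] and [publish] are hypothetical stand-ins for
Plugin.get_published_slot_headers, Slot_manager.store_slot_headers and
Slot_manager.publish_slot_data, and it assumes the Lwt_result_syntax and
List.iter_es helpers from the Octez error monad.

    (* Sketch only; the stand-in functions are hypothetical. *)
    let process_commitments_sketch ~fetch_headers ~store_headers ~publish level =
      let open Lwt_result_syntax in
      (* Fetch the slot headers published at [level]. *)
      let* headers = fetch_headers level in
      (* Persist them before attempting any network publication. *)
      let* () = store_headers ~level headers in
      (* Publish each header; [List.iter_es] threads the error monad through
         the list, so the first failing publication aborts the rest. *)
      List.iter_es publish headers
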
From 0a90a589ba40ab2923aa33b5e5030b86511594e2 Mon Sep 17 00:00:00 2001
From: Gabriel Moise
Date: Fri, 3 Oct 2025 14:14:25 +0100
Subject: [PATCH 2/4] Dal_node: Block_handler: Add new_finalized_payload_level

---
 src/lib_dal_node/amplificator.ml  |  6 +--
 src/lib_dal_node/block_handler.ml | 66 ++++++++++++++++++++-----------
 src/lib_dal_node/crawler.ml       |  5 +--
 3 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml
index c0825ba042e2..a91e5930a706 100644
--- a/src/lib_dal_node/amplificator.ml
+++ b/src/lib_dal_node/amplificator.ml
@@ -433,12 +433,12 @@ let determine_amplification_delays node_ctxt =
   let+ parameters =
     Node_context.get_proto_parameters ~level:`Last_proto node_ctxt
   in
-  (* The propagation window is the attestation lag minus 4 levels, because:
-     - the daemon waits 2 levels for the head to be finalized
+  (* The propagation window is the attestation lag minus 3 levels, because:
+     - the daemon waits 1 level for the head to be finalized
      - attestation operations are included in the block at the next level
      - the baker asks attestation information one level in advance *)
   let propagation_period =
-    (parameters.attestation_lag - 4)
+    (parameters.attestation_lag - 3)
     * Int64.to_int parameters.minimal_block_delay
   in
   (* We split this window in 3: one third for the normal propagation round, one
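To make the window arithmetic above concrete: with an illustrative
attestation_lag of 8 (not a value taken from this series) and a 5-second
minimal_block_delay, the propagation window grows from (8 - 4) * 5 = 20s to
(8 - 3) * 5 = 25s, because one less level is spent waiting for finality. A
sketch mirroring the computation:

    (* Illustrative restatement of [propagation_period] above. *)
    let propagation_period ~attestation_lag ~minimal_block_delay =
      (* Three levels are lost: one waiting for the head to be finalized, one
         for the inclusion of attestations, one for the baker's look-ahead. *)
      (attestation_lag - 3) * Int64.to_int minimal_block_delay
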
diff --git a/src/lib_dal_node/block_handler.ml b/src/lib_dal_node/block_handler.ml
index d4f311bb50a8..8e6982e161f6 100644
--- a/src/lib_dal_node/block_handler.ml
+++ b/src/lib_dal_node/block_handler.ml
@@ -129,8 +129,9 @@ let remove_unattested_slots_and_shards proto_parameters ctxt ~published_level
         remove_slots_and_shards ~slot_size store slot_id)
     (0 -- (number_of_slots - 1))
 
-(* Here [block_level] is the level of the currently processed block, that is,
-   when the DAL node is up-to-date, the L1 head level minus 2. *)
+(* Here [block_level] is the same as in [new_finalized_payload_level]. When the
+   DAL node is up-to-date and the current L1 head is at level L, we call this
+   function with [block_level = L - 1]. *)
 let may_update_topics ctxt proto_parameters ~block_level =
   let open Lwt_result_syntax in
   (* If a slot is published in a block at some level [n], it is important to
@@ -140,18 +141,17 @@ let may_update_topics ctxt proto_parameters ~block_level =
      get new peers for these (possibly new) topics, this must be done in
      advance.
 
-     We do it [attestation_lag] levels in advance. This means the node has the
+     We do it [attestation_lag + 1] levels in advance. This means the node has the
      current "block time" (so 5-10 seconds) to prepare. This should be
      sufficient.
 
     Note that this does not affect processing messages for levels before [n +
-    attestation_lag], because the node does not unsubscribe, and message
+    attestation_lag - 1], because the node does not unsubscribe, and message
     validation does not depend on the subscribed topics. *)
   let+ committee =
     let level =
-      Int32.add
-        block_level
-        (Int32.of_int proto_parameters.Types.attestation_lag)
+      Int32.(
+        succ @@ add block_level (of_int proto_parameters.Types.attestation_lag))
     in
     Node_context.fetch_committees ctxt ~level
   in
@@ -408,7 +408,7 @@ let process_commitments ctxt cctxt store proto_parameters block_level
        [@profiler.aggregate_s {verbosity = Notice} "publish_slot_data"]))
     slot_headers
 
-let process_block_data ctxt cctxt store proto_parameters block_level
+let process_finalized_block_data ctxt cctxt store proto_parameters block_level
     (module Plugin : Dal_plugin.T) =
   let open Lwt_result_syntax in
   let* block_info =
@@ -429,16 +429,6 @@ let process_block_data ctxt cctxt store proto_parameters block_level
       [@profiler.record_s {verbosity = Notice} "store_skip_list_cells"]
     else return_unit
   in
-  let* () =
-    (process_commitments
-       ctxt
-       cctxt
-       store
-       proto_parameters
-       block_level
-       (module Plugin)
-     [@profiler.record_s {verbosity = Notice} "process_commitments"])
-  in
   let*? dal_attestation =
     (Plugin.dal_attestation
        block_info [@profiler.record_f {verbosity = Notice} "dal_attestation"])
@@ -507,17 +507,16 @@ let process_block ctxt cctxt l1_crawler proto_parameters finalized_shell_header
   in
   let* () =
     if proto_parameters.Types.feature_enable then
-      let* () = may_update_topics ctxt proto_parameters ~block_level in
       if Node_context.is_bootstrap_node ctxt then return_unit
       else
-        process_block_data
+        process_finalized_block_data
          ctxt
          cctxt
          store
          proto_parameters
          block_level
          (module Plugin)
-        [@profiler.record_s {verbosity = Notice} "process_block_data"]
+        [@profiler.record_s {verbosity = Notice} "process_finalized_block_data"]
     else return_unit
   in
   let*? block_round = Plugin.get_round finalized_shell_header.fitness in
@@ -579,12 +568,36 @@ let rec try_process_block ~retries ctxt cctxt l1_crawler proto_parameters
         finalized_block_hash
     | _ -> return res
 
+(** [new_finalized_payload_level ctxt cctxt block_level] processes a new
+    finalized payload level. It performs only slot (header) publication tasks:
+    store published DAL slot headers and publish shards to Gossipsub. It does
+    NOT run slot attestation tasks (like cleanup or skip-list updates). *)
+let new_finalized_payload_level ctxt cctxt block_level =
+  let open Lwt_result_syntax in
+  if Int32.equal block_level 1l then return_unit
+  else
+    let*? (module Plugin), proto_parameters =
+      Node_context.get_plugin_and_parameters_for_level ctxt ~level:block_level
+    in
+    let store = Node_context.get_store ctxt in
+    let* () =
+      if proto_parameters.Types.feature_enable then
+        may_update_topics ctxt proto_parameters ~block_level
+      else return_unit
+    in
+    (process_commitments
+       ctxt
+       cctxt
+       store
+       proto_parameters
+       block_level
+       (module Plugin)
+     [@profiler.record_s {verbosity = Notice} "process_commitments"])
+
 (* Process a finalized head and store *finalized* published slot headers
    indexed by block hash. A slot header is considered finalized when it is in
    a block with at least two other blocks on top of it, as guaranteed by
-   Tenderbake. Note that this means that shard propagation is delayed by two
-   levels with respect to the publication level of the corresponding slot
-   header. However, plugin registration is based on the latest L1 head not
+   Tenderbake. However, plugin registration is based on the latest L1 head not
    on the finalized block. This ensures new plugins are registered
   immediately after migration, rather than waiting for finalization. *)
 let new_finalized_head ctxt cctxt l1_crawler cryptobox finalized_block_hash
@@ -600,6 +613,11 @@ let new_finalized_head ctxt cctxt l1_crawler cryptobox finalized_block_hash
     in
     Node_context.may_add_plugin ctxt cctxt ~proto_level ~block_level
   in
+
+  (* If L = HEAD~2, then HEAD~1 is payload final. *)
+  let finalized_payload_level = Int32.succ level in
+  let* () = new_finalized_payload_level ctxt cctxt finalized_payload_level in
+
   let*? proto_parameters =
     Node_context.get_proto_parameters ctxt ~level:(`Level level)
   in
diff --git a/src/lib_dal_node/crawler.ml b/src/lib_dal_node/crawler.ml
index abb5d0153c4e..92962c10bbb9 100644
--- a/src/lib_dal_node/crawler.ml
+++ b/src/lib_dal_node/crawler.ml
@@ -121,10 +121,7 @@ let finalized_heads_monitor ~name ~last_notified_level ~last_seen_head_ref
       Dal_metrics.new_layer1_head ~head_level:shell_header_level ;
       cache_shell_header headers_cache hash shell_header ;
       if shell_header_level <= !last_notified_level then return_unit
-      else if Int32.equal shell_header_level 1l then (
-        stream_push (Some (hash, shell_header)) ;
-        last_notified_level := shell_header_level ;
-        return_unit)
+      else if shell_header_level <= 2l then return_unit
       else
         let* pred_hash, pred_level =
           get_predecessor crawler_lib hash shell_header_level
-- 
GitLab
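The level bookkeeping introduced by this patch, gathered in one place. The
crawler notifies finalized heads, so when the L1 head is at level L the handler
receives level L - 2; the patch's observation is that L - 1 is then payload
final, so its published slot headers can already be processed. The helper names
below are illustrative; only the offsets come from the patch.

    (* Illustrative helpers; only the offsets come from the patch. *)

    (* If L = HEAD~2 is the finalized level, HEAD~1 is payload final. *)
    let finalized_payload_level ~finalized_level = Int32.succ finalized_level

    (* [may_update_topics] now fetches the DAL committee
       [attestation_lag + 1] levels ahead of the processed level. *)
    let committee_level ~block_level ~attestation_lag =
      Int32.(succ @@ add block_level (of_int attestation_lag))
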
From 3c537a30f944189fefe501ddc603d2b599bcfab2 Mon Sep 17 00:00:00 2001
From: Gabriel Moise
Date: Fri, 3 Oct 2025 15:15:26 +0100
Subject: [PATCH 3/4] Tezt: Dal: Add one level reorganisation test for slot
 publishing

---
 tezt/tests/dal.ml | 247 ++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 217 insertions(+), 30 deletions(-)

diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml
index ff399ca872c4..2bffb1db24dc 100644
--- a/tezt/tests/dal.ml
+++ b/tezt/tests/dal.ml
@@ -8751,31 +8751,6 @@ let test_new_attester_attests protocol dal_parameters _cryptobox node client
   Log.info "Bake blocks up to level %d" (published_level - 1) ;
   let* () = bake_for ~count:(published_level - 1 - level) client in
-  let* level = Client.level client in
-  Log.info
-    "Current level is %d, publish a slot for level %d"
-    level
-    published_level ;
-  let* _ =
-    Helpers.publish_and_store_slot
-      client
-      producer
-      Constant.bootstrap2
-      ~index:slot_index
-    @@ Helpers.make_slot ~slot_size "SLOTDATA"
-  in
-  let* () = bake_for client in
-  let* manager_ops =
-    Node.RPC.call node
-    @@ RPC.get_chain_block_operations_validation_pass ~validation_pass:3 ()
-  in
-  Check.(
-    (JSON.as_list manager_ops |> List.length <> 0)
-      int
-      ~error_msg:
-        "Expected the commitment to be published, but no manager operation was \
-         included.") ;
-
   let* id_attester = peer_id attester in
   let* id_producer = peer_id producer in
   let check_graft_promises =
@@ -8802,12 +8777,32 @@ let test_new_attester_attests protocol dal_parameters _cryptobox node client
       ~published_level
       ~slot_index
   in
-
+  let* level = Client.level client in
   Log.info
-    "Bake another block, so that the attester node fetches the DAL committee \
-     for level %d and changes topics"
-    first_level_in_committee ;
+    "Current level is %d, publish a slot for level %d"
+    level
+    published_level ;
+  let* _ =
+    Helpers.publish_and_store_slot
+      client
+      producer
+      Constant.bootstrap2
+      ~index:slot_index
+    @@ Helpers.make_slot ~slot_size "SLOTDATA"
+  in
   let* () = bake_for client in
+  (* At this point the attester node fetches the DAL committee for level
+     [first_level_in_committee] and changes topics. *)
+  let* manager_ops =
+    Node.RPC.call node
+    @@ RPC.get_chain_block_operations_validation_pass ~validation_pass:3 ()
+  in
+  Check.(
+    (JSON.as_list manager_ops |> List.length <> 0)
+      int
+      ~error_msg:
+        "Expected the commitment to be published, but no manager operation was \
+         included.") ;
 
   Log.info "Waiting for grafting of the attester - producer connection" ;
   (* This is important because the attester and the producer should connect and
@@ -8823,7 +8818,7 @@ let test_new_attester_attests protocol dal_parameters _cryptobox node client
   let* () = wait_for_shards_promises in
 
   Log.info "Bake blocks up to level %d" (first_level_in_committee - 1) ;
-  let* () = bake_for ~count:(lag - 4) client in
+  let* () = bake_for ~count:(lag - 3) client in
   let* level = Client.level client in
   Log.info "Current level is %d" level ;
   Check.(
@@ -11159,6 +11154,191 @@ let test_ignore_topics_wrong_env _protocol _parameters _cryptobox _node _client
      variable to ignore topics %s was not set.*"
        Dal_node.ignore_topics_environment_variable)
 
+let wait_for_branch_switch node level =
+  let filter json =
+    match JSON.(json |-> "level" |> as_int_opt) with
+    | Some l when l = level -> Some ()
+    | Some _ -> None
+    | None -> None
+  in
+  Node.wait_for node "branch_switch.v0" filter
+
+let baker_at_round_n ?level round client : string Lwt.t =
+  let* json =
+    Client.RPC.call client @@ RPC.get_chain_block_helper_baking_rights ?level ()
+  in
+  match JSON.(json |=> round |-> "delegate" |> as_string_opt) with
+  | Some delegate_id -> return delegate_id
+  | None ->
+      Test.fail
+        "Could not find the baker at round %d for level %s"
+        round
+        (match level with None -> "head" | Some level -> string_of_int level)
+
+(* Simulate a fork at level n where two competing blocks exist:
+   - n
+     - [node1]: A1 (round 0, no DAL commitment)
+     - [node2]: A2 (round 1, with DAL commitment)
+   Then, build one more block on top of A2:
+   - n+1
+     - [node2]: B (on top of A2)
+   After reconnecting the nodes, node1 switches to the A2 -> B branch (due to
+   higher fitness).
+   Then, test that shards are propagated after one level is baked on top of
+   the block which included the commitment publication, at (n+1). *)
+let test_dal_one_level_reorg protocol dal_parameters _cryptobox node1 client1
+    dal_bootstrap =
+  (* Helpers / Constants *)
+  let slot_size = dal_parameters.Dal.Parameters.cryptobox.slot_size in
+  let slot_index = 0 in
+
+  (* Spin up a second L1 node and connect it *)
+  let nodes_args = Node.[Synchronisation_threshold 0; Connections 1] in
+  let* node2, client2 =
+    Client.init_with_protocol ~protocol ~nodes_args `Client ()
+  in
+  let* () =
+    Client.Admin.connect_address ~endpoint:(Node node1) ~peer:node2 client1
+  in
+
+  (* DAL Bootstrap *)
+  let* () = check_profiles ~__LOC__ dal_bootstrap ~expected:Dal_RPC.Bootstrap in
+  Log.info "Bootstrap DAL node is running" ;
+  let peers = [Dal_node.listen_addr dal_bootstrap] in
+
+  (* DAL Producer on [node1] *)
+  let producer = Dal_node.create ~name:"producer" ~node:node1 () in
+  let* () =
+    Dal_node.init_config ~operator_profiles:[slot_index] ~peers producer
+  in
+  let* () = Dal_node.run ~wait_ready:true producer in
+  let* () =
+    check_profiles
+      ~__LOC__
+      producer
+      ~expected:Dal_RPC.(Controller [Operator slot_index])
+  in
+  Log.info "Slot producer DAL node is running" ;
+
+  (* DAL Attester on [node1] *)
+  let* proto_params =
+    Node.RPC.call node1 @@ RPC.get_chain_block_context_constants ()
+  in
+  let consensus_rights_delay =
+    JSON.(proto_params |-> "consensus_rights_delay" |> as_int)
+  in
+  let blocks_per_cycle = JSON.(proto_params |-> "blocks_per_cycle" |> as_int) in
+  let* balance =
+    Client.get_balance_for ~account:Constant.bootstrap1.alias client1
+  in
+  let amount = Tez.(balance - one) in
+  let* new_account = Client.gen_and_show_keys client1 in
+  let* () =
+    Client.transfer
+      ~giver:Constant.bootstrap1.alias
+      ~receiver:new_account.alias
+      ~amount
+      ~burn_cap:Tez.one
+      client1
+  in
+  let* () = bake_for client1 in
+  let*! () = Client.reveal ~fee:Tez.one ~src:new_account.alias client1 in
+  let* () = bake_for client1 in
+  let* () = Client.register_key new_account.alias client1 in
+  let* () = bake_for client1 in
+  let* () = Client.stake ~staker:new_account.alias Tez.(amount /! 2L) client1 in
+  let attester = Dal_node.create ~name:"attester" ~node:node1 () in
+  let* () =
+    Dal_node.init_config
+      ~attester_profiles:[new_account.public_key_hash]
+      ~peers
+      attester
+  in
+  let* () = Dal_node.run ~event_level:`Debug attester in
+  let client1 = Client.with_dal_node client1 ~dal_node:attester in
+
+  (* Compute the DAL attestation level for publication *)
+  let num_cycles = 1 + consensus_rights_delay in
+  let* level = Client.level client1 in
+  let lag = dal_parameters.attestation_lag in
+  let attestation_level = (num_cycles * blocks_per_cycle) + 1 in
+  let published_level = attestation_level + 1 - lag in
+  Log.info "Bake blocks up to level %d" (published_level - 1) ;
+  let* () = bake_for ~count:(published_level - 1 - level) client1 in
+  let* current_level = Client.level client1 in
+  Log.info
+    "current_level = %d; published_level = %d; attestation_level = %d"
+    current_level
+    published_level
+    attestation_level ;
+
+  (* Align node2 to current level, then disconnect to fork *)
+  let* _ = Node.wait_for_level node2 current_level in
+  let* node2_id = Node.wait_for_identity node2 in
+  let* () =
+    Client.Admin.kick_peer ~endpoint:(Node node1) ~peer:node2_id client1
+  in
+
+  (* Branch 1 on node1: bake A1 at round 0 (lower fitness) *)
+  let* baker_low_round = baker_at_round_n ~level:published_level 0 client1 in
+  let* () = bake_for ~delegates:(`For [baker_low_round]) client1 in
+  Log.info "node1 baked A1 at (level = %d, round = 0)" published_level ;
+
+  (* Branch 2 on node2: bake A2 with commitment at round 1, then B *)
+  let* _ =
+    Helpers.publish_and_store_slot
+      client2
+      producer
+      Constant.bootstrap2
+      ~index:slot_index
+    @@ Helpers.make_slot ~slot_size "REORG-SLOTDATA"
+  in
+  let* baker_high_round = baker_at_round_n ~level:published_level 1 client2 in
+  let* () = bake_for ~delegates:(`For [baker_high_round]) client2 in
+  Log.info "node2 baked A2 at (level = %d, round = 1)" published_level ;
+  let* manager_ops =
+    Node.RPC.call node2
+    @@ RPC.get_chain_block_operations_validation_pass ~validation_pass:3 ()
+  in
+  Check.(
+    (JSON.as_list manager_ops |> List.length <> 0)
+      int
+      ~error_msg:
+        "Expected the commitment to be published, but no manager operation was \
+         included.") ;
+  let* assigned_shard_indexes =
+    Dal_RPC.(
+      call attester
+      @@ get_assigned_shard_indices
+           ~level:attestation_level
+           ~pkh:new_account.public_key_hash)
+  in
+  let wait_for_shards_promises =
+    wait_for_shards_promises
+      ~dal_node:attester
+      ~storage_profile:`Cache_only
+      ~shards:assigned_shard_indexes
+      ~published_level
+      ~slot_index
+  in
+  let* () = bake_for client2 in
+  Log.info "node2 baked B at level = %d" (published_level + 1) ;
+
+  (* Reconnect & wait for node1 to switch to node2's higher-fitness branch *)
+  let wait_switch = wait_for_branch_switch node1 published_level in
+  let* () =
+    Client.Admin.connect_address ~endpoint:(Node node1) ~peer:node2 client1
+  in
+  let* () = wait_switch in
+  Log.info
+    "node1 switched to branch with round = 1 at level %d"
+    published_level ;
+
+  let* _ = Node.wait_for_level node1 (published_level + 1) in
+  Log.info "node1 synchronised with node2" ;
+  Log.info "Waiting for attester to receive its assigned shards" ;
+  let* () = wait_for_shards_promises in
+
+  unit
+
 let register ~protocols =
   (* Tests with Layer1 node only *)
   scenario_with_layer1_node
@@ -11492,6 +11672,13 @@ let register ~protocols =
     "new attester attests"
     test_new_attester_attests
     protocols ;
+  scenario_with_layer1_and_dal_nodes
+    ~bootstrap_profile:true
+    ~l1_history_mode:Default_with_refutation
+    ~number_of_slots:1
+    "publish slot in one level reorganisation"
+    test_dal_one_level_reorg
+    protocols ;
   scenario_with_layer1_and_dal_nodes
     ~number_of_slots:1
     ~operator_profiles:[0]
-- 
GitLab
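The heart of the scenario is forcing two competing blocks at the same level by
selecting bakers by round while the two L1 nodes are disconnected. A condensed
sketch of that idiom, with setup, logging and assertions elided (so a sketch,
not a runnable excerpt from the test):

    let bake_fork ~published_level client1 client2 =
      (* A1: round 0 on the isolated node1, without the DAL commitment. *)
      let* baker0 = baker_at_round_n ~level:published_level 0 client1 in
      let* () = bake_for ~delegates:(`For [baker0]) client1 in
      (* A2: round 1 on node2, carrying the published slot header. *)
      let* baker1 = baker_at_round_n ~level:published_level 1 client2 in
      let* () = bake_for ~delegates:(`For [baker1]) client2 in
      (* B on top of A2: the two-block branch wins on fitness, so node1
         reorgs onto it once the nodes reconnect. *)
      bake_for client2

The final wait_for_shards_promises then checks exactly the behaviour introduced
in patch 2: the shards must flow as soon as B makes A2 payload final, one level
after publication.
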
From 5825d96e5833a074fa3661230bccfb225035cd6f Mon Sep 17 00:00:00 2001
From: Gabriel Moise
Date: Fri, 3 Oct 2025 10:24:40 +0100
Subject: [PATCH 4/4] Dal_node: Update CHANGELOG

---
 CHANGES.rst | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGES.rst b/CHANGES.rst
index 0e8e1ef16cf3..51f9718d58e7 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -111,6 +111,10 @@ Data Availability Layer (DAL)
 DAL node
 ~~~~~~~~
 
+- The DAL node now starts propagating shards one level after the inclusion of the
+  published slot header operation (i.e., once the including block is payload-final),
+  instead of two levels after, once that block is finalized. (MR :gl:`!19366`)
+
 - **Breaking change** Enforced stricter validation for the JSON configuration
   file. Previously, the parser would silently ignore any content that appeared
   after the first valid JSON object. Now, any extraneous data will cause the
-- 
GitLab
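As a worked example of the timeline this series ships, with attestation_lag = 8
as an illustrative value rather than one taken from the series:

    (* - level n:     the slot header is included in block n;
       - level n + 1: once the head reaches n + 1, block n is payload final and
                      the DAL node starts propagating its shards (previously it
                      waited for the head to reach n + 2, i.e. finality);
       - level n + 8: attestations for the slot are due (n + attestation_lag). *)
    let propagation_start_level ~published_level = Int32.succ published_level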