diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index a8264cf3b5034654a2db21b8203ea9a3ca4bfe94..a83375895591a6311ddfc3c2b15f16b4866569d5 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -3941,6 +3941,28 @@ let waiter_successful_shards_app_notification l1_committee dal_node commitment | exception Not_found -> Test.fail "Should not happen as %s is part of the committee" pkh +(* Create two promises for Grafts between [node1] and [node2] on topic + [(slot_index, pkh)], one in each direction. *) +let check_grafts ~number_of_slots ~slot_index (node1, node1_peer_id) + (node2, node2_peer_id) pkh = + (* The connections have no reason to be grafted on other slot indices than + the one they are both subscribed to, so we instruct + [check_events_with_topic] to skip all events but the one for [index]. *) + let already_seen_slots = + Array.init number_of_slots (fun index -> slot_index <> index) + in + let check_graft on_node to_peer_id pkh = + check_events_with_topic + ~event_with_topic:(Graft to_peer_id) + on_node + ~num_slots:number_of_slots + ~already_seen_slots + pkh + in + let graft_from_node1 = check_graft node1 node2_peer_id pkh in + let graft_from_node2 = check_graft node2 node1_peer_id pkh in + [graft_from_node1; graft_from_node2] + let test_dal_node_p2p_connection_and_disconnection _protocol _parameters _cryptobox node _client dal_node1 = let dal_node2 = Dal_node.create ~node () in @@ -5165,7 +5187,7 @@ let test_attestation_through_p2p _protocol dal_parameters _cryptobox node client *) let index = 0 in let slot_size = dal_parameters.Dal.Parameters.cryptobox.slot_size in - let num_slots = dal_parameters.Dal.Parameters.number_of_slots in + let number_of_slots = dal_parameters.Dal.Parameters.number_of_slots in let attestation_lag = dal_parameters.Dal.Parameters.attestation_lag in let number_of_shards = dal_parameters.Dal.Parameters.cryptobox.number_of_shards @@ -5208,34 +5230,16 @@ let test_attestation_through_p2p _protocol dal_parameters _cryptobox node client let client = Client.with_dal_node client ~dal_node:attester in - (* The connections between attesters and the slot producer have no - reason to be grafted on other slot indices than the one the slot - producer is subscribed to, so we instruct - [check_events_with_topic] to skip all events but the one for - [index]. *) - let already_seen_slots = - Array.init num_slots (fun slot_index -> slot_index <> index) - in (* Wait for a GRAFT message between the attester and the producer, in any direction. *) let check_graft pkh = - let graft_from_attester = - check_events_with_topic - ~event_with_topic:(Graft attester_peer_id) - producer - ~num_slots - ~already_seen_slots - pkh - in - let graft_from_producer = - check_events_with_topic - ~event_with_topic:(Graft producer_peer_id) - attester - ~num_slots - ~already_seen_slots - pkh - in - Lwt.pick [graft_from_attester; graft_from_producer] + Lwt.pick + @@ check_grafts + ~number_of_slots + ~slot_index:index + (attester, attester_peer_id) + (producer, producer_peer_id) + pkh in let check_graft_promises = List.map check_graft all_pkhs in Log.info "Waiting for grafting of the attester - producer connection" ; @@ -5258,7 +5262,7 @@ let test_attestation_through_p2p _protocol dal_parameters _cryptobox node client - the producer on all topics with slot_index=index *) let* () = let expected topic_pkh = - Seq.ints 0 |> Seq.take num_slots + Seq.ints 0 |> Seq.take number_of_slots |> Seq.map (fun topic_slot_index -> ( {Dal_RPC.topic_slot_index; topic_pkh}, bootstrap_peer_id @@ -5863,31 +5867,17 @@ module Amplification = struct other slot indices than the one the slot producer is subscribed to, so we instruct [check_events_with_topic] to skip all events but the one for [slot_index]. *) - let already_seen_slots = - Array.init number_of_slots (fun index -> slot_index <> index) - in (* Wait for a GRAFT message between an attester and either an operator (legacy producer) or an observer, in any direction. *) let check_graft_promise (operator_or_observer, peer_id) attester = - let graft_from_attester_promise = - let* attester_peer_id = attester_peer_id attester in - check_events_with_topic - ~event_with_topic:(Graft attester_peer_id) - operator_or_observer - ~num_slots:number_of_slots - ~already_seen_slots - attester.pkh - in - let graft_from_operator_or_observer_promise = - check_events_with_topic - ~event_with_topic:(Graft peer_id) - attester.dal_node - ~num_slots:number_of_slots - ~already_seen_slots - attester.pkh - in + let* attester_peer_id = attester_peer_id attester in Lwt.pick - [graft_from_attester_promise; graft_from_operator_or_observer_promise] + @@ check_grafts + ~number_of_slots + ~slot_index + (operator_or_observer, peer_id) + (attester.dal_node, attester_peer_id) + attester.pkh in (* We don't care if the slot producer establishes full connections with the DAL nodes it is about to ban so we only wait for the @@ -6135,7 +6125,7 @@ module Amplification = struct observer performs an amplification. *) let index = 0 in let slot_size = dal_parameters.Dal.Parameters.cryptobox.slot_size in - let num_slots = dal_parameters.Dal.Parameters.number_of_slots in + let number_of_slots = dal_parameters.number_of_slots in let peers = [Dal_node.listen_addr dal_bootstrap] in let peer_id dal_node = Dal_node.read_identity dal_node in @@ -6176,36 +6166,18 @@ module Amplification = struct |> List.map (fun account -> account.Account.public_key_hash) in - (* The connections between the slot producer and the observer have - no reason to be grafted on other slot indices than the one they - are both subscribed to, so we instruct - [check_events_with_topic] to skip all events but the one for - [index]. *) - let already_seen_slots = - Array.init num_slots (fun slot_index -> slot_index <> index) - in - (* Wait for a GRAFT message between the observer and the producer, - in any direction. *) - let check_graft pkh = - let graft_from_observer = - check_events_with_topic - ~event_with_topic:(Graft observer_peer_id) - producer - ~num_slots - ~already_seen_slots - pkh - in - let graft_from_producer = - check_events_with_topic - ~event_with_topic:(Graft producer_peer_id) - observer - ~num_slots - ~already_seen_slots - pkh - in - Lwt.pick [graft_from_observer; graft_from_producer] + let check_graft_promises = + List.map + (fun pkh -> + Lwt.pick + @@ check_grafts + ~number_of_slots + ~slot_index:index + (observer, observer_peer_id) + (producer, producer_peer_id) + pkh) + all_pkhs in - let check_graft_promises = List.map check_graft all_pkhs in Log.info "Waiting for grafting of the observer - producer connection" ; (* We need to bake some blocks until the L1 node notifies the DAL @@ -6282,6 +6254,232 @@ module Amplification = struct ~error_msg:"Unexpected publication level, actual:%L, expected:%R" ; unit + + let check_slot_attested node ~number_of_slots ~attested_level + ~expected_attestation = + let* metadata = + Node.RPC.call node + @@ RPC.get_chain_block_metadata ~block:(string_of_int attested_level) () + in + + let attestation = + match metadata.dal_attestation with + | None -> + Test.fail + "Missing dal_attestation field in the metadata of the block at \ + level %d" + attested_level + | Some v -> + (* [v]'s length may be smaller than [number_of_slots]; we fill it with [false] *) + List.init number_of_slots (fun i -> + if i < Array.length v then v.(i) else false) + in + Check.( + (attestation = expected_attestation) + (list bool) + ~error_msg:"Expected %R, got %L") ; + unit + + let test_by_ignoring_topics _protocol dal_parameters _cryptobox node client + dal_bootstrap = + let peer_id dal_node = Dal_node.read_identity dal_node in + + let slot_size = dal_parameters.Dal.Parameters.cryptobox.slot_size in + let number_of_slots = dal_parameters.number_of_slots in + let attestation_lag = dal_parameters.attestation_lag in + + let index = 0 in + let peers = [Dal_node.listen_addr dal_bootstrap] in + + let producer = + Dal_node.create + ~name:"producer" + ~node + ~ignore_pkhs: + [ + Constant.bootstrap1.Account.public_key_hash; + Constant.bootstrap2.Account.public_key_hash; + Constant.bootstrap3.Account.public_key_hash; + ] + () + in + let* () = Dal_node.init_config ~operator_profiles:[index] ~peers producer in + let* () = + let env = + String_map.singleton Dal_node.ignore_topics_environment_variable "yes" + in + Dal_node.run ~env ~wait_ready:true producer + in + let* producer_peer_id = peer_id producer in + + let observer = Dal_node.create ~name:"observer" ~node () in + let* () = Dal_node.init_config ~observer_profiles:[index] ~peers observer in + let* () = Dal_node.run ~wait_ready:true observer in + let* observer_peer_id = peer_id observer in + + let all_pkhs = + Account.Bootstrap.keys |> Array.to_list + |> List.map (fun account -> account.Account.public_key_hash) + in + + let attester = Dal_node.create ~name:"attester" ~node () in + let* () = + Dal_node.init_config ~attester_profiles:all_pkhs ~peers attester + in + let* () = Dal_node.run ~wait_ready:true attester in + let* attester_peer_id = peer_id attester in + + Log.info + "Waiting for grafting of the observer - producer connection, and \ + observer - attester connection" ; + let check_producer_observer_grafts pkh = + Lwt.pick + @@ check_grafts + ~number_of_slots + ~slot_index:index + (observer, observer_peer_id) + (producer, producer_peer_id) + pkh + in + let check_observer_attester_grafts pkh = + Lwt.pick + @@ check_grafts + ~number_of_slots + ~slot_index:index + (observer, observer_peer_id) + (attester, attester_peer_id) + pkh + in + let check_graft_promises = + List.map check_producer_observer_grafts all_pkhs + @ List.map check_observer_attester_grafts all_pkhs + in + + (* We need to bake some blocks until the L1 node notifies the DAL + nodes that some L1 block is final so that the topic pkhs are + known. *) + let* () = bake_for ~count:3 client in + let* () = Lwt.join check_graft_promises in + Log.info "Connections grafted" ; + + let* before_publication_level = Client.level client in + let published_slots = 3 in + let published_levels = + List.init published_slots (fun offset -> + before_publication_level + offset + 1) + in + Log.info + "Produce and publish %d slots at levels %a." + published_slots + (Format.pp_print_list + ~pp_sep:(fun fmt () -> Format.pp_print_string fmt ",") + Format.pp_print_int) + published_levels ; + let source = Constant.bootstrap1 in + (* Build the [wait_for] promises. *) + let wait_reconstruction ~published_level ~slot_index = + Dal_node.wait_for observer "dal_reconstruct_finished.v0" (fun event -> + if + JSON.( + event |-> "level" |> as_int = published_level + && event |-> "slot_index" |> as_int = slot_index) + then ( + Log.info + "Finished reconstruction for slot at level %d" + published_level ; + Some ()) + else None) + in + let wait_for_promises = + List.map + (fun published_level -> + wait_reconstruction ~published_level ~slot_index:index) + published_levels + in + let rec repeat_publish offset = + if offset > published_slots then unit + else ( + Log.info + "Publish a slot at level %d" + (before_publication_level + offset + 1) ; + let* (`OpHash op_hash) = + let content = + Helpers.make_slot ~slot_size ("slot " ^ string_of_int offset) + in + let* commitment, proof = + Helpers.store_slot producer ~slot_index:index content + in + (* TODO: we should not need to set a fee! *) + publish_commitment + ~source + ~index + ~commitment + ~proof + client + ~fee:20_000 + in + (* Bake a block to include the operation. *) + let* () = bake_for client in + (* Check that the operation is included. *) + let* included_manager_operations = + let manager_operation_pass = 3 in + Node.RPC.( + call node + @@ get_chain_block_operation_hashes_of_validation_pass + manager_operation_pass) + in + let () = + Check.list_mem + Check.string + ~__LOC__ + op_hash + included_manager_operations + ~error_msg: + "DAL commitment publishment operation not found in head block." + in + repeat_publish (offset + 1)) + in + let* () = repeat_publish 1 in + (* We bake two blocks so that the last publish operation becomes final, and + therefore all reconstructions have started. *) + let* () = bake_for client ~count:2 in + Log.info "Waiting for finished reconstruction events" ; + let* () = Lwt.join wait_for_promises in + + Log.info + "Bake [attestation_lag] blocks and check that the slots are attested" ; + let* () = + let dal_node_endpoint = + Dal_node.as_rpc_endpoint attester |> Endpoint.as_string + in + (* Using [bake_for ~count:attestation_lag], make the test fail, because + "unable to get DAL attestation for in time". To be on the safe + side, we wait a bit before baking the next block. *) + repeat attestation_lag (fun () -> + let* () = Lwt_unix.sleep 0.1 in + bake_for client ~dal_node_endpoint) + in + + let expected_attestation = + assert (index = 0) ; + List.init number_of_slots (fun i -> i = index) + in + let rec check_attestation offset = + if offset > published_slots then unit + else + let attested_level = + before_publication_level + attestation_lag + offset + in + let* () = + check_slot_attested + node + ~number_of_slots + ~attested_level + ~expected_attestation + in + check_attestation (offset + 1) + in + check_attestation 1 end module Garbage_collection = struct @@ -6476,26 +6674,18 @@ module Garbage_collection = struct Log.info "Attester DAL node is running" ; (* Now that all the DAL nodes are running, we need some of them to - establish grafted connections. The connections between the - attester and the slot producer have no reason to be grafted on - other slot indices than the one the slot producer is subscribed - to, so we instruct [check_events_with_topic] to skip all events - but the one for [slot_index]. *) - let already_seen_slots = - Array.init number_of_slots (fun index -> slot_index <> index) - in + establish grafted connections. *) (* Wait for a GRAFT message between all nodes. *) let check_graft node1 node2 attester_pkh = - let check_graft ~from:node1 node2 = - let* id1 = Dal_node.read_identity node1 in - check_events_with_topic - ~event_with_topic:(Graft id1) - node2 - ~num_slots:number_of_slots - ~already_seen_slots - attester_pkh - in - Lwt.pick [check_graft ~from:node1 node2; check_graft ~from:node2 node1] + let* id1 = Dal_node.read_identity node1 in + let* id2 = Dal_node.read_identity node2 in + Lwt.pick + @@ check_grafts + ~number_of_slots + ~slot_index + (node1, id1) + (node2, id2) + attester_pkh in let check_graft_promises = List.map (check_graft slot_producer attester) bootstrap_pkhs @@ -7310,7 +7500,7 @@ let test_rpc_get_connections _protocol dal_parameters _cryptobox node client - a slot producer on slot 0, - an observer on slot 0. *) let index = 0 in - let num_slots = dal_parameters.Dal.Parameters.number_of_slots in + let number_of_slots = dal_parameters.Dal.Parameters.number_of_slots in let peers = [Dal_node.listen_addr dal_bootstrap] in let peer_id dal_node = Dal_node.read_identity dal_node in @@ -7349,34 +7539,16 @@ let test_rpc_get_connections _protocol dal_parameters _cryptobox node client |> List.map (fun account -> account.Account.public_key_hash) in - (* The connections between the slot producer and the observer have - no reason to be grafted on other slot indices than the one they - are both subscribed to, so we instruct - [check_events_with_topic] to skip all events but the one for - [index]. *) - let already_seen_slots = - Array.init num_slots (fun slot_index -> slot_index <> index) - in (* Wait for a GRAFT message between the observer and the producer, in any direction. *) let check_graft pkh = - let graft_from_observer = - check_events_with_topic - ~event_with_topic:(Graft observer_peer_id) - producer - ~num_slots - ~already_seen_slots - pkh - in - let graft_from_producer = - check_events_with_topic - ~event_with_topic:(Graft producer_peer_id) - observer - ~num_slots - ~already_seen_slots - pkh - in - Lwt.pick [graft_from_observer; graft_from_producer] + Lwt.pick + @@ check_grafts + ~number_of_slots + ~slot_index:index + (observer, observer_peer_id) + (producer, producer_peer_id) + pkh in let check_graft_promises = List.map check_graft all_pkhs in Log.info "Waiting for grafting of the observer - producer connection" ; @@ -8367,7 +8539,7 @@ let test_new_attester_attests _protocol dal_parameters _cryptobox node client dal_bootstrap = let peer_id dal_node = Dal_node.read_identity dal_node in let slot_size = dal_parameters.Dal.Parameters.cryptobox.slot_size in - let num_slots = dal_parameters.Dal.Parameters.number_of_slots in + let number_of_slots = dal_parameters.Dal.Parameters.number_of_slots in let slot_index = 0 in let* () = check_profiles ~__LOC__ dal_bootstrap ~expected:Dal_RPC.Bootstrap in Log.info "Bootstrap DAL node is running" ; @@ -8473,27 +8645,14 @@ let test_new_attester_attests _protocol dal_parameters _cryptobox node client let* id_attester = peer_id attester in let* id_producer = peer_id producer in - let already_seen_slots = - Array.init num_slots (fun index -> slot_index <> index) - in let check_graft_promises = - let graft_from_attester = - check_events_with_topic - ~event_with_topic:(Graft id_attester) - producer - ~num_slots - ~already_seen_slots - new_account.public_key_hash - in - let graft_from_producer = - check_events_with_topic - ~event_with_topic:(Graft id_producer) - attester - ~num_slots - ~already_seen_slots - new_account.public_key_hash - in - Lwt.pick [graft_from_attester; graft_from_producer] + Lwt.pick + @@ check_grafts + ~number_of_slots + ~slot_index + (attester, id_attester) + (producer, id_producer) + new_account.public_key_hash in let* assigned_shard_indexes = Dal_RPC.( @@ -11005,18 +11164,11 @@ let register ~protocols = test_ignore_topics_wrong_env protocols ; scenario_with_layer1_and_dal_nodes - ~operator_profiles:[0] + ~tags:["amplification"; "ignore_topics"] + ~bootstrap_profile:true ~wait_ready:true - ~env: - (String_map.singleton Dal_node.ignore_topics_environment_variable "yes") - ~ignore_pkhs: - [ - Constant.bootstrap1.Account.public_key_hash; - Constant.bootstrap2.Account.public_key_hash; - ] - "DAL node ignore topics correct CLI" - (fun _protocol _parameters _cryptobox _node _client dal_node -> - Dal_node.terminate dal_node) + "Test amplification by ignoring topics" + Amplification.test_by_ignoring_topics protocols let tests_start_dal_node_around_migration ~migrate_from ~migrate_to =