From 39e5f943588c95db56b31c4d99caab6907186a26 Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Tue, 9 May 2023 23:53:46 +0200 Subject: [PATCH 1/5] DAL/Node: init the store with the GS worker --- src/bin_dal_node/daemon.ml | 2 +- src/bin_dal_node/store.ml | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index da01fd0930b2..57ff0bfe7d53 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -270,7 +270,7 @@ let run ~data_dir cctxt = let* p2p_config = p2p_config config in Gossipsub.Transport_layer.create p2p_config p2p_limits ~network_name in - let* store = Store.init config in + let* store = Store.init gs_worker config in let ctxt = Node_context.init config store gs_worker transport_layer cctxt in let* rpc_server = RPC_server.(start config ctxt) in (* activate the p2p instance. *) diff --git a/src/bin_dal_node/store.ml b/src/bin_dal_node/store.ml index 916a51c47d13..2e941bfd9448 100644 --- a/src/bin_dal_node/store.ml +++ b/src/bin_dal_node/store.ml @@ -95,6 +95,7 @@ type node_store = { store : t; shard_store : Shards.t; shards_watcher : Cryptobox.Commitment.t Lwt_watcher.input; + gs_worker : Gossipsub.Worker.t; } (** [open_shards_stream node_store] opens a stream that should be notified when @@ -102,8 +103,9 @@ type node_store = { let open_shards_stream {shards_watcher; _} = Lwt_watcher.create_stream shards_watcher -(** [init config] inits the store on the filesystem using the given [config]. *) -let init config = +(** [init gs_worker config] inits the store on the filesystem using the + given [config] and [gs_worker]. *) +let init gs_worker config = let open Lwt_result_syntax in let base_dir = Configuration.data_dir_path config path in let shards_watcher = Lwt_watcher.create_input () in @@ -111,7 +113,7 @@ let init config = let*! store = main repo in let shard_store = Shards.init base_dir shard_store_dir in let*! () = Event.(emit store_is_ready ()) in - return {shard_store; store; shards_watcher} + return {shard_store; store; shards_watcher; gs_worker} let trace_decoding_error ~data_kind ~tztrace_of_error r = let open Result_syntax in -- GitLab From c201d115b0e0bceb75e3b57e851264c2af88a9b4 Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Thu, 11 May 2023 13:40:55 +0200 Subject: [PATCH 2/5] DAL/Node: expose implem details of topic, message, and message_id --- src/lib_dal_node/gossipsub/gossipsub.mli | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/lib_dal_node/gossipsub/gossipsub.mli b/src/lib_dal_node/gossipsub/gossipsub.mli index a75116dcca1e..88d603de4da4 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.mli +++ b/src/lib_dal_node/gossipsub/gossipsub.mli @@ -31,11 +31,23 @@ (** Below, we expose the main types needed for the integration with the existing DAL node alongside their encodings. *) -type topic = Gs_interface.topic - -type message_id = Gs_interface.message_id - -type message = Gs_interface.message +type topic = Gs_interface.topic = { + slot_index : int; + pkh : Signature.Public_key_hash.t; +} + +type message_id = Gs_interface.message_id = { + commitment : Cryptobox.Commitment.t; + level : int32; + slot_index : int; + shard_index : int; + pkh : Signature.Public_key_hash.t; +} + +type message = Gs_interface.message = { + share : Cryptobox.share; + shard_proof : Cryptobox.shard_proof; +} type peer = Gs_interface.peer -- GitLab From 8dc466f67738809a93c2d716b87bee45e83a4e82 Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Mon, 15 May 2023 15:58:51 +0200 Subject: [PATCH 3/5] DAL/Node: rework/fix worker events logging --- src/lib_dal_node/gossipsub/gs_logging.ml | 72 ++++++++++++++++-------- 1 file changed, 50 insertions(+), 22 deletions(-) diff --git a/src/lib_dal_node/gossipsub/gs_logging.ml b/src/lib_dal_node/gossipsub/gs_logging.ml index 581ba448fd7c..0f1e36a3bcd4 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.ml +++ b/src/lib_dal_node/gossipsub/gs_logging.ml @@ -24,6 +24,8 @@ (* *) (*****************************************************************************) +open Gs_interface.Worker_instance + module Events = struct include Internal_event.Simple open Data_encoding @@ -39,42 +41,48 @@ module Events = struct declare_0 ~section ~name:(prefix "heartbeat") - ~msg:"Heartbeat" + ~msg:"Process Heartbeat" ~level:Info () let publish_message = - declare_3 + declare_2 ~section ~name:(prefix "publish_message") - ~msg:"Processing publish_message" + ~msg:"Process Publish_message id {message_id} with topic {topic}" ~level:Info + ~pp1:GS.Topic.pp + ~pp2:GS.Message_id.pp ("topic", topic_encoding) - ("message", message_encoding) ("message_id", message_id_encoding) let join = declare_1 ~section ~name:(prefix "join") - ~msg:"Processing join" + ~msg:"Process Join {topic}" ~level:Info + ~pp1:GS.Topic.pp ("topic", topic_encoding) let leave = declare_1 ~section ~name:(prefix "leave") - ~msg:"Processing leave" + ~msg:"Process Leave {topic}" ~level:Info + ~pp1:GS.Topic.pp ("topic", topic_encoding) let new_connection = declare_3 ~section ~name:(prefix "new_connection") - ~msg:"new_connection" + ~msg: + "Process New_connection from/to {peer} (direct={direct}, \ + outbound={outbound})" ~level:Info + ~pp1:P2p_peer.Id.pp ("peer", P2p_peer.Id.encoding) ("direct", bool) ("outbound", bool) @@ -83,27 +91,34 @@ module Events = struct declare_1 ~section ~name:(prefix "disconnection") - ~msg:"Disconnection" + ~msg:"Process Disconnection of {peer}" ~level:Info + ~pp1:P2p_peer.Id.pp ("peer", P2p_peer.Id.encoding) let message_with_header = - declare_4 + declare_3 ~section ~name:(prefix "message_with_header") - ~msg:"Processing Message_with_header" + ~msg: + "Process Message_with_header from {peer} with id {message_id} and \ + topic {topic}" ~level:Info + ~pp1:P2p_peer.Id.pp + ~pp2:GS.Topic.pp + ~pp3:GS.Message_id.pp ("peer", P2p_peer.Id.encoding) ("topic", topic_encoding) - ("message", message_encoding) ("message_id", message_id_encoding) let subscribe = declare_2 ~section ~name:(prefix "subscribe") - ~msg:"Processing subscribe" + ~msg:"Process Subscribe {peer} to {topic}" ~level:Info + ~pp1:P2p_peer.Id.pp + ~pp2:GS.Topic.pp ("peer", P2p_peer.Id.encoding) ("topic", topic_encoding) @@ -111,8 +126,10 @@ module Events = struct declare_2 ~section ~name:(prefix "unsubscribe") - ~msg:"Processing unsubscribe" + ~msg:"Process Unsubscribe {peer} from {topic}" ~level:Info + ~pp1:P2p_peer.Id.pp + ~pp2:GS.Topic.pp ("peer", P2p_peer.Id.encoding) ("topic", topic_encoding) @@ -120,8 +137,10 @@ module Events = struct declare_2 ~section ~name:(prefix "graft") - ~msg:"Processing graft" + ~msg:"Process Graft {peer} for {topic}" ~level:Info + ~pp1:P2p_peer.Id.pp + ~pp2:GS.Topic.pp ("peer", P2p_peer.Id.encoding) ("topic", topic_encoding) @@ -129,8 +148,12 @@ module Events = struct declare_4 ~section ~name:(prefix "prune") - ~msg:"Processing prune" + ~msg:"Process Prune {peer} for {topic} with backoff {backoff} and px {px}" ~level:Info + ~pp1:P2p_peer.Id.pp + ~pp2:GS.Topic.pp + ~pp3:Span.pp + ~pp4:(Format.pp_print_list P2p_peer.Id.pp) ("peer", P2p_peer.Id.encoding) ("topic", topic_encoding) ("backoff", span_encoding) @@ -140,8 +163,12 @@ module Events = struct declare_3 ~section ~name:(prefix "ihave") - ~msg:"Processing IHave" + ~msg: + "Process IHave from {peer} for {topic} with message_ids {message_ids}" ~level:Info + ~pp1:P2p_peer.Id.pp + ~pp2:GS.Topic.pp + ~pp3:(Format.pp_print_list GS.Message_id.pp) ("peer", P2p_peer.Id.encoding) ("topic", topic_encoding) ("message_ids", list message_id_encoding) @@ -150,21 +177,22 @@ module Events = struct declare_2 ~section ~name:(prefix "iwant") - ~msg:"Processing IWant" + ~msg:"Process IWant from {peer} with message_ids {message_ids}" ~level:Info + ~pp1:P2p_peer.Id.pp + ~pp2:(Format.pp_print_list GS.Message_id.pp) ("peer", P2p_peer.Id.encoding) ("message_ids", list message_id_encoding) end let event = let open Events in - let open Gs_interface.Worker_instance in function | Heartbeat -> emit heartbeat () | App_input event -> ( match event with - | Publish_message {message; message_id; topic} -> - emit publish_message (topic, message, message_id) + | Publish_message {message = _; message_id; topic} -> + emit publish_message (topic, message_id) | Join topic -> emit join topic | Leave topic -> emit leave topic) | P2P_input event -> ( @@ -174,8 +202,8 @@ let event = | Disconnection {peer} -> emit disconnection peer | In_message {from_peer; p2p_message} -> ( match p2p_message with - | Message_with_header {message; topic; message_id} -> - emit message_with_header (from_peer, topic, message, message_id) + | Message_with_header {message = _; topic; message_id} -> + emit message_with_header (from_peer, topic, message_id) | Subscribe {topic} -> emit subscribe (from_peer, topic) | Unsubscribe {topic} -> emit unsubscribe (from_peer, topic) | Graft {topic} -> emit graft (from_peer, topic) -- GitLab From 7dca3a2820ac945813a929a283618aebd714000d Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Thu, 11 May 2023 13:42:43 +0200 Subject: [PATCH 4/5] DAL/Node: join the corresponding topics when adding a profile --- src/bin_dal_node/RPC_server.ml | 3 ++- src/bin_dal_node/profile_manager.ml | 4 ++-- src/bin_dal_node/profile_manager.mli | 5 ++++- src/bin_dal_node/store.ml | 23 +++++++++++++++++------ 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/bin_dal_node/RPC_server.ml b/src/bin_dal_node/RPC_server.ml index 02f7f6dc7677..379f079e9a54 100644 --- a/src/bin_dal_node/RPC_server.ml +++ b/src/bin_dal_node/RPC_server.ml @@ -128,7 +128,8 @@ end module Profile_handlers = struct let patch_profile ctxt () profile = - call_handler1 ctxt (fun store -> Profile_manager.add_profile store profile) + call_handler2 ctxt (fun store {proto_parameters; _} -> + Profile_manager.add_profile proto_parameters store profile) let get_profiles ctxt () () = call_handler1 ctxt (fun store -> diff --git a/src/bin_dal_node/profile_manager.ml b/src/bin_dal_node/profile_manager.ml index 2ffafbe13267..dfd7c9673f80 100644 --- a/src/bin_dal_node/profile_manager.ml +++ b/src/bin_dal_node/profile_manager.ml @@ -27,9 +27,9 @@ Node profiles should be stored into the memory as well so that we can cache them *) -let add_profile node_store profile = +let add_profile proto_parameters node_store profile = let open Lwt_result_syntax in - let*! () = Store.Legacy.add_profile node_store profile in + let*! () = Store.Legacy.add_profile proto_parameters node_store profile in return_unit let get_profiles node_store = Store.Legacy.get_profiles node_store diff --git a/src/bin_dal_node/profile_manager.mli b/src/bin_dal_node/profile_manager.mli index 9ce2b3722997..f9e66f2b876b 100644 --- a/src/bin_dal_node/profile_manager.mli +++ b/src/bin_dal_node/profile_manager.mli @@ -28,7 +28,10 @@ (** Adds a profile to the dal [node_store]. If already present, the store does not change. *) val add_profile : - Store.node_store -> Services.Types.profile -> unit tzresult Lwt.t + Dal_plugin.proto_parameters -> + Store.node_store -> + Services.Types.profile -> + unit tzresult Lwt.t (** [get_profiles node_store] returns the list of profiles that the node tracks *) val get_profiles : diff --git a/src/bin_dal_node/store.ml b/src/bin_dal_node/store.ml index 2e941bfd9448..2daec09167d6 100644 --- a/src/bin_dal_node/store.ml +++ b/src/bin_dal_node/store.ml @@ -476,13 +476,24 @@ module Legacy = struct let* profiles = list node_store.store path in return @@ List.map_e (fun (p, _) -> decode_profile p) profiles - let add_profile node_store profile = + let add_profile Dal_plugin.{number_of_slots; _} node_store profile = + let open Lwt_syntax in let path = Path.Profile.profile profile in - set - ~msg:(Printf.sprintf "New profile added: %s" (Path.to_string path)) - node_store.store - path - "" + let* () = + set + ~msg:(Printf.sprintf "New profile added: %s" (Path.to_string path)) + node_store.store + path + "" + in + match profile with + | Attestor pkh -> + List.iter + (fun slot_index -> + Join Gossipsub.{slot_index; pkh} + |> Gossipsub.Worker.(app_input node_store.gs_worker)) + Utils.Infix.(0 -- (number_of_slots - 1)) ; + return_unit (** Filter the given list of indices according to the values of the given slot level and index. *) -- GitLab From 4dfd646d2b645c3f5aca09a669ea85e6461de887 Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Thu, 11 May 2023 13:43:49 +0200 Subject: [PATCH 5/5] Tezt/Dal: add a test for GS topic join --- tezt/tests/dal.ml | 79 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index 9e0a6bc1c1f2..e9289dfa56a8 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -2500,6 +2500,69 @@ let check_new_connection_event ~main_node ~other_node ~is_outbound = in check_expected is_outbound JSON.(event |-> "outbound" |> as_bool)) +type peer_id = string + +type event_with_topic = + | Subscribe of peer_id + | Unsubscribe of peer_id + | Graft of peer_id + | Join + | Leave + +let event_with_topic_to_string = function + | Subscribe _ -> "subscribe" + | Unsubscribe _ -> "unsubscribe" + | Graft _ -> "graft" + | Join -> "join" + | Leave -> "leave" + +(** This function monitors the Gossipsub worker events whose name is given by + [event_with_topic]. + + More precisely, since topics depend on a pkh and the number of DAL slots, + this function monitors all the events {pkh; slot_index = 0} ... {pkh; + slot_index = num_slots - 1}. + + Depending on the value of [event_with_topic], some extra checks, such as the + peer id in case of Graft and Subscribe, are also done. +*) +let check_events_with_topic ~event_with_topic dal_node ~num_slots expected_pkh = + let remaining = ref num_slots in + let seen = Array.make num_slots false in + let get_slot_index_opt event = + let*?? topic = + match event_with_topic with + | Subscribe expected_peer + | Unsubscribe expected_peer + | Graft expected_peer -> + let*?? () = + check_expected + expected_peer + JSON.(event |-> "peer" |> JSON.as_string) + in + Some JSON.(event |-> "topic") + | Join | Leave -> Some event + in + let*?? () = + check_expected expected_pkh JSON.(topic |-> "pkh" |> JSON.as_string) + in + Some JSON.(topic |-> "slot_index" |> as_int) + in + wait_for_gossipsub_worker_event + dal_node + ~name:(event_with_topic_to_string event_with_topic) + (fun event -> + let*?? slot_index = get_slot_index_opt event in + Check.( + (seen.(slot_index) = false) + bool + ~error_msg: + (sf "Slot_index %d already seen. Invariant broken" slot_index)) ; + seen.(slot_index) <- true ; + let () = remaining := !remaining - 1 in + if !remaining = 0 && Array.for_all (fun b -> b) seen then Some () + else None) + let test_dal_node_p2p_connection _protocol _parameters _cryptobox node client dal_node1 = let dal_node2 = Dal_node.create ~node ~client () in @@ -2521,6 +2584,18 @@ let test_dal_node_p2p_connection _protocol _parameters _cryptobox node client let* () = conn_ev_in_node1 and* () = conn_ev_in_node2 in unit +let test_dal_node_join_topic _protocol _parameters _cryptobox _node client + dal_node1 = + let pkh1 = Constant.bootstrap1.public_key_hash in + let profile1 = Rollup.Dal.RPC.Attestor pkh1 in + let* params = Rollup.Dal.Parameters.from_client client in + let num_slots = params.number_of_slots in + let event_waiter = + check_events_with_topic ~event_with_topic:Join dal_node1 ~num_slots pkh1 + in + let* () = RPC.call dal_node1 (Rollup.Dal.RPC.patch_profile profile1) in + event_waiter + let register ~protocols = (* Tests with Layer1 node only *) scenario_with_layer1_node @@ -2615,6 +2690,10 @@ let register ~protocols = "GS/P2P connection" test_dal_node_p2p_connection protocols ; + scenario_with_layer1_and_dal_nodes + "GS join topic" + test_dal_node_join_topic + protocols ; (* Tests with all nodes *) scenario_with_all_nodes -- GitLab