From 54e139a8b73f7263ea54890fdd6b851491a9fb73 Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Mon, 15 May 2023 14:11:00 +0200 Subject: [PATCH 1/3] DAL/Node: rework the GS/Transport interconnection interface --- src/bin_dal_node/daemon.ml | 12 +++++++++++- src/lib_dal_node/gossipsub/gossipsub.mli | 2 +- .../gossipsub/gs_transport_connection.ml | 2 +- .../gossipsub/gs_transport_connection.mli | 2 +- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 57ff0bfe7d53..10a1f83dadc8 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -240,6 +240,15 @@ let daemonize handlers = return_unit) |> lwt_map_error (List.fold_left (fun acc errs -> errs @ acc) []) +let connect_gossipsub_with_p2p gs_worker transport_layer = + Lwt.dont_wait + (fun () -> + Gossipsub.Transport_layer_hooks.activate gs_worker transport_layer) + (fun exn -> + "[dal_node] error in Daemon.connect_gossipsub_with_p2p: " + ^ Printexc.to_string exn + |> Stdlib.failwith) + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/3605 Improve general architecture, handle L1 disconnection etc *) @@ -275,7 +284,8 @@ let run ~data_dir cctxt = let* rpc_server = RPC_server.(start config ctxt) in (* activate the p2p instance. *) Gossipsub.Transport_layer.activate ~additional_points:peers transport_layer ; - let () = Gossipsub.Transport_layer_hooks.activate gs_worker transport_layer in + connect_gossipsub_with_p2p gs_worker transport_layer ; + let _ = RPC_server.install_finalizer rpc_server in let*! () = Event.(emit rpc_server_is_ready (rpc_addr, rpc_port)) in (* Start daemon to resolve current protocol plugin *) diff --git a/src/lib_dal_node/gossipsub/gossipsub.mli b/src/lib_dal_node/gossipsub/gossipsub.mli index 88d603de4da4..8e96c44ad141 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.mli +++ b/src/lib_dal_node/gossipsub/gossipsub.mli @@ -110,5 +110,5 @@ end {!Transport_layer_hooks.activate} function below. *) module Transport_layer_hooks : sig (** See {!Gs_transport_connection.activate}. *) - val activate : Worker.t -> Transport_layer.t -> unit + val activate : Worker.t -> Transport_layer.t -> unit Lwt.t end diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index d9d1099d32fe..6b1033f03d6a 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -43,4 +43,4 @@ let activate gs_worker p2p_layer = new_connection_handler gs_worker p2p_layer |> P2p.on_new_connection p2p_layer in - () + Lwt.return_unit diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.mli b/src/lib_dal_node/gossipsub/gs_transport_connection.mli index 7395eeb3f349..9bfb0a336480 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.mli +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.mli @@ -34,4 +34,4 @@ val activate : Transport_layer_interface.peer_metadata, Transport_layer_interface.connection_metadata ) P2p.t -> - unit + unit Lwt.t -- GitLab From 20627be072c29a97133cd1a76debf4ace43f229a Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Sat, 13 May 2023 13:39:08 +0200 Subject: [PATCH 2/3] DAL/Node: add handlers to forward messages from/to GS to/from P2P --- .../gossipsub/gs_transport_connection.ml | 123 +++++++++++++++++- 1 file changed, 122 insertions(+), 1 deletion(-) diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index 6b1033f03d6a..cdce544f3b38 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -25,6 +25,26 @@ (*****************************************************************************) module Worker = Gs_interface.Worker_instance +open Gs_interface.Worker_instance + +module Events = struct + include Internal_event.Simple + + let section = ["gossipsub"; "transport"; "event"] + + let prefix = + let prefix = String.concat "_" section in + fun s -> prefix ^ "-" ^ s + + let no_connection_for_peer = + declare_1 + ~section + ~name:(prefix "no_connection_for_peer") + ~msg:"No running connection found for peer {peer}" + ~level:Notice + ~pp1:P2p_peer.Id.pp + ("peer", P2p_peer.Id.encoding) +end (** This handler forwards information about connections established by the P2P layer to the Gossipsub worker. *) @@ -37,10 +57,111 @@ let new_connection_handler gs_worker p2p_layer peer conn = let direct = false in Worker.(New_connection {peer; direct; outbound} |> p2p_input gs_worker) +(* This function translates a Worker p2p_message to the type of messages sent + via the P2P layer. The two types don't coincide because of Prune. *) +let wrap_p2p_message = + let module W = Worker in + let open Transport_layer_interface in + function + | W.Graft {topic} -> Graft {topic} + | W.Prune _ -> + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5646 + + Handle Prune messages in GS/P2P interconnection. *) + assert false + | W.IHave {topic; message_ids} -> IHave {topic; message_ids} + | W.IWant {message_ids} -> IWant {message_ids} + | W.Subscribe {topic} -> Subscribe {topic} + | W.Unsubscribe {topic} -> Unsubscribe {topic} + | W.Message_with_header {message; topic; message_id} -> + Message_with_header {message; topic; message_id} + +(* This function translates a message received via the P2P layer to a Worker + p2p_message. The two types don't coincide because of Prune. *) +let unwrap_p2p_message = + let open Worker in + let module I = Transport_layer_interface in + function + | I.Graft {topic} -> Graft {topic} + | I.Prune _ -> + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5646 + + Handle Prune messages in GS/P2P interconnection. *) + assert false + | I.IHave {topic; message_ids} -> IHave {topic; message_ids} + | I.IWant {message_ids} -> IWant {message_ids} + | I.Subscribe {topic} -> Subscribe {topic} + | I.Unsubscribe {topic} -> Unsubscribe {topic} + | I.Message_with_header {message; topic; message_id} -> + Message_with_header {message; topic; message_id} + +(** This handler pops and processes the items put by the worker in the p2p + output stream. The out messages are sent to the corresponding peers and the + directives to the P2P layer to connect or disconnect peers are handled. *) +let gs_worker_p2p_output_handler gs_worker p2p_layer = + let open Lwt_syntax in + let rec loop output_stream = + let* p2p_output = Worker.Stream.pop output_stream in + let* () = + match p2p_output with + | Worker.Out_message {to_peer; p2p_message} -> ( + let conn = P2p.find_connection_by_peer_id p2p_layer to_peer in + match conn with + | None -> + (* This could happen when the peer is disconnected or the + connection is accepted but not running (authenticated) yet. *) + (* TODO: https://gitlab.com/tezos/tezos/-/issues/5649 + + Are there weird cases in which there is no connection + associated to the peer, but the peer is still registered as + connected on the GS side? *) + Events.(emit no_connection_for_peer to_peer) + | Some conn -> + Error_monad.dont_wait + (fun () -> + wrap_p2p_message p2p_message |> P2p.send p2p_layer conn) + (Format.eprintf + "Uncaught error in %s: %a\n%!" + __FUNCTION__ + Error_monad.pp_print_trace) + (fun exc -> + Format.eprintf + "Uncaught exception in %s: %s\n%!" + __FUNCTION__ + (Printexc.to_string exc)) ; + return_unit) + | Disconnect {peer = _} | Connect {peer = _} | Kick {peer = _} -> + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5647 + + Handle Disconnect, Connect and Kick directives from GS *) + assert false + in + loop output_stream + in + Worker.p2p_output_stream gs_worker |> loop + +(** This handler forwards p2p messages received via Octez p2p to the Gossipsub + worker. *) +let transport_layer_inputs_handler gs_worker p2p_layer = + let open Lwt_syntax in + let rec loop () = + let* conn, msg = P2p.recv_any p2p_layer in + let {P2p_connection.Info.peer_id; _} = P2p.connection_info p2p_layer conn in + Worker.( + In_message {from_peer = peer_id; p2p_message = unwrap_p2p_message msg} + |> p2p_input gs_worker) ; + loop () + in + loop () + let activate gs_worker p2p_layer = (* Register a handler to notify new P2P connections to GS. *) let () = new_connection_handler gs_worker p2p_layer |> P2p.on_new_connection p2p_layer in - Lwt.return_unit + Lwt.join + [ + gs_worker_p2p_output_handler gs_worker p2p_layer; + transport_layer_inputs_handler gs_worker p2p_layer; + ] -- GitLab From 9b1539ae0f22c9a7cddd7aa48b771808772c3e54 Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Thu, 11 May 2023 17:32:27 +0200 Subject: [PATCH 3/3] Tezt/DAL: test subscribe & graft --- tezt/tests/dal.ml | 66 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index e9289dfa56a8..53538c88c420 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -2596,6 +2596,68 @@ let test_dal_node_join_topic _protocol _parameters _cryptobox _node client let* () = RPC.call dal_node1 (Rollup.Dal.RPC.patch_profile profile1) in event_waiter +let test_dal_node_gs_topic_subscribe_and_graft _protocol _parameters _cryptobox + node client dal_node1 = + (* connect *) + let dal_node2 = Dal_node.create ~node ~client () in + let* _config_file = Dal_node.init_config dal_node2 in + update_known_peers dal_node2 [dal_node1] ; + let conn_ev_in_node1 = + check_new_connection_event + ~main_node:dal_node1 + ~other_node:dal_node2 + ~is_outbound:false + in + let conn_ev_in_node2 = + check_new_connection_event + ~main_node:dal_node2 + ~other_node:dal_node1 + ~is_outbound:true + in + let* () = Dal_node.run dal_node2 in + let* () = conn_ev_in_node1 and* () = conn_ev_in_node2 in + + (* node1 joins topic {pkh} -> it sends subscribe messages to node2. *) + let pkh1 = Constant.bootstrap1.public_key_hash in + let profile1 = Rollup.Dal.RPC.Attestor pkh1 in + let* params = Rollup.Dal.Parameters.from_client client in + let num_slots = params.number_of_slots in + let peer_id1 = + JSON.(Dal_node.read_identity dal_node1 |-> "peer_id" |> as_string) + in + let peer_id2 = + JSON.(Dal_node.read_identity dal_node2 |-> "peer_id" |> as_string) + in + + let event_waiter = + check_events_with_topic + ~event_with_topic:(Subscribe peer_id1) + dal_node2 + ~num_slots + pkh1 + in + let* () = RPC.call dal_node1 (Rollup.Dal.RPC.patch_profile profile1) in + let* () = event_waiter in + + (* node2 joins topic {pkh} -> it sends subscribe and graft messages to + node1. *) + let event_waiter_subscribe = + check_events_with_topic + ~event_with_topic:(Subscribe peer_id2) + dal_node1 + ~num_slots + pkh1 + in + let event_waiter_graft = + check_events_with_topic + ~event_with_topic:(Graft peer_id2) + dal_node1 + ~num_slots + pkh1 + in + let* () = RPC.call dal_node2 (Rollup.Dal.RPC.patch_profile profile1) in + Lwt.join [event_waiter_subscribe; event_waiter_graft] + let register ~protocols = (* Tests with Layer1 node only *) scenario_with_layer1_node @@ -2694,6 +2756,10 @@ let register ~protocols = "GS join topic" test_dal_node_join_topic protocols ; + scenario_with_layer1_and_dal_nodes + "GS topic subscribe and graft" + test_dal_node_gs_topic_subscribe_and_graft + protocols ; (* Tests with all nodes *) scenario_with_all_nodes -- GitLab