From 836bd4556a229b1115cf9950539d9562b0b3184b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Wed, 23 Oct 2024 16:53:05 +0200 Subject: [PATCH 1/6] DAL/Node/Types: Implement a connection datatype --- src/lib_dal_node_services/types.ml | 32 ++++++++++++++++++++++++----- src/lib_dal_node_services/types.mli | 27 ++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index b14a57b86b68..7b24df1b0c9d 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -180,24 +180,46 @@ module Message = struct (req "shard_proof" Cryptobox.shard_proof_encoding)) end -module Peer = struct - type t = P2p_peer.Id.t +module Connection = struct + type t = {peer_id : P2p_peer.Id.t; maybe_reachable_point : P2p_point.Id.t} module Cmp = struct type nonrec t = t - let compare p1 p2 = P2p_peer.Id.compare p1 p2 + (* We only compare the peer id here. The reason is that it is safe to assume: + + 1. For connected peers, they can be identified by a unique IP address. + + 2. For not connected peers we want to advertise only one point + that can change with time. 
*) + let compare p1 p2 = P2p_peer.Id.compare p1.peer_id p2.peer_id end include Compare.Make (Cmp) module Set = Set.Make (Cmp) module Map = Map.Make (Cmp) - let pp = P2p_peer.Id.pp + let encoding = + let open Data_encoding in + conv + (fun {peer_id; maybe_reachable_point} -> (peer_id, maybe_reachable_point)) + (fun (peer_id, maybe_reachable_point) -> {peer_id; maybe_reachable_point}) + (obj2 + (req "peer_id" P2p_peer.Id.encoding) + (req "maybe_reachable_point" P2p_point.Id.encoding)) - let encoding = P2p_peer.Id.encoding + let pp fmt {peer_id; maybe_reachable_point} = + Format.fprintf + fmt + "{peer_id=%a;@,maybe_reachable_point=%a}" + P2p_peer.Id.pp + peer_id + P2p_point.Id.pp + maybe_reachable_point end +module Peer = P2p_peer.Id + module Point = struct type t = P2p_point.Id.t diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index cc8139f5db03..17b79fb3bb74 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -112,6 +112,33 @@ module Message : sig include ENCODABLE with type t := t end +(** A connection from the point of view of gossipsub. *) +module Peer : sig + (** For incoming connections, we know the peer is reachable, and the peer's + address and the port are provided by the octez-p2p layer. + + For outgoing connections, if the remote peer has not specified + its address, nor its port, then the peer's address is + the connection's address with the default listening port (at the time of + writing 11732). However, the remote node's user can override both + the address and the port if desired and in that case, the + specified values will be used. *) + type t = {peer_id : P2p_peer.Id.t; maybe_reachable_point : P2p_point.Id.t} + + (** Comparison is not a structural one, instead only the [peer_id] + is used. 
*) + + include PRINTABLE with type t := t + + include ENCODABLE with type t := t + + include COMPARABLE with type t := t + + module Set : Set.S with type elt = t + + module Map : Map.S with type key = t +end + (** From the Gossipsub point of view, a peer is given by a cryptographic node identity {!P2p_peer.Id.t}. It's up to the caller to associate the {!P2p_peer.Id.t} to a {!P2p_point.Id.t} if needed (to e.g. implement peers -- GitLab From cf5224f18ef62480c32c4ef54a525b71477b6d23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Wed, 23 Oct 2024 17:50:21 +0200 Subject: [PATCH 2/6] DAL/Node: Use the previously datatype as the new peer for gossipsub --- src/lib_dal_node/gossipsub/gs_logging.ml | 24 ++++-- .../gossipsub/gs_transport_connection.ml | 83 ++++++++++++------- .../gossipsub/transport_layer_interface.ml | 11 +-- .../gossipsub/transport_layer_interface.mli | 2 +- src/lib_dal_node_services/types.ml | 4 +- src/lib_dal_node_services/types.mli | 21 +---- 6 files changed, 77 insertions(+), 68 deletions(-) diff --git a/src/lib_dal_node/gossipsub/gs_logging.ml b/src/lib_dal_node/gossipsub/gs_logging.ml index 61e50e0c4381..6625749fde75 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.ml +++ b/src/lib_dal_node/gossipsub/gs_logging.ml @@ -207,17 +207,23 @@ let event = | P2P_input event -> ( match event with | New_connection {peer; direct; trusted; bootstrap} -> - emit new_connection (peer, direct, trusted, bootstrap) - | Disconnection {peer} -> emit disconnection peer + emit new_connection (peer.peer_id, direct, trusted, bootstrap) + | Disconnection {peer} -> emit disconnection peer.peer_id | In_message {from_peer; p2p_message} -> ( match p2p_message with | Message_with_header {message = _; topic; message_id} -> - emit message_with_header (from_peer, topic, message_id) - | Subscribe {topic} -> emit subscribe (from_peer, topic) - | Unsubscribe {topic} -> emit unsubscribe (from_peer, topic) - | Graft {topic} -> emit graft (from_peer, topic) + 
emit message_with_header (from_peer.peer_id, topic, message_id) + | Subscribe {topic} -> emit subscribe (from_peer.peer_id, topic) + | Unsubscribe {topic} -> emit unsubscribe (from_peer.peer_id, topic) + | Graft {topic} -> emit graft (from_peer.peer_id, topic) | Prune {topic; px; backoff} -> - emit prune (from_peer, topic, backoff, List.of_seq px) + emit + prune + ( from_peer.peer_id, + topic, + backoff, + List.of_seq px + |> List.map (fun Types.Peer.{peer_id; _} -> peer_id) ) | IHave {topic; message_ids} -> - emit ihave (from_peer, topic, message_ids) - | IWant {message_ids} -> emit iwant (from_peer, message_ids))) + emit ihave (from_peer.peer_id, topic, message_ids) + | IWant {message_ids} -> emit iwant (from_peer.peer_id, message_ids))) diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index 3a155715be79..547274e2c3df 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -136,13 +136,14 @@ end = struct P2p_peer.Id.equal px_peer k2.px_peer && match (origin, k2.origin) with - | PX peer1, PX peer2 -> P2p_peer.Id.equal peer1 peer2 + | PX peer1, PX peer2 -> P2p_peer.Id.equal peer1.peer_id peer2.peer_id | Trusted, Trusted -> true | PX _, _ | Trusted, _ -> false let hash {origin; px_peer} = match origin with - | PX peer -> (P2p_peer.Id.hash px_peer * 31) + P2p_peer.Id.hash peer + | PX peer -> + (P2p_peer.Id.hash px_peer * 31) + P2p_peer.Id.hash peer.peer_id | Trusted -> P2p_peer.Id.hash px_peer * 17 end) @@ -164,10 +165,10 @@ end missing from the connection's metadata, the function inspects the Internet connection link's information. It returns [None] if it does manage to get those information with both methods. 
*) -let px_peer_of_peer p2p_layer peer = +let px_peer_of_peer p2p_layer peer_id = let open Option_syntax in let open Transport_layer_interface in - let* conn = P2p.find_connection_by_peer_id p2p_layer peer in + let* conn = P2p.find_connection_by_peer_id p2p_layer peer_id in (* In general, people either provide an address and a port, or just a port. In any case, we use `P2p.connection_remote_metadata` to get the address and the port of the provided values, and we fall back to the values given by @@ -185,7 +186,7 @@ let px_peer_of_peer p2p_layer peer = in let addr = Option.value advertised_net_addr ~default:conn_addr in let* port = Option.either advertised_net_port conn_port_opt in - return {point = (addr, port); peer} + return Types.Peer.{maybe_reachable_point = (addr, port); peer_id} (* FIXME: https://gitlab.com/tezos/tezos/-/issues/6637 @@ -196,8 +197,29 @@ let px_peer_of_peer p2p_layer peer = table. *) let cache_point_of_trusted_peer p2p_layer px_cache peer = px_peer_of_peer p2p_layer peer - |> Option.iter (fun Transport_layer_interface.{point; peer} -> - PX_cache.insert px_cache ~origin:Trusted ~px_peer:peer point) + |> Option.iter (fun Types.Peer.{maybe_reachable_point; peer_id} -> + PX_cache.insert + px_cache + ~origin:Trusted + ~px_peer:peer_id + maybe_reachable_point) + +let peer_of_connection p2p_layer conn = + let open Types.P2P.Metadata.Connection in + let {P2p_connection.Info.peer_id; id_point; remote_metadata; _} = + P2p.connection_info p2p_layer conn + in + let default_port = + Transport_layer_default_parameters.P2p_config.listening_port + in + let addr = + Option.value remote_metadata.advertised_net_addr ~default:(fst id_point) + in + let port = + Option.value remote_metadata.advertised_net_port ~default:default_port + in + let maybe_reachable_point = (addr, port) in + Types.Peer.{peer_id; maybe_reachable_point} (** This handler forwards information about connections established by the P2P layer to the Gossipsub worker. 
@@ -212,7 +234,7 @@ let cache_point_of_trusted_peer p2p_layer px_cache peer = outbound to avoid possible love bombing attacks. The Rust version also implements a way to mitigate this risk, but not the Go implementation. *) -let new_connections_handler px_cache gs_worker p2p_layer peer conn = +let new_connections_handler px_cache gs_worker p2p_layer peer_id conn = let P2p_connection.Info.{id_point = addr, port_opt; _} = P2p.connection_info p2p_layer conn in @@ -226,7 +248,7 @@ let new_connections_handler px_cache gs_worker p2p_layer peer conn = ~none:true (* It doesn't matter in fake networks where pool is None *) ~some:(fun pool -> f pool arg) in - let trusted_peer = fold_pool_opt P2p_pool.Peers.get_trusted peer in + let trusted_peer = fold_pool_opt P2p_pool.Peers.get_trusted peer_id in let trusted_point = Option.fold port_opt ~none:false ~some:(fun port -> fold_pool_opt P2p_pool.Points.get_trusted (addr, port)) @@ -236,25 +258,27 @@ let new_connections_handler px_cache gs_worker p2p_layer peer conn = Add the ability to have direct peers. *) let direct = false in - if trusted then cache_point_of_trusted_peer p2p_layer px_cache peer ; + if trusted then cache_point_of_trusted_peer p2p_layer px_cache peer_id ; + let peer = peer_of_connection p2p_layer conn in Worker.( New_connection {peer; direct; trusted; bootstrap} |> p2p_input gs_worker) (** This handler forwards information about P2P disconnections to the Gossipsub worker. *) -let disconnections_handler gs_worker peer = +let disconnections_handler gs_worker _peer = + (* Will be fixed in the next commit. The diff of this commit will + already be quite difficult to review. *) + let peer = assert false in Worker.(Disconnection {peer} |> p2p_input gs_worker) (* This function translates a Worker p2p_message to the type of messages sent via the P2P layer. The two types don't coincide because of Prune. 
*) -let wrap_p2p_message p2p_layer = +let wrap_p2p_message _p2p_layer = let module W = Worker in let open Transport_layer_interface in function | W.Graft {topic} -> Graft {topic} - | W.Prune {topic; px; backoff} -> - let px = Seq.filter_map (fun peer -> px_peer_of_peer p2p_layer peer) px in - Prune {topic; px; backoff} + | W.Prune {topic; px; backoff} -> Prune {topic; px; backoff} | W.IHave {topic; message_ids} -> IHave {topic; message_ids} | W.IWant {message_ids} -> IWant {message_ids} | W.Subscribe {topic} -> Subscribe {topic} @@ -272,14 +296,15 @@ let unwrap_p2p_message p2p_layer ~from_peer px_cache = | I.Prune {topic; px; backoff} -> let px = Seq.map - (fun I.{point; peer} -> - if Option.is_none @@ P2p.find_connection_by_peer_id p2p_layer peer + (fun (Types.Peer.{maybe_reachable_point; peer_id} as peer) -> + if + Option.is_none @@ P2p.find_connection_by_peer_id p2p_layer peer_id then PX_cache.insert px_cache ~origin:(PX from_peer) - ~px_peer:peer - point ; + ~px_peer:peer_id + maybe_reachable_point ; peer) px in @@ -367,7 +392,7 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = let* () = match p2p_output with | Worker.Out_message {to_peer; p2p_message} -> ( - let conn = P2p.find_connection_by_peer_id p2p_layer to_peer in + let conn = P2p.find_connection_by_peer_id p2p_layer to_peer.peer_id in match conn with | None -> (* This could happen when the peer is disconnected or the @@ -377,13 +402,13 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = Are there weird cases in which there is no connection associated to the peer, but the peer is still registered as connected on the GS side? 
*) - Events.(emit no_connection_for_peer to_peer) + Events.(emit no_connection_for_peer to_peer.peer_id) | Some conn -> ( let* (res : unit tzresult) = let msg = wrap_p2p_message p2p_layer p2p_message in let* () = if log_sending_message p2p_message then - Events.(emit send_p2p_message (to_peer, msg)) + Events.(emit send_p2p_message (to_peer.peer_id, msg)) else return_unit in P2p.send p2p_layer conn msg @@ -391,20 +416,20 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = match res with | Ok () -> return_unit | Error err -> - Events.(emit send_p2p_message_failed (to_peer, err)))) + Events.(emit send_p2p_message_failed (to_peer.peer_id, err)))) | Disconnect {peer} -> - P2p.find_connection_by_peer_id p2p_layer peer + P2p.find_connection_by_peer_id p2p_layer peer.peer_id |> Option.iter_s (P2p.disconnect ~reason:"disconnected by Gossipsub" p2p_layer) | Connect {peer; origin} -> - try_connect p2p_layer px_cache ~px_peer:peer ~origin + try_connect p2p_layer px_cache ~px_peer:peer.peer_id ~origin | Connect_point {point} -> try_connect_point p2p_layer point | Forget {peer; origin} -> - PX_cache.drop px_cache ~px_peer:peer ~origin:(PX origin) ; + PX_cache.drop px_cache ~px_peer:peer.peer_id ~origin:(PX origin) ; return_unit | Kick {peer} -> P2p.pool p2p_layer - |> Option.iter_s (fun pool -> P2p_pool.Peers.ban pool peer) + |> Option.iter_s (fun pool -> P2p_pool.Peers.ban pool peer.peer_id) in loop output_stream in @@ -416,9 +441,7 @@ let transport_layer_inputs_handler gs_worker p2p_layer advertised_px_cache = let open Lwt_syntax in let rec loop () = let* conn, msg = P2p.recv_any p2p_layer in - let {P2p_connection.Info.peer_id = from_peer; _} = - P2p.connection_info p2p_layer conn - in + let from_peer = peer_of_connection p2p_layer conn in Worker.( In_message { diff --git a/src/lib_dal_node/gossipsub/transport_layer_interface.ml b/src/lib_dal_node/gossipsub/transport_layer_interface.ml index 4b48d16fcf6c..30d9100fede2 100644 --- 
a/src/lib_dal_node/gossipsub/transport_layer_interface.ml +++ b/src/lib_dal_node/gossipsub/transport_layer_interface.ml @@ -30,7 +30,7 @@ module Types = Tezos_dal_node_services.Types Version this type to ease future migrations. *) module P2p_message_V1 = struct - type px_peer = {point : P2p_point.Id.t; peer : P2p_peer.Id.t} + type px_peer = Types.Peer.t type p2p_message = | Graft of {topic : Types.Topic.t} @@ -52,10 +52,11 @@ module P2p_message_V1 = struct let px_peer_encoding = let open Data_encoding in conv - (fun {point; peer} -> (point, peer)) - (fun (point, peer) -> {point; peer}) + (fun Types.Peer.{maybe_reachable_point; peer_id} -> + (maybe_reachable_point, peer_id)) + (fun (maybe_reachable_point, peer_id) -> {maybe_reachable_point; peer_id}) (obj2 - (req "point" P2p_point.Id.encoding) + (req "maybe_reachable_point" P2p_point.Id.encoding) (req "peer" P2p_peer.Id.encoding)) (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5564 @@ -161,7 +162,7 @@ module P2p_message_V1 = struct Types.Topic.pp topic (pp_list Types.Peer.pp) - (List.of_seq px |> List.map (fun px_peer -> px_peer.peer)) + (List.of_seq px) Types.Span.pp backoff | IHave {topic; message_ids} -> diff --git a/src/lib_dal_node/gossipsub/transport_layer_interface.mli b/src/lib_dal_node/gossipsub/transport_layer_interface.mli index b33114a0c2fa..f3b7a9102560 100644 --- a/src/lib_dal_node/gossipsub/transport_layer_interface.mli +++ b/src/lib_dal_node/gossipsub/transport_layer_interface.mli @@ -41,7 +41,7 @@ module Types = Tezos_dal_node_services.Types (** Peers exchanged via PX. [point] represents the (address, port) pair of the exchanged peer, while [peer] represents the cryptographic identity of the peer. 
*) -type px_peer = {point : P2p_point.Id.t; peer : P2p_peer.Id.t} +type px_peer = Types.Peer.t (** Without piggybacking, {!p2p_message} is almost identical to {!Gs_interface.p2p_message}, except that for the [Prune] case, diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 7b24df1b0c9d..d86eb58ff154 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -180,7 +180,7 @@ module Message = struct (req "shard_proof" Cryptobox.shard_proof_encoding)) end -module Connection = struct +module Peer = struct type t = {peer_id : P2p_peer.Id.t; maybe_reachable_point : P2p_point.Id.t} module Cmp = struct @@ -218,8 +218,6 @@ module Connection = struct maybe_reachable_point end -module Peer = P2p_peer.Id - module Point = struct type t = P2p_point.Id.t diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index 17b79fb3bb74..de4931e32503 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -112,7 +112,7 @@ module Message : sig include ENCODABLE with type t := t end -(** A connection from the point of view of gossipsub. *) +(** A peer from the point of view of gossipsub. *) module Peer : sig (** For incoming connections, we know the peer is reachable, and the peer's address and the port are provided by the octez-p2p layer. @@ -139,25 +139,6 @@ module Peer : sig module Map : Map.S with type key = t end -(** From the Gossipsub point of view, a peer is given by a cryptographic node - identity {!P2p_peer.Id.t}. It's up to the caller to associate the - {!P2p_peer.Id.t} to a {!P2p_point.Id.t} if needed (to e.g. implement peers - exchange, which needs addresses and ports instead of cryptographic - identities). 
*) -module Peer : sig - type t = P2p_peer.Id.t - - include PRINTABLE with type t := t - - include ENCODABLE with type t := t - - include COMPARABLE with type t := t - - module Set : Set.S with type elt = t - - module Map : Map.S with type key = t -end - (** A point is made of an IP address and a port. Only the worker knows about the notion. The automaton only sees peers (i.e. cryptographic identities of nodes). *) -- GitLab From 37d69465c99429699f34217a26ba4b8ef3062b94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Wed, 23 Oct 2024 18:59:11 +0200 Subject: [PATCH 3/6] DAL/Node: Handle the disconnection properly --- .../gossipsub/gs_transport_connection.ml | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index 547274e2c3df..e34f5ca7dd02 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -265,11 +265,32 @@ let new_connections_handler px_cache gs_worker p2p_layer peer_id conn = (** This handler forwards information about P2P disconnections to the Gossipsub worker. *) -let disconnections_handler gs_worker _peer = - (* Will be fixed in the next commit. The diff of this commit will - already be quite difficult to review. *) - let peer = assert false in - Worker.(Disconnection {peer} |> p2p_input gs_worker) +let disconnections_handler gs_worker peer_id = + let open GS.Introspection in + (* When this callback is called, we only have the [peer_id] and not + the [maybe_reachable_point]. + + It can be reconstructed in many ways: + + - We could do it via the octez-p2p + + - We can find it in the automaton state (chosen option) + + The last option does not have the best complexity but is simple enough. 
+ *) + let view = Worker.state gs_worker in + let {connections; _} = view in + (* Complexity is wrong, but the number of connections should not be + too large, so it should be ok in practice. *) + let bindings = Connections.bindings connections in + let value = + List.find_opt + (fun (peer, _) -> P2p_peer.Id.equal peer.Types.Peer.peer_id peer_id) + bindings + in + match value with + | None -> (* Something is off, we should log something probably. *) () + | Some (peer, _) -> Worker.(Disconnection {peer} |> p2p_input gs_worker) (* This function translates a Worker p2p_message to the type of messages sent via the P2P layer. The two types don't coincide because of Prune. *) -- GitLab From 34bbe8ed8abfcbef77294869acdca1c53a9f9d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Wed, 23 Oct 2024 17:58:21 +0200 Subject: [PATCH 4/6] DAL/Node: Remove the PX cache With the new representation for peers, the cache is not needed anymore. This commit also removes a check whether we are already connected to a peer. I don't think it is necessary and does not cover some cases: - It can't detect if we are already connected to a peer with an incoming connection - We are already connected to this peer ID with a different point --- .../gossipsub/gs_transport_connection.ml | 245 +++--------------- 1 file changed, 34 insertions(+), 211 deletions(-) diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index e34f5ca7dd02..c59539b0dc75 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -88,122 +88,6 @@ module Events = struct ("message", Transport_layer_interface.p2p_message_encoding) end -(** This module implements a cache of alternative peers (aka PX peers), that is, - peers we are not (yet) connected to, advertised by P2P neighbors within - Prune messages. 
- - The cache associates a P2P point to each pair made of the advertised peer - and the peer that advertised it. In case of redundancy, the last advertised - point is kept in the cache. - - The cache is in theory not bounded. However, some invariants on the way - {!insert} and {!drop} are used below ensures that we remove the added entries very - quickly. -*) -module PX_cache : sig - (** The cache data structure for advertised alternative PXs. *) - type t - - type origin = Worker.peer_origin - - (** Create a new cache data structure. The [size] parameter is an indication - on the size of internal table storing data. Its default value is - [2048]. *) - val create : ?size:int -> unit -> t - - (** [insert t ~origin ~px_peer point] associates the given [point] to - [(origin, px_peer)] pair of peers. If a point already exists for the pair, - it is overwritten. *) - val insert : - t -> origin:origin -> px_peer:P2p_peer.Id.t -> P2p_point.Id.t -> unit - - (** [find_opt t ~origin ~px_peer] returns the content associated to the entry - [(origin, px)] in the cache, if any. *) - val find_opt : - t -> origin:origin -> px_peer:P2p_peer.Id.t -> P2p_point.Id.t option - - (** [drop t ~origin ~px_peer] drops the entry [(origin, px_peer)] from the cache. 
*) - val drop : t -> origin:origin -> px_peer:P2p_peer.Id.t -> unit -end = struct - type origin = Worker.peer_origin - - type key = {origin : origin; px_peer : P2p_peer.Id.t} - - module Table = Hashtbl.Make (struct - type t = key - - let equal {origin; px_peer} k2 = - P2p_peer.Id.equal px_peer k2.px_peer - && - match (origin, k2.origin) with - | PX peer1, PX peer2 -> P2p_peer.Id.equal peer1.peer_id peer2.peer_id - | Trusted, Trusted -> true - | PX _, _ | Trusted, _ -> false - - let hash {origin; px_peer} = - match origin with - | PX peer -> - (P2p_peer.Id.hash px_peer * 31) + P2p_peer.Id.hash peer.peer_id - | Trusted -> P2p_peer.Id.hash px_peer * 17 - end) - - type t = P2p_point.Id.t Table.t - - let create ?(size = 2048) () = Table.create size - - let insert table ~origin ~px_peer point = - Table.replace table {px_peer; origin} point - - let drop table ~origin ~px_peer = Table.remove table {origin; px_peer} - - let find_opt table ~origin ~px_peer = Table.find_opt table {origin; px_peer} -end - -(* [px_peer_of_peer p2p_layer peer] returns the public IP address and port at which - [peer] could be reached. For that, it first inspects information transmitted - via connection metadata by the remote [peer]. If the address or port are - missing from the connection's metadata, the function inspects the Internet - connection link's information. It returns [None] if it does manage to get - those information with both methods. *) -let px_peer_of_peer p2p_layer peer_id = - let open Option_syntax in - let open Transport_layer_interface in - let* conn = P2p.find_connection_by_peer_id p2p_layer peer_id in - (* In general, people either provide an address and a port, or just a port. - In any case, we use `P2p.connection_remote_metadata` to get the address and - the port of the provided values, and we fall back to the values given by - `P2p.connection_info` if the former are not available - (respectively/independently for the address and the port). 
The first case - is covered by {!P2p.connection_remote_metadata}. But if the IP address is - not explicitly given, we rely on the function {!P2p.connection_info - p2p_layer conn}. *) - let Types.P2P.Metadata.Connection. - {advertised_net_addr; advertised_net_port; is_bootstrap_peer = _} = - P2p.connection_remote_metadata p2p_layer conn - in - let {P2p_connection.Info.id_point = conn_addr, conn_port_opt; _} = - P2p.connection_info p2p_layer conn - in - let addr = Option.value advertised_net_addr ~default:conn_addr in - let* port = Option.either advertised_net_port conn_port_opt in - return Types.Peer.{maybe_reachable_point = (addr, port); peer_id} - -(* FIXME: https://gitlab.com/tezos/tezos/-/issues/6637 - - When adding an RPC to trust peers/points. Use this function when trusting a peer - via an RPC. *) - -(* This function inserts the point of a trusted peer into the [px_cache] - table. *) -let cache_point_of_trusted_peer p2p_layer px_cache peer = - px_peer_of_peer p2p_layer peer - |> Option.iter (fun Types.Peer.{maybe_reachable_point; peer_id} -> - PX_cache.insert - px_cache - ~origin:Trusted - ~px_peer:peer_id - maybe_reachable_point) - let peer_of_connection p2p_layer conn = let open Types.P2P.Metadata.Connection in let {P2p_connection.Info.peer_id; id_point; remote_metadata; _} = @@ -234,7 +118,7 @@ let peer_of_connection p2p_layer conn = outbound to avoid possible love bombing attacks. The Rust version also implements a way to mitigate this risk, but not the Go implementation. *) -let new_connections_handler px_cache gs_worker p2p_layer peer_id conn = +let new_connections_handler gs_worker p2p_layer peer_id conn = let P2p_connection.Info.{id_point = addr, port_opt; _} = P2p.connection_info p2p_layer conn in @@ -258,7 +142,6 @@ let new_connections_handler px_cache gs_worker p2p_layer peer_id conn = Add the ability to have direct peers. 
*) let direct = false in - if trusted then cache_point_of_trusted_peer p2p_layer px_cache peer_id ; let peer = peer_of_connection p2p_layer conn in Worker.( New_connection {peer; direct; trusted; bootstrap} |> p2p_input gs_worker) @@ -309,27 +192,12 @@ let wrap_p2p_message _p2p_layer = (* This function translates a message received via the P2P layer to a Worker p2p_message. The two types don't coincide because of Prune. *) -let unwrap_p2p_message p2p_layer ~from_peer px_cache = +let unwrap_p2p_message _p2p_layer ~from_peer:_ = let open Worker in let module I = Transport_layer_interface in function | I.Graft {topic} -> Graft {topic} - | I.Prune {topic; px; backoff} -> - let px = - Seq.map - (fun (Types.Peer.{maybe_reachable_point; peer_id} as peer) -> - if - Option.is_none @@ P2p.find_connection_by_peer_id p2p_layer peer_id - then - PX_cache.insert - px_cache - ~origin:(PX from_peer) - ~px_peer:peer_id - maybe_reachable_point ; - peer) - px - in - Prune {topic; px; backoff} + | I.Prune {topic; px; backoff} -> Prune {topic; px; backoff} | I.IHave {topic; message_ids} -> IHave {topic; message_ids} | I.IWant {message_ids} -> IWant {message_ids} | I.Subscribe {topic} -> Subscribe {topic} @@ -337,71 +205,32 @@ let unwrap_p2p_message p2p_layer ~from_peer px_cache = | I.Message_with_header {message; topic; message_id} -> Message_with_header {message; topic; message_id} -let try_connect_point ?expected_peer_id p2p_layer point = - let open Lwt_syntax in - match P2p.pool p2p_layer with - | None -> return_unit - | Some pool -> - if Option.is_some @@ P2p_pool.Connection.find_by_point pool point then - return_unit (* already connected. *) - else - (* We don't wait for the promise to resolve here, because if the - advertised peer is not reachable or is not responding, we might block - until connection timeout is reached (we observed a timeout of 10 - seconds in some case). 
Blocking here means that processing of other - messages from p2p_output_stream (including shards propagation) will - be delayed. *) - Lwt.dont_wait - (fun () -> - let* (_ : _ P2p.connection tzresult) = - P2p.connect ?expected_peer_id p2p_layer point - in - return_unit) - (fun exn -> - Format.eprintf - "Warning: got an exception while trying to connect to %a: %s@." - Point.pp - point - (Printexc.to_string exn)) - |> return - -let try_connect p2p_layer px_cache ~px_peer ~origin = +let try_connect ?expected_peer_id p2p_layer point = let open Lwt_syntax in - (* If there is some [point] associated to [px_peer] and advertised by [origin] - on the [px_cache], we will try to connect to it. *) - match PX_cache.find_opt px_cache ~px_peer ~origin with - | Some point -> - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5799 - - We may have an issue as described by the following scenario: - - A legit pair [(point, peer)] is already known by P2P, but the remote - peer is disconnected for some reason; - - Some (malicious) peer advertises a fake px_peer [(point, peer')], where - [peer'] is not the real peer id associated to the [point]; - - We try to connect to [point], setting at the same time that [peer'] - is the expected peer id for this point; - - The connection fails, but the old (correct) association [(point, - peer)] is lost. - - We may want to revert [peer'] association to [point]. But we sill have - an issue in case [(point, peer)] is actually the fake info and [(point, - peer')] the legit association but the node is disconnected when trying - to connect to it. - - This implementation will be hardened once we add the notion of "signed - records" found, e.g., in Rust version, to check that the advertised - (peer, point) pair alongside a timestamp are not faked. *) - let* () = try_connect_point ~expected_peer_id:px_peer p2p_layer point in - (match origin with - | Trusted -> () (* Don't drop trusted points. 
*) - | PX _ -> PX_cache.drop px_cache ~px_peer ~origin) ; - return_unit - | _ -> return_unit + (* We don't wait for the promise to resolve here, because if the + advertised peer is not reachable or is not responding, we might block + until connection timeout is reached (we observed a timeout of 10 + seconds in some case). Blocking here means that processing of other + messages from p2p_output_stream (including shards propagation) will + be delayed. *) + Lwt.dont_wait + (fun () -> + let* (_ : _ P2p.connection tzresult) = + P2p.connect ?expected_peer_id p2p_layer point + in + return_unit) + (fun exn -> + Format.eprintf + "Warning: got an exception while trying to connect to %a: %s@." + Point.pp + point + (Printexc.to_string exn)) + |> return (** This handler pops and processes the items put by the worker in the p2p output stream. The out messages are sent to the corresponding peers and the directives to the P2P layer to connect or disconnect peers are handled. *) -let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = +let gs_worker_p2p_output_handler gs_worker p2p_layer = let open Lwt_syntax in (* only log sending of GS control messages *) let log_sending_message = function @@ -442,12 +271,11 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = P2p.find_connection_by_peer_id p2p_layer peer.peer_id |> Option.iter_s (P2p.disconnect ~reason:"disconnected by Gossipsub" p2p_layer) - | Connect {peer; origin} -> - try_connect p2p_layer px_cache ~px_peer:peer.peer_id ~origin - | Connect_point {point} -> try_connect_point p2p_layer point - | Forget {peer; origin} -> - PX_cache.drop px_cache ~px_peer:peer.peer_id ~origin:(PX origin) ; - return_unit + | Connect {peer; origin = _} -> + let Types.Peer.{maybe_reachable_point; peer_id} = peer in + try_connect ~expected_peer_id:peer_id p2p_layer maybe_reachable_point + | Connect_point {point} -> try_connect p2p_layer point + | Forget _ -> return_unit | Kick {peer} -> P2p.pool p2p_layer |> Option.iter_s (fun 
pool -> P2p_pool.Peers.ban pool peer.peer_id) @@ -458,18 +286,14 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = (** This handler forwards p2p messages received via Octez p2p to the Gossipsub worker. *) -let transport_layer_inputs_handler gs_worker p2p_layer advertised_px_cache = +let transport_layer_inputs_handler gs_worker p2p_layer = let open Lwt_syntax in let rec loop () = let* conn, msg = P2p.recv_any p2p_layer in let from_peer = peer_of_connection p2p_layer conn in Worker.( In_message - { - from_peer; - p2p_message = - unwrap_p2p_message p2p_layer ~from_peer advertised_px_cache msg; - } + {from_peer; p2p_message = unwrap_p2p_message p2p_layer ~from_peer msg} |> p2p_input gs_worker) ; loop () in @@ -494,17 +318,16 @@ let app_messages_handler gs_worker ~app_messages_callback = Worker.app_output_stream gs_worker |> loop let activate gs_worker p2p_layer ~app_messages_callback = - let px_cache = PX_cache.create () in (* Register a handler to notify new P2P connections to GS. *) let () = - new_connections_handler px_cache gs_worker p2p_layer + new_connections_handler gs_worker p2p_layer |> P2p.on_new_connection p2p_layer in (* Register a handler to notify P2P disconnections to GS. 
*) let () = disconnections_handler gs_worker |> P2p.on_disconnection p2p_layer in Lwt.join [ - gs_worker_p2p_output_handler gs_worker p2p_layer px_cache; - transport_layer_inputs_handler gs_worker p2p_layer px_cache; + gs_worker_p2p_output_handler gs_worker p2p_layer; + transport_layer_inputs_handler gs_worker p2p_layer; app_messages_handler gs_worker ~app_messages_callback; ] -- GitLab From c9a9851e32fa5d4284ed406c550cdca966eab68e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Wed, 23 Oct 2024 18:04:59 +0200 Subject: [PATCH 5/6] DAL/Node: Unify messages --- src/lib_dal_node/gossipsub/gossipsub.ml | 2 +- .../gossipsub/gs_transport_connection.ml | 43 +++---------------- .../gossipsub/gs_transport_connection.mli | 2 +- .../gossipsub/transport_layer_interface.ml | 22 +++------- .../gossipsub/transport_layer_interface.mli | 26 +++-------- 5 files changed, 18 insertions(+), 77 deletions(-) diff --git a/src/lib_dal_node/gossipsub/gossipsub.ml b/src/lib_dal_node/gossipsub/gossipsub.ml index e1693b30a9a8..5bb7763d3886 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.ml +++ b/src/lib_dal_node/gossipsub/gossipsub.ml @@ -39,7 +39,7 @@ module Transport_layer = struct module Default_parameters = Transport_layer_default_parameters type t = - ( Interface.p2p_message, + ( Gs_interface.Worker_instance.p2p_message, Types.P2P.Metadata.Peer.t, Types.P2P.Metadata.Connection.t ) P2p.t diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index c59539b0dc75..2f4c192bd3fd 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -175,36 +175,6 @@ let disconnections_handler gs_worker peer_id = | None -> (* Something is off, we should log something probably. 
*) () | Some (peer, _) -> Worker.(Disconnection {peer} |> p2p_input gs_worker) -(* This function translates a Worker p2p_message to the type of messages sent - via the P2P layer. The two types don't coincide because of Prune. *) -let wrap_p2p_message _p2p_layer = - let module W = Worker in - let open Transport_layer_interface in - function - | W.Graft {topic} -> Graft {topic} - | W.Prune {topic; px; backoff} -> Prune {topic; px; backoff} - | W.IHave {topic; message_ids} -> IHave {topic; message_ids} - | W.IWant {message_ids} -> IWant {message_ids} - | W.Subscribe {topic} -> Subscribe {topic} - | W.Unsubscribe {topic} -> Unsubscribe {topic} - | W.Message_with_header {message; topic; message_id} -> - Message_with_header {message; topic; message_id} - -(* This function translates a message received via the P2P layer to a Worker - p2p_message. The two types don't coincide because of Prune. *) -let unwrap_p2p_message _p2p_layer ~from_peer:_ = - let open Worker in - let module I = Transport_layer_interface in - function - | I.Graft {topic} -> Graft {topic} - | I.Prune {topic; px; backoff} -> Prune {topic; px; backoff} - | I.IHave {topic; message_ids} -> IHave {topic; message_ids} - | I.IWant {message_ids} -> IWant {message_ids} - | I.Subscribe {topic} -> Subscribe {topic} - | I.Unsubscribe {topic} -> Unsubscribe {topic} - | I.Message_with_header {message; topic; message_id} -> - Message_with_header {message; topic; message_id} - let try_connect ?expected_peer_id p2p_layer point = let open Lwt_syntax in (* We don't wait for the promise to resolve here, because if the @@ -255,13 +225,13 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer = Events.(emit no_connection_for_peer to_peer.peer_id) | Some conn -> ( let* (res : unit tzresult) = - let msg = wrap_p2p_message p2p_layer p2p_message in let* () = if log_sending_message p2p_message then - Events.(emit send_p2p_message (to_peer.peer_id, msg)) + Events.( + emit send_p2p_message (to_peer.peer_id, p2p_message)) else 
return_unit in - P2p.send p2p_layer conn msg + P2p.send p2p_layer conn p2p_message in match res with | Ok () -> return_unit @@ -289,12 +259,9 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer = let transport_layer_inputs_handler gs_worker p2p_layer = let open Lwt_syntax in let rec loop () = - let* conn, msg = P2p.recv_any p2p_layer in + let* conn, p2p_message = P2p.recv_any p2p_layer in let from_peer = peer_of_connection p2p_layer conn in - Worker.( - In_message - {from_peer; p2p_message = unwrap_p2p_message p2p_layer ~from_peer msg} - |> p2p_input gs_worker) ; + Worker.(In_message {from_peer; p2p_message} |> p2p_input gs_worker) ; loop () in loop () diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.mli b/src/lib_dal_node/gossipsub/gs_transport_connection.mli index a9de70f67293..e9f7c2bc8dea 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.mli +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.mli @@ -35,7 +35,7 @@ *) val activate : Gs_interface.Worker_instance.t -> - ( Transport_layer_interface.p2p_message, + ( Gs_interface.Worker_instance.p2p_message, Types.P2P.Metadata.Peer.t, Types.P2P.Metadata.Connection.t ) P2p.t -> diff --git a/src/lib_dal_node/gossipsub/transport_layer_interface.ml b/src/lib_dal_node/gossipsub/transport_layer_interface.ml index 30d9100fede2..1f2bb98034c1 100644 --- a/src/lib_dal_node/gossipsub/transport_layer_interface.ml +++ b/src/lib_dal_node/gossipsub/transport_layer_interface.ml @@ -32,22 +32,7 @@ module Types = Tezos_dal_node_services.Types module P2p_message_V1 = struct type px_peer = Types.Peer.t - type p2p_message = - | Graft of {topic : Types.Topic.t} - | Prune of { - topic : Types.Topic.t; - px : px_peer Seq.t; - backoff : Types.Span.t; - } - | IHave of {topic : Types.Topic.t; message_ids : Types.Message_id.t list} - | IWant of {message_ids : Types.Message_id.t list} - | Subscribe of {topic : Types.Topic.t} - | Unsubscribe of {topic : Types.Topic.t} - | Message_with_header of { - message : 
Types.Message.t; - topic : Types.Topic.t; - message_id : Types.Message_id.t; - } + type p2p_message = Gs_interface.Worker_instance.p2p_message let px_peer_encoding = let open Data_encoding in @@ -64,6 +49,7 @@ module P2p_message_V1 = struct DAL/GS: bound the lists/seqs in exchanged p2p messages. *) let p2p_message_app_encoding = let open Data_encoding in + let open Gs_interface.Worker_instance in let case ?max_length ~tag ~title encoding unwrap wrap = P2p_params.Encoding {tag; title; encoding; wrap; unwrap; max_length} in @@ -153,7 +139,9 @@ module P2p_message_V1 = struct let pp_list pp_elt = Format.pp_print_list ~pp_sep:(fun fmt () -> Format.fprintf fmt "; ") pp_elt - let pp_p2p_message fmt = function + let pp_p2p_message fmt = + let open Gs_interface.Worker_instance in + function | Graft {topic} -> Format.fprintf fmt "Graft{topic=%a}" Types.Topic.pp topic | Prune {topic; px; backoff} -> Format.fprintf diff --git a/src/lib_dal_node/gossipsub/transport_layer_interface.mli b/src/lib_dal_node/gossipsub/transport_layer_interface.mli index f3b7a9102560..508275946999 100644 --- a/src/lib_dal_node/gossipsub/transport_layer_interface.mli +++ b/src/lib_dal_node/gossipsub/transport_layer_interface.mli @@ -43,29 +43,15 @@ module Types = Tezos_dal_node_services.Types peer. *) type px_peer = Types.Peer.t -(** Without piggybacking, {!p2p_message} is almost identical to - {!Gs_interface.p2p_message}, except that for the [Prune] case, - {!P2p_peer.Id.t} elements in [px] are augmented by their {!P2p_point.Id.t} - counterpart. 
*) -type p2p_message = - | Graft of {topic : Types.Topic.t} - | Prune of {topic : Types.Topic.t; px : px_peer Seq.t; backoff : Types.Span.t} - | IHave of {topic : Types.Topic.t; message_ids : Types.Message_id.t list} - | IWant of {message_ids : Types.Message_id.t list} - | Subscribe of {topic : Types.Topic.t} - | Unsubscribe of {topic : Types.Topic.t} - | Message_with_header of { - message : Types.Message.t; - topic : Types.Topic.t; - message_id : Types.Message_id.t; - } +val p2p_message_encoding : + Gs_interface.Worker_instance.p2p_message Data_encoding.t -val p2p_message_encoding : p2p_message Data_encoding.t - -val pp_p2p_message : Format.formatter -> p2p_message -> unit +val pp_p2p_message : + Format.formatter -> Gs_interface.Worker_instance.p2p_message -> unit (** A P2P message config is parameterized by the network's name. *) val message_config : - network_name:string -> p2p_message P2p_params.message_config + network_name:string -> + Gs_interface.Worker_instance.p2p_message P2p_params.message_config val version : network_name:string -> Network_version.t -- GitLab From 1a1964bc627777899a93f97e5d2f4c34bfdff987 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Wed, 23 Oct 2024 22:57:37 +0200 Subject: [PATCH 6/6] Tezt/DAL: Adapt to the new RPC format --- tezt/lib_tezos/dal_common.ml | 5 ++++- tezt/tests/dal.ml | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tezt/lib_tezos/dal_common.ml b/tezt/lib_tezos/dal_common.ml index c0bbc99fee51..017cd7b7a930 100644 --- a/tezt/lib_tezos/dal_common.ml +++ b/tezt/lib_tezos/dal_common.ml @@ -430,7 +430,10 @@ module Dal_RPC = struct in let as_topic_and_peers json = let topic = get "topic" json |> as_topic in - let peers = get "peers" json |> as_list |> List.map as_string in + let peers = + get "peers" json |> as_list + |> List.map (fun x -> x |-> "peer_id" |> as_string) + in (topic, peers) in make ~query_string GET ["p2p"; "gossipsub"; "topics"; "peers"] (fun json -> diff --git 
a/tezt/tests/dal.ml b/tezt/tests/dal.ml index f15496f5f091..be396b67c104 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -7021,7 +7021,7 @@ let test_rpc_get_connections _protocol dal_parameters _cryptobox node client |> List.map JSON.( fun json -> - let peer = json |-> "peer" |> as_string in + let peer = json |-> "peer" |-> "peer_id" |> as_string in let connection = json |-> "connection" in let bootstrap = connection |-> "bootstrap" |> as_bool in let topics = -- GitLab