From 8d959a36026830e56bd4ba0464a9381c8bcf3525 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Wed, 19 Feb 2025 08:23:11 +0100 Subject: [PATCH 1/4] Gossipsub: introduce a Ping message which is retrocompatible Co-authored-by: Paul Laforgue --- src/lib_dal_node/gossipsub/gs_logging.ml | 4 ++++ .../gossipsub/transport_layer_interface.ml | 22 +++++++++++++++++-- src/lib_gossipsub/gossipsub_intf.ml | 1 + src/lib_gossipsub/gossipsub_worker.ml | 9 ++++++-- 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/lib_dal_node/gossipsub/gs_logging.ml b/src/lib_dal_node/gossipsub/gs_logging.ml index b93d584eb8c9..b899f42c3019 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.ml +++ b/src/lib_dal_node/gossipsub/gs_logging.ml @@ -132,6 +132,9 @@ module Events = struct ("peer", P2p_peer.Id.encoding) ("topic", Types.Topic.encoding) + let ping = + declare_0 ~section ~name:(prefix "ping") ~msg:"Process Ping" ~level:Info () + let unsubscribe = declare_2 ~section @@ -213,6 +216,7 @@ let event = | Disconnection {peer} -> emit disconnection peer | In_message {from_peer; p2p_message} -> ( match p2p_message with + | Ping -> emit ping () | Message_with_header {message = _; topic; message_id} -> emit message_with_header (from_peer.peer_id, topic, message_id) | Subscribe {topic} -> emit subscribe (from_peer.peer_id, topic) diff --git a/src/lib_dal_node/gossipsub/transport_layer_interface.ml b/src/lib_dal_node/gossipsub/transport_layer_interface.ml index e9f00b9f560e..f8a0323b0670 100644 --- a/src/lib_dal_node/gossipsub/transport_layer_interface.ml +++ b/src/lib_dal_node/gossipsub/transport_layer_interface.ml @@ -44,6 +44,11 @@ module P2p_message_V1 = struct (req "maybe_reachable_point" P2p_point.Id.encoding) (req "peer" P2p_peer.Id.encoding)) + (* This ping payload is to be considered a dummy one: it uses the + zero pkh address.
*) + let ping_topic = + Types.Topic.{slot_index = 0; pkh = Signature.Public_key_hash.zero} + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5564 DAL/GS: bound the lists/seqs in exchanged p2p messages. *) @@ -95,14 +100,26 @@ module P2p_message_V1 = struct (req "message_ids" (list Types.Message_id.encoding))) (function IWant {message_ids} -> Some ((), message_ids) | _ -> None) (fun ((), message_ids) -> IWant {message_ids}); + (* In the following encoding we introduce a special case such that the [Ping] is + encoded as a [Subscribe] using a dummy payload [ping_topic] and the p2p messages + encoding remains compatible. + + FIXME/DAL: https://gitlab.com/tezos/tezos/-/issues/7768 + + Define a dedicated [Ping] message encoding when bumping the p2p message version. + *) case ~tag:0x14 ~title:"Subscribe" (obj2 (req "kind" (constant "subscribe")) (req "topic" Types.Topic.encoding)) - (function Subscribe {topic} -> Some ((), topic) | _ -> None) - (fun ((), topic) -> Subscribe {topic}); + (function + | Subscribe {topic} -> Some ((), topic) + | Ping -> Some ((), ping_topic) + | _ -> None) + (fun ((), topic) -> + if Types.Topic.(topic = ping_topic) then Ping else Subscribe {topic}); case ~tag:0x15 ~title:"Unsubscribe" @@ -183,6 +200,7 @@ module P2p_message_V1 = struct "FullMessage{message_id=%a}" Types.Message_id.pp message_id + | Ping -> Format.fprintf fmt "Ping" let distributed_db_version = Distributed_db_version.zero diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 9f35c46cbd46..fd78727c0ddf 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -1121,6 +1121,7 @@ module type WORKER = sig | Subscribe of {topic : GS.Topic.t} | Unsubscribe of {topic : GS.Topic.t} | Message_with_header of message_with_header + | Ping (** The different kinds of input events that could be received from the P2P layer.
*) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 3ccb3aa7737b..af03976f7be7 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -181,6 +181,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | Subscribe of {topic : Topic.t} | Unsubscribe of {topic : Topic.t} | Message_with_header of message_with_header + | Ping (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5323 @@ -307,7 +308,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | IWant _ -> Introspection.update_count_sent_iwants stats `Incr | Message_with_header _ -> Introspection.update_count_sent_app_messages stats `Incr - | Subscribe _ | Unsubscribe _ -> ()) + | Subscribe _ | Unsubscribe _ | Ping -> ()) | Connect _ | Connect_point _ | Disconnect _ | Forget _ | Kick _ -> () in fun {connected_bootstrap_peers; p2p_output_stream; stats; _} ~mk_output -> @@ -323,7 +324,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : (* Don't emit app messages, send IHave messages or respond to IWant if the remote peer has a bootstrap profile. *) false - | Graft _ | Prune _ | Subscribe _ | Unsubscribe _ -> true) + | Graft _ | Prune _ | Subscribe _ | Unsubscribe _ | Ping -> true) | Connect _ | Connect_point _ | Disconnect _ | Forget _ | Kick _ -> true in @@ -763,6 +764,9 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : GS.handle_prune prune gossip_state |> update_gossip_state state |> handle_prune ~self ~from_peer px + | Ping -> + (* We treat [Ping] message as a no-op and return the current [state]. *) + state (** Handling events received from P2P layer. 
*) let apply_p2p_event ~self ({gossip_state; _} as state) = function @@ -1032,6 +1036,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Format.fprintf fmt "Unsubscribe{topic=%a}" Topic.pp topic | Message_with_header message_with_header -> pp_message_with_header fmt message_with_header + | Ping -> Format.fprintf fmt "Ping" let pp_p2p_output fmt = function | Disconnect {peer} -> Format.fprintf fmt "Disconnect{peer=%a}" Peer.pp peer -- GitLab From 13e73e1c06257fbce18f690d926fd831d9de0a21 Mon Sep 17 00:00:00 2001 From: phink Date: Wed, 19 Feb 2025 12:22:44 +0100 Subject: [PATCH 2/4] Gossipsub: extend worker parameters with heartbeat_ping_interval --- src/bin_dal_node/worker_parameters.ml | 1 + .../gossipsub/gs_default_parameters.ml | 19 +++++++++++++++++++ src/lib_gossipsub/gossipsub_intf.ml | 2 ++ src/lib_gossipsub/test/default_limits.ml | 2 ++ .../test/test_gossipsub_shared.ml | 3 +++ 5 files changed, 27 insertions(+) diff --git a/src/bin_dal_node/worker_parameters.ml b/src/bin_dal_node/worker_parameters.ml index 6748d133c2e9..165b6662e81a 100644 --- a/src/bin_dal_node/worker_parameters.ml +++ b/src/bin_dal_node/worker_parameters.ml @@ -52,6 +52,7 @@ let limits = retain_duration; fanout_ttl; heartbeat_interval; + heartbeat_ping_interval; backoff_cleanup_ticks; score_cleanup_ticks; degree_low; diff --git a/src/lib_dal_node/gossipsub/gs_default_parameters.ml b/src/lib_dal_node/gossipsub/gs_default_parameters.ml index 5dca52a5059b..3313e33308d1 100644 --- a/src/lib_dal_node/gossipsub/gs_default_parameters.ml +++ b/src/lib_dal_node/gossipsub/gs_default_parameters.ml @@ -117,6 +117,25 @@ module Limits = struct let heartbeat_interval = Types.Span.of_int_s 1 + (* The ping should be performed by the heartbeat handler at least + every 45s, hence every [heartbeat_ping_interval] ticks when + [heartbeat_interval] = 1s. + + A 45-second ping interval is a reasonable choice as it sits in + the middle of the commonly recommended 30-60 second range for P2P + networks. 
+ + This timing provides a good balance between maintaining + reliable connections and managing network overhead. + + It's frequent enough to detect stale peers before most + NAT/firewall timeouts (which typically occur after 60-120 + seconds) + *) + let heartbeat_ping_interval = + let heartbeat_interval = Types.Span.to_int_s heartbeat_interval in + 45 / heartbeat_interval + let backoff_cleanup_ticks = 15 let score_cleanup_ticks = 1 diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index fd78727c0ddf..73804c0caf7e 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -247,6 +247,8 @@ type ('topic, 'peer, 'message_id, 'span) limits = { subscribed to, then we don't track that topic anymore, that is, we delete it from the fanout map. *) heartbeat_interval : 'span; (** The time between heartbeats. *) + heartbeat_ping_interval : int; + (** Number of heartbeat ticks between consecutive pings. *) backoff_cleanup_ticks : int; (** The number of heartbeat ticks setting the frequency at which the backoffs are checked and potentially cleared. 
*) diff --git a/src/lib_gossipsub/test/default_limits.ml b/src/lib_gossipsub/test/default_limits.ml index fda29164be0c..2fab2da57d72 100644 --- a/src/lib_gossipsub/test/default_limits.ml +++ b/src/lib_gossipsub/test/default_limits.ml @@ -90,6 +90,8 @@ let default_limits ?(time_in_mesh_weight = 1.0) retain_duration = Milliseconds.Span.of_int_s 10; fanout_ttl = Milliseconds.Span.of_int_s 60; heartbeat_interval = Milliseconds.Span.of_int_s 1; + heartbeat_ping_interval = 45 / 1; + (* 45 / heartbeat_interval *) backoff_cleanup_ticks = 15; score_cleanup_ticks = 1; degree_low = 5; diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index ddb9f11948ad..9ba07142f11e 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -240,6 +240,7 @@ let pp_limits fmtr retain_duration; fanout_ttl; heartbeat_interval; + heartbeat_ping_interval; backoff_cleanup_ticks; score_cleanup_ticks; degree_low; @@ -276,6 +277,7 @@ let pp_limits fmtr retain_duration = %a;@;\ fanout_ttl = %a;@;\ heartbeat_interval = %a;@;\ + heartbeat_ping_interval = %d;@;\ backoff_cleanup_ticks = %d;@;\ score_cleanup_ticks = %d;@;\ degree_low = %d;@;\ @@ -312,6 +314,7 @@ let pp_limits fmtr fanout_ttl GS.Span.pp heartbeat_interval + heartbeat_ping_interval backoff_cleanup_ticks score_cleanup_ticks degree_low -- GitLab From 83006fbca3d5c35301fcc1374feece628c24b029 Mon Sep 17 00:00:00 2001 From: phink Date: Wed, 19 Feb 2025 15:39:02 +0100 Subject: [PATCH 3/4] Gossipsub: periodically emit ping messages --- src/lib_gossipsub/gossipsub_worker.ml | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index af03976f7be7..7e491d9f3767 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -633,13 +633,29 @@ module Make (C : 
Gossipsub_intf.WORKER_CONFIGURATION) : Add more verification/attacks protections as done in Rust. *) - (** On a [Heartbeat] events, the worker sends graft and prune messages - following the automaton's output. It also sends [IHave] messages (computed - by the automaton as well). *) + let emit_pings state = + let gstate = state.gossip_state in + let gstate_view = View.view gstate in + let heartbeat_ping_interval = + Int64.of_int gstate_view.limits.heartbeat_ping_interval + in + let heartbeat_ticks = gstate_view.heartbeat_ticks in + let rem = Int64.rem heartbeat_ticks heartbeat_ping_interval in + if Int64.equal rem 0L then + GS.Introspection.Connections.iter + (fun peer _connection -> emit_p2p_message state Ping (Seq.return peer)) + gstate_view.connections + + (** On a [Heartbeat] event, the worker sends graft and prune + messages following the automaton's output. It also sends [IHave] + messages (computed by the automaton as well). It also sends a ping + message to each peer in its connections every + [heartbeat_ping_interval] ticks. *) let handle_heartheat = function | state, GS.Heartbeat {to_graft; to_prune; noPX_peers} -> let gstate = state.gossip_state in let gstate_view = View.view gstate in + emit_pings state ; let iter pmap mk_msg = Peer.Map.iter (fun peer topicset -> -- GitLab From 04ca85b9fe0c252977853c89bd34389fa41c50a3 Mon Sep 17 00:00:00 2001 From: phink Date: Wed, 19 Feb 2025 13:43:07 +0100 Subject: [PATCH 4/4] Gossipsub: rename handle_heartheat into handle_heartbeat --- src/lib_gossipsub/gossipsub_worker.ml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 7e491d9f3767..dfdd743b48aa 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -651,7 +651,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : messages (computed by the automaton as well).
It also sends a ping message to each peer in its connections every [heartbeat_ping_interval] ticks. *) - let handle_heartheat = function + let handle_heartbeat = function | state, GS.Heartbeat {to_graft; to_prune; noPX_peers} -> let gstate = state.gossip_state in let gstate_view = View.view gstate in @@ -848,7 +848,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Do we want to detect cases where two successive [Heartbeat] events would be handled (e.g. because the first one is late)? *) GS.heartbeat gossip_state |> update_gossip_state state - |> handle_heartheat + |> handle_heartbeat | P2P_input event -> apply_p2p_event ~self state event | App_input event -> apply_app_event state event | Check_unknown_messages -> check_unknown_messages_id state -- GitLab