From 2f18cd01d8d0062c764282150d6bd4f5e1c7d36a Mon Sep 17 00:00:00 2001 From: Pietro Date: Thu, 16 Jun 2022 10:59:26 +0200 Subject: [PATCH 1/5] p2p: Fix the maintenance triggers In the connect handler, the Too_few and Too_many connections triggers were swapped. Author: Vivien Pelletier --- src/lib_p2p/p2p_connect_handler.ml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/lib_p2p/p2p_connect_handler.ml b/src/lib_p2p/p2p_connect_handler.ml index 38ae9e098751..3d788b442a76 100644 --- a/src/lib_p2p/p2p_connect_handler.ml +++ b/src/lib_p2p/p2p_connect_handler.ml @@ -2,7 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) -(* Copyright (c) 2021 Nomadic Labs, *) +(* Copyright (c) 2021-2022 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -200,15 +200,15 @@ let create_connection t p2p_conn id_point point_info peer_info (fun point_info -> P2p_pool.Points.remove_connected t.pool point_info) point_info ; P2p_pool.Peers.remove_connected t.pool peer_id ; - if t.config.max_connections <= P2p_pool.active_connections t.pool then ( - P2p_trigger.broadcast_too_many_connections t.triggers ; - t.log Too_many_connections) ; + if P2p_pool.active_connections t.pool < t.config.min_connections then ( + P2p_trigger.broadcast_too_few_connections t.triggers ; + t.log Too_few_connections) ; Lwt_pipe.Maybe_bounded.close messages ; P2p_conn.close conn) ; List.iter (fun f -> f peer_id conn) t.new_connection_hook ; - if P2p_pool.active_connections t.pool < t.config.min_connections then ( - P2p_trigger.broadcast_too_few_connections t.triggers ; - t.log Too_few_connections) ; + if t.config.max_connections <= P2p_pool.active_connections t.pool then ( + P2p_trigger.broadcast_too_many_connections t.triggers ; + t.log Too_many_connections) ; conn let is_acceptable t connection_point_info 
peer_info incoming version = -- GitLab From fcebf9b79d4dfb8117604591663cc20d72369912 Mon Sep 17 00:00:00 2001 From: Pietro Date: Fri, 17 Jun 2022 16:29:49 +0200 Subject: [PATCH 2/5] p2p: add two new p2p events and move two others Add Maintenance_started, Maintenance_ended Move Too_few_connections, Too_many_connections from connect_handler to maintenance --- src/lib_base/p2p_connection.ml | 6 +++++- src/lib_base/p2p_connection.mli | 4 +++- src/lib_p2p/p2p_events.ml | 2 +- src/lib_p2p/p2p_maintenance.ml | 6 +++++- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/lib_base/p2p_connection.ml b/src/lib_base/p2p_connection.ml index 2a1ba2bac113..646b64c32f7b 100644 --- a/src/lib_base/p2p_connection.ml +++ b/src/lib_base/p2p_connection.ml @@ -2,7 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) -(* Copyright (c) 2019 Nomadic Labs, *) +(* Copyright (c) 2019-2022 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -172,6 +172,8 @@ module P2p_event = struct type t = | Too_few_connections | Too_many_connections + | Maintenance_started + | Maintenance_ended | New_point of P2p_point.Id.t | New_peer of P2p_peer_id.t | Gc_points @@ -201,6 +203,8 @@ module P2p_event = struct match event with | Too_few_connections -> Format.pp_print_string ppf "Too_few_connections" | Too_many_connections -> Format.pp_print_string ppf "Too_many_connections" + | Maintenance_started -> Format.pp_print_string ppf "Maintenance_started" + | Maintenance_ended -> Format.pp_print_string ppf "Maintenance_ended" | New_point p -> Format.pp_print_string ppf "New_point " ; P2p_point.Id.pp ppf p diff --git a/src/lib_base/p2p_connection.mli b/src/lib_base/p2p_connection.mli index ad823dd05cc7..1bfb57320053 100644 --- a/src/lib_base/p2p_connection.mli +++ b/src/lib_base/p2p_connection.mli @@ -2,7 +2,7 @@ (* *) (* Open Source
License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) -(* Copyright (c) 2019 Nomadic Labs, *) +(* Copyright (c) 2019-2022 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -87,6 +87,8 @@ module P2p_event : sig type t = | Too_few_connections | Too_many_connections + | Maintenance_started + | Maintenance_ended | New_point of P2p_point.Id.t | New_peer of P2p_peer_id.t | Gc_points diff --git a/src/lib_p2p/p2p_events.ml b/src/lib_p2p/p2p_events.ml index 02c4d4fb5afd..c8013f9b8ea1 100644 --- a/src/lib_p2p/p2p_events.ml +++ b/src/lib_p2p/p2p_events.ml @@ -1,7 +1,7 @@ (*****************************************************************************) (* *) (* Open Source License *) -(* Copyright (c) 2020 Nomadic Labs, *) +(* Copyright (c) 2020-2022 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) diff --git a/src/lib_p2p/p2p_maintenance.ml b/src/lib_p2p/p2p_maintenance.ml index 087cf34b8014..43b91e9f705f 100644 --- a/src/lib_p2p/p2p_maintenance.ml +++ b/src/lib_p2p/p2p_maintenance.ml @@ -2,7 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) -(* Copyright (c) 2019 Nomadic Labs, *) +(* Copyright (c) 2019-2022 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -235,6 +235,7 @@ let random_connections ~rng pool n = between `min_threshold` and `max_threshold`. 
*) let rec do_maintain ~rng t = let open Lwt_result_syntax in + t.log P2p_connection.P2p_event.Maintenance_started ; let n_connected = P2p_pool.active_connections t.pool in if n_connected < t.bounds.min_threshold then match t.debug_config with @@ -248,11 +249,13 @@ let rec do_maintain ~rng t = (* end of maintenance when enough users have been reached *) Lwt_condition.broadcast t.just_maintained () ; let*! () = Events.(emit maintenance_ended) () in + t.log P2p_connection.P2p_event.Maintenance_ended ; return_unit) and too_few_connections ~rng t n_connected = let open Lwt_result_syntax in (* try and contact new peers *) + t.log Too_few_connections ; let*! () = Events.(emit too_few_connections) n_connected in let min_to_contact = t.bounds.min_target - n_connected in let max_to_contact = t.bounds.max_target - n_connected in @@ -263,6 +266,7 @@ and too_few_connections ~rng t n_connected = and too_many_connections ~rng t n_connected = let open Lwt_syntax in (* kill random connections *) + t.log Too_many_connections ; let n = n_connected - t.bounds.max_target in let* () = Events.(emit too_many_connections) n in let connections = random_connections ~rng t.pool n in -- GitLab From 3f9033e64607665fff366efa60970aa9a9298be1 Mon Sep 17 00:00:00 2001 From: Pietro Date: Fri, 17 Jun 2022 16:32:02 +0200 Subject: [PATCH 3/5] p2p: add two events to the connect_handler trigger_maintenance_too_many_connections and trigger_maintenance_too_few_connections to log the fact that the maintenance was triggered by the connect_handler --- src/lib_p2p/p2p_connect_handler.ml | 26 +++++++++++++++++--------- src/lib_p2p/p2p_events.ml | 24 ++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/src/lib_p2p/p2p_connect_handler.ml b/src/lib_p2p/p2p_connect_handler.ml index 3d788b442a76..320674d1962d 100644 --- a/src/lib_p2p/p2p_connect_handler.ml +++ b/src/lib_p2p/p2p_connect_handler.ml @@ -145,6 +145,7 @@ let config t = t.config let create_connection t p2p_conn id_point 
point_info peer_info negotiated_version = + let open Lwt_syntax in let peer_id = P2p_peer_state.Info.peer_id peer_info in let canceler = Lwt_canceler.create () in let bound = @@ -186,7 +187,6 @@ let create_connection t p2p_conn id_point point_info peer_info P2p_pool.Peers.add_connected t.pool peer_id peer_info ; P2p_trigger.broadcast_new_connection t.triggers ; Lwt_canceler.on_cancel canceler (fun () -> - let open Lwt_syntax in let* () = Events.(emit disconnected) (peer_id, id_point) in let timestamp = Time.System.now () in Option.iter @@ -200,16 +200,24 @@ let create_connection t p2p_conn id_point point_info peer_info (fun point_info -> P2p_pool.Points.remove_connected t.pool point_info) point_info ; P2p_pool.Peers.remove_connected t.pool peer_id ; - if P2p_pool.active_connections t.pool < t.config.min_connections then ( - P2p_trigger.broadcast_too_few_connections t.triggers ; - t.log Too_few_connections) ; + let* () = + if P2p_pool.active_connections t.pool < t.config.min_connections then ( + P2p_trigger.broadcast_too_few_connections t.triggers ; + Events.(emit trigger_maintenance_too_few_connections) + (P2p_pool.active_connections t.pool, t.config.min_connections)) + else Lwt.return_unit + in Lwt_pipe.Maybe_bounded.close messages ; P2p_conn.close conn) ; List.iter (fun f -> f peer_id conn) t.new_connection_hook ; - if t.config.max_connections <= P2p_pool.active_connections t.pool then ( - P2p_trigger.broadcast_too_many_connections t.triggers ; - t.log Too_many_connections) ; - conn + let* () = + if t.config.max_connections <= P2p_pool.active_connections t.pool then ( + P2p_trigger.broadcast_too_many_connections t.triggers ; + Events.(emit trigger_maintenance_too_many_connections) + (P2p_pool.active_connections t.pool, t.config.max_connections)) + else Lwt.return_unit + in + return conn let is_acceptable t connection_point_info peer_info incoming version = let open Result_syntax in @@ -518,7 +526,7 @@ let raw_authenticate t ?point_info canceler scheduled_conn 
point = | (addr, _), Some (_, port) -> (addr, Some port) | id_point, None -> id_point in - let conn = + let*! conn = create_connection t conn diff --git a/src/lib_p2p/p2p_events.ml b/src/lib_p2p/p2p_events.ml index c8013f9b8ea1..304c6e491b9e 100644 --- a/src/lib_p2p/p2p_events.ml +++ b/src/lib_p2p/p2p_events.ml @@ -313,6 +313,30 @@ module P2p_connect_handler = struct ("addr", P2p_addr.encoding) ("port", Data_encoding.option Data_encoding.int16) ("peer", P2p_peer.Id.encoding) + + let trigger_maintenance_too_many_connections = + declare_2 + ~section + ~name:"trigger_maintenance_too_many_connections" + ~msg: + "Too many connections : trigger maintenance \ + (active_connections={active_connections} / \ + max_connections={max_connections})" + ~level:Debug + ("active_connections", Data_encoding.int16) + ("max_connections", Data_encoding.int16) + + let trigger_maintenance_too_few_connections = + declare_2 + ~section + ~name:"trigger_maintenance_too_few_connections" + ~msg: + "Too few connections : trigger maintenance \ + (active_connections={active_connections} / \ + min_connections={min_connections})" + ~level:Debug + ("active_connections", Data_encoding.int16) + ("min_connections", Data_encoding.int16) end module P2p_conn = struct -- GitLab From f8055d6287eec64c4ffbbe475804c7fd1bf40af0 Mon Sep 17 00:00:00 2001 From: Pietro Date: Fri, 17 Jun 2022 16:33:18 +0200 Subject: [PATCH 4/5] p2p/tests: add maintenance tests --- manifest/main.ml | 5 + src/lib_p2p/test/dune | 10 +- src/lib_p2p/test/test_p2p_maintenance.ml | 600 +++++++++++++++++++++++ 3 files changed, 613 insertions(+), 2 deletions(-) create mode 100644 src/lib_p2p/test/test_p2p_maintenance.ml diff --git a/manifest/main.ml b/manifest/main.ml index 9fe83fb6e9c3..ab5d2254c585 100644 --- a/manifest/main.ml +++ b/manifest/main.ml @@ -1379,6 +1379,7 @@ let _octez_p2p_tests = (* See https://gitlab.com/tezos/tezos/-/issues/1184 *) (* "test_p2p_logging"; *) "test_p2p_connect_handler"; + "test_p2p_maintenance"; ] 
~path:"src/lib_p2p/test" ~opam:"tezos-p2p" @@ -1489,6 +1490,9 @@ let _octez_p2p_tests = alias_rule "runtest_p2p_connect_handler" ~action:(run_exe "test_p2p_connect_handler" []); + alias_rule + "runtest_p2p_maintenance" + ~action:(run_exe "test_p2p_maintenance" []); alias_rule "runtest" ~package:"tezos-p2p" @@ -1502,6 +1506,7 @@ let _octez_p2p_tests = "runtest_p2p_banned_peers"; "runtest_p2p_node"; "runtest_p2p_connect_handler"; + "runtest_p2p_maintenance"; ]; ]) diff --git a/src/lib_p2p/test/dune b/src/lib_p2p/test/dune index 6f85f6120da5..f8522175c1fa 100644 --- a/src/lib_p2p/test/dune +++ b/src/lib_p2p/test/dune @@ -10,7 +10,8 @@ test_p2p_buffer_reader test_p2p_banned_peers test_p2p_node - test_p2p_connect_handler) + test_p2p_connect_handler + test_p2p_maintenance) (libraries tezos-base tezos-base.unix @@ -105,6 +106,10 @@ (action (setenv BISECT_SIGTERM yes (run %{exe:test_p2p_connect_handler.exe})))) +(rule + (alias runtest_p2p_maintenance) + (action (setenv BISECT_SIGTERM yes (run %{exe:test_p2p_maintenance.exe})))) + (rule (alias runtest) (package tezos-p2p) @@ -116,5 +121,6 @@ (alias runtest_p2p_buffer_reader) (alias runtest_p2p_banned_peers) (alias runtest_p2p_node) - (alias runtest_p2p_connect_handler)) + (alias runtest_p2p_connect_handler) + (alias runtest_p2p_maintenance)) (action (progn))) diff --git a/src/lib_p2p/test/test_p2p_maintenance.ml b/src/lib_p2p/test/test_p2p_maintenance.ml new file mode 100644 index 000000000000..2602cb2f92a1 --- /dev/null +++ b/src/lib_p2p/test/test_p2p_maintenance.ml @@ -0,0 +1,600 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2020-2022 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, 
merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(* Testing + ------- + Component: lib_p2p + Invocation: dune exec src/lib_p2p/test/test_p2p_maintenance.exe + Subject: Check maintenance mechanism. +*) + +module Event = struct + include Internal_event.Simple + + let section = ["test"; "p2p"; "maintenance"] + + let x_active_connections = + declare_1 + ~section + ~name:"x_active_connections" + ~msg:"{connections_count} connections are actives." + ~level:Info + ("connections_count", Data_encoding.int31) + + let maintenance_start = + declare_0 + ~section + ~name:"maintenance_start" + ~msg:"maintenance has started." + ~level:Debug + () + + let maintenance_stop = + declare_0 + ~section + ~name:"maintenance_stop" + ~msg:"maintenance finished." + ~level:Debug + () + + let maintenance_debug = + declare_1 + ~section + ~name:"maintenance_debug" + ~msg:"maintenance {debug}." + ~level:Debug + ("debug", Data_encoding.string) + + let maintenance_event = + declare_1 + ~section + ~name:"maintenance_event" + ~msg:"maintenance event {event}." 
+ ~level:Debug + ~pp1:P2p_connection.P2p_event.pp + ("event", P2p_connection.P2p_event.encoding) +end + +let filteri f l = + List.mapi (fun i x -> (i, x)) l + |> List.filter (fun (i, x) -> f i x) + |> List.split |> snd + +let partitioni f l = + let l1, l2 = + List.partition (fun (i, x) -> f i x) (List.mapi (fun i x -> (i, x)) l) + in + let remove_i l = snd (List.split l) in + (remove_i l1, remove_i l2) + +let rec connect ~timeout connect_handler pool point = + let open Lwt_syntax in + let* () = + Event.(emit maintenance_debug) + (Format.asprintf "Connect to %a" P2p_point.Id.pp point) + in + let* r = P2p_connect_handler.connect connect_handler point ~timeout in + match r with + | Error (Tezos_p2p_services.P2p_errors.Connected :: _) -> ( + match P2p_pool.Connection.find_by_point pool point with + | Some conn -> return_ok conn + | None -> + failwith + "Something went wrong while connecting to %a" + P2p_point.Id.pp + point) + | Error + ((( Tezos_p2p_services.P2p_errors.Connection_refused + | Tezos_p2p_services.P2p_errors.Pending_connection + | Tezos_p2p_services.P2p_errors.Rejected_socket_connection + | Tezos_p2p_services.P2p_errors.Rejected_by_nack _ | Canceled | Timeout + | Tezos_p2p_services.P2p_errors.Rejected _ ) as head_err) + :: _) -> + let* () = + Event.(emit maintenance_debug) + (Format.asprintf + "Connection to %a failed (%a) Retry@." 
+ P2p_point.Id.pp + point + (fun ppf err -> + match err with + | Tezos_p2p_services.P2p_errors.Connection_refused -> + Format.fprintf ppf "connection refused" + | Tezos_p2p_services.P2p_errors.Pending_connection -> + Format.fprintf ppf "pending connection" + | Tezos_p2p_services.P2p_errors.Rejected_socket_connection -> + Format.fprintf ppf "rejected" + | Tezos_p2p_services.P2p_errors.Rejected_by_nack + {alternative_points = Some alternative_points; _} -> + Format.fprintf + ppf + "rejected (nack_v1, peer list: @[%a@])" + P2p_point.Id.pp_list + alternative_points + | Tezos_p2p_services.P2p_errors.Rejected_by_nack + {alternative_points = None; _} -> + Format.fprintf ppf "rejected (nack_v0)" + | Canceled -> Format.fprintf ppf "canceled" + | Timeout -> Format.fprintf ppf "timeout" + | Tezos_p2p_services.P2p_errors.Rejected {peer; motive} -> + Format.fprintf + ppf + "rejected (%a) motive:%a" + P2p_peer.Id.pp + peer + P2p_rejection.pp + motive + | _ -> assert false) + head_err) + in + let* () = Lwt_unix.sleep (0.5 +. Random.float 2.) in + connect ~timeout connect_handler pool point + | (Ok _ | Error _) as res -> Lwt.return res + +module Triggers = struct + module Too_many_connections = struct + (* Test. + Test that the maintenance is triggered when there are too many active + connections. + 1 - [target] maintenance is configured to not be trigger by the timer. + 2 - [target] maintenance establishes connections to [client]s and finish + its initial maintain. + 3 - [target] establishes more connections to reach [max_connections - 1] + active connections. + 4 - [client_overconnect] try to establish a connection to [target] until + it success. Since, the number of connection is high, the connection + can be rejected. + 5 - Checks that the connect handler triggers the maintenance and the + maintenance starts. 
*) + let target expected_connections max_connections overconnect_points + (node : Node.t) = + let open Lwt_result_syntax in + (* As there is no way to deactivate the maintenance timer while keeping + the maintenance active, we rely on the CI timeout being lesser than the + maintenance_idle_time that we set. + *) + let maintenance_idle_time = Time.System.Span.of_seconds_exn 36000. in + let time_between_looking_for_peers = + Time.System.Span.of_seconds_exn 10. + in + let maintenance_watcher = Lwt_watcher.create_input () in + let maintenance_log, maintenance_stopper = + Lwt_watcher.create_stream maintenance_watcher + in + let maintenance = + let maintenance_config = + { + P2p_maintenance.maintenance_idle_time; + private_mode = false; + min_connections = 0; + max_connections; + expected_connections; + time_between_looking_for_peers; + } + in + P2p_maintenance.create + maintenance_config + node.pool + node.connect_handler + node.trigger + ~log:(Lwt_watcher.notify maintenance_watcher) + in + (* Activate the maintenance and wait until it becomes idle. *) + P2p_maintenance.activate maintenance ; + let*! () = Event.(emit maintenance_start) () in + let* () = + P2p_test_utils.wait_pred_s + ~pred:(fun log -> + let*! e = Lwt_stream.get log in + Lwt.return + @@ Option.fold + ~none:false + ~some:(fun e -> e = P2p_connection.P2p_event.Maintenance_ended) + e) + ~arg:maintenance_log + () + in + let active_connections = P2p_pool.active_connections node.pool in + let*! () = Event.(emit maintenance_stop) () in + let*! () = Event.(emit x_active_connections) active_connections in + (* Establish new client connection to get to max_connections - 1. *) + let nb_connections_to_add = max_connections - active_connections - 1 in + (*assert (nb_connections_to_add <= List.length overconnect_points) ;*) + let*! 
() = + Event.(emit maintenance_debug) + (Format.asprintf + "nb_connections_to_add = %d - %d -1 = %d" + max_connections + active_connections + nb_connections_to_add) + in + let* _ = + List.map_ep + (fun point -> + connect + ~timeout:(Time.System.Span.of_seconds_exn 10.) + node.connect_handler + node.pool + point) + (filteri (fun i _ -> i <= nb_connections_to_add) overconnect_points) + in + let active_connections = P2p_pool.active_connections node.pool in + (*assert (active_connections = max_connections - 1) ;*) + let*! () = Event.(emit x_active_connections) active_connections in + (* We have max_connections - 1. Now we try to connect one mode client, + and wait for a Too_many_connections event *) + let* () = Node.sync node in + (* Check that the maintenance has been triggered by the scheduler *) + let* () = + P2p_test_utils.wait_pred_s + ~pred:(fun log -> + let*! e = Lwt_stream.get log in + let*! () = + Event.(emit maintenance_debug) "waiting Too_many_connections" + in + let*! () = + match e with + | None -> Lwt.return_unit + | Some e -> Event.(emit maintenance_event) e + in + Lwt.return + @@ Option.fold + ~none:false + ~some:(fun e -> + e = P2p_connection.P2p_event.Too_many_connections) + e) + ~arg:maintenance_log + () + in + (* wait for the maintanance to start after the trigger *) + let* () = + P2p_test_utils.wait_pred_s + ~pred:(fun log -> + let*! e = Lwt_stream.get log in + Lwt.return + @@ Option.fold + ~none:false + ~some:(fun e -> + e = P2p_connection.P2p_event.Maintenance_started) + e) + ~arg:maintenance_log + () + in + let*! () = Event.(emit maintenance_start) () in + (* End of the test *) + let* () = Node.sync node in + (* the number of connection is not back to max_connections - 1 *) + let active_connections = P2p_pool.active_connections node.pool in + (*assert (active_connections = max_connections - 1) ;*) + let*! () = Event.(emit x_active_connections) active_connections in + Lwt_watcher.shutdown maintenance_stopper ; + let*! 
() = P2p_maintenance.shutdown maintenance in + return_unit + + let client (node : Node.t) = + let open Lwt_result_syntax in + (* Try to connect one mode client *) + let* () = Node.sync node in + (* End of the test *) + Node.sync node + + let client_overconnect (node : Node.t) = + let open Lwt_result_syntax in + let rec overconnect = function + | [] -> Lwt.fail Alcotest.Test_error + | point :: _ as points -> ( + let*! res = + connect + ~timeout:(Time.System.Span.of_seconds_exn 10.) + node.connect_handler + node.pool + point + in + match res with + | Error + [ + P2p_errors.Rejected_by_nack + {motive = P2p_rejection.Too_many_connections; _}; + ] -> + overconnect points + | Error _ -> Lwt.fail Alcotest.Test_error + | Ok c -> Lwt.return c) + in + (* Try to connect one more client *) + let* () = Node.sync node in + let*! _conn = overconnect node.points in + (* End of the test *) + Node.sync node + + let node expected_connections max_connections overconnect_points = function + | 0 -> target expected_connections max_connections overconnect_points + | 1 -> client_overconnect + | _ -> client + + let prefix = function 0 -> "Target_" | 1 -> "Overco_" | _ -> "Client_" + + let trusted initial_points i points = + if i = 0 then initial_points else points + + (* expected_connections = 24 + * List.length points = 42 + * max_connections = 36 + *) + let run points = + let expected_connections = 24 in + let max_connections i = + if i = 0 then 3 * expected_connections / 2 else List.length points + in + (* min connection set to zero to avoid triggering the + maintenance at node start *) + let min_connections _ = 0 in + let initial_points, overconnect_points = + let client_points = filteri (fun i _ -> i > 1) points in + (* we create a list of points. some of them are going to be the initial + * points, and other used to trigger too_many_connections. 
We discriminate + * this using the index of the list *) + partitioni (fun i _ -> i < expected_connections) client_points + in + Node.detach_nodes + ~prefix + ~max_connections + ~min_connections + (node expected_connections (max_connections 0) overconnect_points) + points + ~trusted:(trusted initial_points) + end + + module Too_few_connections = struct + (* Test. + Test that the maintenance is triggered when there are too few active + connections. + 1 - [target] maintenance is configured to not be trigger by the timer. + 2 - [target] maintenance establishes connections to [client]s and finish + its initial maintain. + 3 - [target] closes its connections to reach [min_connections] active + connections. + 5 - Checks that the connect handler triggers the maintenance and the + maintenance starts. *) + let target expected_connections min_connections (node : Node.t) = + let open Lwt_result_syntax in + (* As there is no way to deactivate the maintenance timer while keeping + the maintenance active, we rely on the CI timeout being lesser than the + maintenance_idle_time that we set. + *) + let maintenance_idle_time = Time.System.Span.of_seconds_exn 36000. in + let time_between_looking_for_peers = + Time.System.Span.of_seconds_exn 10. + in + let maintenance_watcher = Lwt_watcher.create_input () in + let maintenance_log, maintenance_stopper = + Lwt_watcher.create_stream maintenance_watcher + in + let maintenance = + let maintenance_config = + { + P2p_maintenance.maintenance_idle_time; + private_mode = false; + min_connections; + max_connections = 400; + expected_connections; + time_between_looking_for_peers; + } + in + P2p_maintenance.create + maintenance_config + node.pool + node.connect_handler + node.trigger + ~log:(Lwt_watcher.notify maintenance_watcher) + in + (* Active the maintenance and wait until it become idle. *) + P2p_maintenance.activate maintenance ; + let* () = + P2p_test_utils.wait_pred_s + ~pred:(fun log -> + let*! 
e = Lwt_stream.get log in + Lwt.return + @@ Option.fold + ~none:false + ~some:(fun e -> e = P2p_connection.P2p_event.Maintenance_ended) + e) + ~arg:maintenance_log + () + in + let active_connections = P2p_pool.active_connections node.pool in + let*! () = Event.(emit x_active_connections) active_connections in + (* Close connections to each min_connections. *) + let nb_connections_to_close = active_connections - min_connections + 1 in + let*! () = + Lwt_list.iteri_p + (fun i (_, conn) -> + if i < nb_connections_to_close then + P2p_conn.disconnect ~wait:true conn + else Lwt.return_unit) + (P2p_pool.Connection.list node.pool) + in + let active_connections = P2p_pool.active_connections node.pool in + let*! () = Event.(emit x_active_connections) active_connections in + (* Check that the maintenance has been triggered. *) + let* () = + P2p_test_utils.wait_pred_s + ~pred:(fun log -> + let*! e = Lwt_stream.get log in + Lwt.return + @@ Option.fold + ~none:false + ~some:(fun e -> + e = P2p_connection.P2p_event.Too_few_connections) + e) + ~arg:maintenance_log + () + in + let* () = + P2p_test_utils.wait_pred_s + ~pred:(fun log -> + let*! e = Lwt_stream.get log in + Lwt.return + @@ Option.fold + ~none:false + ~some:(fun e -> + e = P2p_connection.P2p_event.Maintenance_started) + e) + ~arg:maintenance_log + () + in + (* End of the test *) + let* () = Node.sync node in + Lwt_watcher.shutdown maintenance_stopper ; + let*! 
() = P2p_maintenance.shutdown maintenance in + return_unit + + let client (node : Node.t) = + (* End of the test *) + Node.sync node + + let node expected_connections min_connections i = + if i = 0 then target expected_connections min_connections else client + + let prefix i = if i = 0 then "Target_" else "Client_" + + (*let trusted initial_points i points =*) + (*if i = 0 then initial_points else points*) + + let run points = + let expected_connections = 36 in + let min_connections i = if i = 0 then expected_connections / 2 else 0 in + Node.detach_nodes + ~prefix + ~min_connections + (node expected_connections (min_connections 0)) + points + end + + module Timed = struct + (* TODO: checks that the maintenance maintains every + [P2p_maintenance.config.maintenance_idle_time] seconds. *) + let _run _nodes = Lwt_result_syntax.return_unit + end +end + +let points = ref [] + +let gen_points () = + let clients = 42 in + (*let addr = Ipaddr.V6.localhost in*) + let addr = Ipaddr.v6_of_v4 @@ Ipaddr.V4.make 127 0 0 3 in + let port = 29152 + Random.int 16384 in + let ports = port -- (port + clients - 1) in + points := + List.map + (fun port -> + (* to each node, we add one topic to its state. *) + (addr, port)) + ports + +let wrap n f = + Alcotest_lwt.test_case n `Quick (fun _ () -> + let open Lwt_syntax in + let rec aux n f = + let* r = f () in + match r with + | Ok () -> Lwt.return_unit + | Error (Exn (Unix.Unix_error ((EADDRINUSE | EADDRNOTAVAIL), _, _)) :: _) + -> + gen_points () ; + (*warn "Conflict on ports, retry the test." ;*) + aux n f + | Error error -> + Format.kasprintf Stdlib.failwith "%a" pp_print_trace error + in + aux n f) + +let log_config = ref None + +let spec = + Arg. 
+ [ + ( "-v", + Unit + (fun () -> + log_config := + Some + (Lwt_log_sink_unix.create_cfg + ~rules: + "test.p2p.maintenance -> debug; p2p.maintenance -> debug" + ())), + " Log up to info msgs" ); + ( "-vv", + Unit + (fun () -> + log_config := + Some + (Lwt_log_sink_unix.create_cfg + ~rules: + "test.p2p.maintenance -> debug; p2p.maintenance -> debug; \ + p2p.connect_handler -> debug; test.p2p.node -> debug" + ())), + " Log up to debug msgs" ); + ( "-vvv", + Unit + (fun () -> + log_config := + Some + (Lwt_log_sink_unix.create_cfg + ~rules: + "test.p2p.maintenance -> debug; p2p.maintenance -> debug; \ + p2p.socket -> debug; p2p.io_scheduler -> debug;p2p.fd -> \ + debug;p2p.connect_handler -> debug; test.p2p.node -> \ + debug" + ())), + " Log up to debug msgs, socket included" ); + ] + +let main () = + let anon_fun _num_peers = raise (Arg.Bad "No anonymous argument.") in + let usage_msg = "Usage: %s .\nArguments are:" in + Arg.parse spec anon_fun usage_msg ; + let () = + Lwt_main.run + (Tezos_base_unix.Internal_event_unix.init ?lwt_log_sink:!log_config ()) + in + gen_points () ; + Lwt_main.run + @@ Alcotest_lwt.run + ~argv:[|""|] + "tezos-p2p" + [ + ( "p2p-maintenance", + [ + wrap "triggers.too-few-connections" (fun _ -> + Triggers.Too_few_connections.run !points); + wrap "triggers.too-many-connections" (fun _ -> + Triggers.Too_many_connections.run !points); + ] ); + ] + +let () = + Sys.catch_break true ; + try main () with _ -> () -- GitLab From 2baed3311096e07c287f0688747507f7d332641e Mon Sep 17 00:00:00 2001 From: Pietro Date: Tue, 2 Aug 2022 10:13:18 +0200 Subject: [PATCH 5/5] p2p: add changelog entry for maintenance fix --- CHANGES.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 49434da8a2f4..83dfdad6dec6 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -29,6 +29,9 @@ Node for a feature which is being developed and should not be modified. It should be used only for testing. 
+- Fixed a bug in the p2p layer that prevented the number of connections from + being regulated quickly when there were too few or too many connections. + Client ------ -- GitLab