From c1f33c8f31bae98e78765bfe7f651974e714f085 Mon Sep 17 00:00:00 2001 From: vbot Date: Wed, 19 Apr 2023 17:31:45 +0200 Subject: [PATCH 1/5] Ddb: do not mutate requests table while iterating Co-authored-by: klakplok Co-authored-by: Pierre Chambart --- src/lib_requester/requester.ml | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/lib_requester/requester.ml b/src/lib_requester/requester.ml index 9e26f05b37d9..5a6d2119f419 100644 --- a/src/lib_requester/requester.ml +++ b/src/lib_requester/requester.ml @@ -314,6 +314,10 @@ end = struct (* TODO: Punish peer *) Events.(emit notify_duplicate) (key, peer) + type update_table_action = + | Replace of {key : key; status : status} + | Remove of {key : key} + let worker_loop state = let open Lwt_syntax in let shutdown = Lwt_canceler.when_canceling state.canceler in @@ -339,18 +343,16 @@ end = struct let* () = Events.(emit timeout) () in let now = Time.System.now () in let active_peers = Request.active state.param in - let requests = + let actions, requests = Table.fold - (fun key {peers; next_request; delay} acc -> - if Ptime.is_later next_request ~than:now then acc + (fun key {peers; next_request; delay} (actions, acc) -> + if Ptime.is_later next_request ~than:now then (actions, acc) else let remaining_peers = P2p_peer.Set.inter peers active_peers in if P2p_peer.Set.is_empty remaining_peers && not (P2p_peer.Set.is_empty peers) - then ( - Table.remove state.pending key ; - acc) + then (Remove {key} :: actions, acc) else let requested_peer = P2p_peer.Id.Set.random_elt @@ -368,17 +370,23 @@ end = struct delay = Time.System.Span.multiply_exn 1.5 delay; } in - Table.replace state.pending key next ; let requests = key :: Option.value ~default:[] (P2p_peer.Map.find requested_peer acc) in - P2p_peer.Map.add requested_peer requests acc) + ( Replace {key; status = next} :: actions, + P2p_peer.Map.add requested_peer requests acc )) state.pending - P2p_peer.Map.empty + ([], 
P2p_peer.Map.empty) in + (* Update pending table *) + List.iter + (function + | Remove {key} -> Table.remove state.pending key + | Replace {key; status} -> Table.replace state.pending key status) + actions ; P2p_peer.Map.iter (Request.send state.param) requests ; let* () = P2p_peer.Map.iter_s -- GitLab From c87f67f2860c42532e60a88498dd1ffd6a6bbe27 Mon Sep 17 00:00:00 2001 From: vbot Date: Wed, 19 Apr 2023 17:41:56 +0200 Subject: [PATCH 2/5] Ddb: compute the minimal timeout while folding on requests Co-authored-by: klakplok Co-authored-by: Pierre Chambart --- src/lib_requester/requester.ml | 60 +++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/src/lib_requester/requester.ml b/src/lib_requester/requester.ml index 5a6d2119f419..d9cc56878d76 100644 --- a/src/lib_requester/requester.ml +++ b/src/lib_requester/requester.ml @@ -198,6 +198,8 @@ end = struct type t = { param : Request.param; pending : status Table.t; + mutable min_next_request : Time.System.t option; + (* The time of the next pending request to timeout. 
*) queue : event Lwt_pipe.Unbounded.t; mutable events : event list Lwt.t; canceler : Lwt_canceler.t; @@ -253,16 +255,7 @@ end = struct Lwt.return () let compute_timeout state = - let next = - Table.fold - (fun _ {next_request; _} acc -> - match acc with - | None -> Some next_request - | Some x -> Some (Time.System.min x next_request)) - state.pending - None - in - match next with + match state.min_next_request with | None -> fst @@ Lwt.task () | Some next -> let now = Time.System.now () in @@ -293,7 +286,7 @@ end = struct | None -> P2p_peer.Set.empty | Some peer -> P2p_peer.Set.singleton peer in - Table.add + Table.replace state.pending key {peers; next_request = now; delay = Request.initial_delay} ; @@ -338,21 +331,40 @@ end = struct let* events = state.events in state.events <- Lwt_pipe.Unbounded.pop_all state.queue ; let* () = List.iter_s (process_event state now) events in + (* Requests are either added or deleted: either way, we need + to go through the table to update the timeout. Setting it + to now does just that. As a consequence, the next + call to `compute_timeout` will always return + instantaneously. 
*) + state.min_next_request <- Some now ; loop state) else let* () = Events.(emit timeout) () in let now = Time.System.now () in let active_peers = Request.active state.param in - let actions, requests = + let compute_new_min_next_request min_next_request next_request = + match min_next_request with + | None -> Some next_request + | Some min_next_request' -> + if Ptime.is_earlier min_next_request' ~than:next_request then + min_next_request + else Some next_request + in + let actions, min_next_request, requests = Table.fold - (fun key {peers; next_request; delay} (actions, acc) -> - if Ptime.is_later next_request ~than:now then (actions, acc) + (fun key + {peers; next_request; delay} + (actions, min_next_request, acc) -> + if Ptime.is_later next_request ~than:now then + ( actions, + compute_new_min_next_request min_next_request next_request, + acc ) else let remaining_peers = P2p_peer.Set.inter peers active_peers in if P2p_peer.Set.is_empty remaining_peers && not (P2p_peer.Set.is_empty peers) - then (Remove {key} :: actions, acc) + then (Remove {key} :: actions, min_next_request, acc) else let requested_peer = P2p_peer.Id.Set.random_elt @@ -370,16 +382,18 @@ end = struct delay = Time.System.Span.multiply_exn 1.5 delay; } in - let requests = - key - :: Option.value - ~default:[] - (P2p_peer.Map.find requested_peer acc) + let new_acc = + P2p_peer.Map.update + requested_peer + (function + | None -> Some [key] | Some l -> Some (key :: l)) + acc in ( Replace {key; status = next} :: actions, - P2p_peer.Map.add requested_peer requests acc )) + compute_new_min_next_request min_next_request next_request, + new_acc )) state.pending - ([], P2p_peer.Map.empty) + ([], None, P2p_peer.Map.empty) in (* Update pending table *) List.iter @@ -387,6 +401,7 @@ end = struct | Remove {key} -> Table.remove state.pending key | Replace {key; status} -> Table.replace state.pending key status) actions ; + state.min_next_request <- min_next_request ; P2p_peer.Map.iter (Request.send state.param) 
requests ; let* () = P2p_peer.Map.iter_s @@ -405,6 +420,7 @@ end = struct { param; queue = Lwt_pipe.Unbounded.create (); + min_next_request = None; pending = Table.create ~entry_type:"pending_requests" ~random:true 17; events = Lwt.return_nil; canceler = Lwt_canceler.create (); -- GitLab From c59f97d34b39ae5f699c68714b4006dc6a315e84 Mon Sep 17 00:00:00 2001 From: vbot Date: Wed, 19 Apr 2023 18:02:52 +0200 Subject: [PATCH 3/5] Ddb: implement request batching by adding explicit throttling Co-authored-by: klakplok Co-authored-by: Pierre Chambart --- src/lib_requester/requester.ml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/lib_requester/requester.ml b/src/lib_requester/requester.ml index d9cc56878d76..f8336d89eb74 100644 --- a/src/lib_requester/requester.ml +++ b/src/lib_requester/requester.ml @@ -315,6 +315,19 @@ end = struct let open Lwt_syntax in let shutdown = Lwt_canceler.when_canceling state.canceler in let rec loop state = + (* It is possible that numerous pending requests may be canceled + sequentially. If this occurs, we will recalculate the + subsequent timeout for each cancellation. Calculating the + next timeout could be resource-intensive. By allowing for a + brief sleep, multiple cancellations can take place + simultaneously. + + Note: using `Lwt.pause` or `Lwt.yield` might not be + sufficient, e.g., when the scheduler does not time out, cancelers are not given a + chance to be executed. + + Note: This constant was selected using the sophisticated + "damp digit" method. 
*) let timeout = compute_timeout state in let* () = Lwt.choose -- GitLab From f401e499df7c75be641f88d948d3638730591d74 Mon Sep 17 00:00:00 2001 From: vbot Date: Mon, 24 Apr 2023 14:54:22 +0200 Subject: [PATCH 4/5] Ddb/Tests: adapt tests to the new throttling mechanism --- src/lib_requester/test/test_requester.ml | 34 +++++++++++++----------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/lib_requester/test/test_requester.ml b/src/lib_requester/test/test_requester.ml index 97cee6580483..5e71b608a9e4 100644 --- a/src/lib_requester/test/test_requester.ml +++ b/src/lib_requester/test/test_requester.ml @@ -574,9 +574,11 @@ let test_full_requester_test_pending_requests _ () = [ (let* _ = Test_Requester.fetch req key precheck_pass in Lwt.return_unit); - (* Ensure that the request is registered before [k] is scheduled. *) - (let* v = Lwt.pause () in - k v); + (* Ensure that the request is registered before [k] is + scheduled and that enough time is given to the + throttler. *) + (let* () = Lwt_unix.sleep 0.1 in + k ()); ] in (* Variant of [with_request] for requests that are never satisfied. 
When [k] @@ -586,23 +588,25 @@ let test_full_requester_test_pending_requests _ () = [ (let+ _ = Test_Requester.fetch req key precheck_pass in Alcotest.fail "Request should not have been satisfied"); - (let* v = Lwt.pause () in - k v); + (let* () = Lwt_unix.sleep 0.1 in + k ()); ] in (* Fetch value *) check_pending_count "0 pending requests" 0 ; - let foo_cancelled : unit Lwt.t = - with_request "foo" @@ fun () -> - check_pending_count "1 pending requests" 1 ; - with_unmet_request "bar" @@ fun () -> - check_pending_count "2 pending requests" 2 ; - with_unmet_request "bar" @@ fun () -> - check_pending_count "still 2 pending requests" 2 ; - Lwt.return (Test_Requester.clear_or_cancel req "foo") + let* () = + with_request "foo" (fun () -> + check_pending_count "1 pending requests" 1 ; + with_unmet_request "bar" (fun () -> + check_pending_count "2 pending requests" 2 ; + with_unmet_request "bar" (fun () -> + check_pending_count "still 2 pending requests" 2 ; + Test_Requester.clear_or_cancel req "foo" ; + (* The first "foo" fetch should be resolved *) + Lwt_unix.sleep 0.1))) in - let+ () = foo_cancelled in - check_pending_count "back to 1 pending requests" 1 + check_pending_count "back to 1 pending requests" 1 ; + Lwt.return_unit (** Test memory_table_length *) -- GitLab From 45493fb976e6b1ca1c1e2d3fe16b0400ebaac96b Mon Sep 17 00:00:00 2001 From: vbot Date: Thu, 20 Apr 2023 17:17:50 +0200 Subject: [PATCH 5/5] Changes: added an entry --- CHANGES.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index cc760c12406e..05b73d129c81 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -66,6 +66,9 @@ Node how long it will take to be synchronized. Also, gracefully indicates peer disconnection instead of spurious "worker crashed" messages. +- Fixed an issue where a node lagging behind would end up freezing and + never be able to catch up. + Client ------ -- GitLab