From 1ac42cf68aabc13c6ba91a93582c2960e63d8ca0 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 19 Dec 2024 15:58:32 +0100 Subject: [PATCH] injector: clear is done through a request instead of directly in the state --- src/lib_injector/injector_events.ml | 2 +- src/lib_injector/injector_functor.ml | 74 ++++++++++++++-------- src/lib_injector/injector_worker_types.ml | 20 ++++-- src/lib_injector/injector_worker_types.mli | 6 +- 4 files changed, 68 insertions(+), 34 deletions(-) diff --git a/src/lib_injector/injector_events.ml b/src/lib_injector/injector_events.ml index 122f3ac0303d..2fca25aa0612 100644 --- a/src/lib_injector/injector_events.ml +++ b/src/lib_injector/injector_events.ml @@ -31,7 +31,7 @@ module Make (Tags : module type of Injector_tags.Make (Parameters.Tag)) (Operation : PARAM_OPERATION) (Inj_operation : INJECTOR_OPERATION with type operation = Operation.t) - (Request : module type of Request (Inj_operation)) = + (Request : module type of Request (Parameters.Tag) (Inj_operation)) = struct include Internal_event.Simple diff --git a/src/lib_injector/injector_functor.ml b/src/lib_injector/injector_functor.ml index b4326d8519f6..945efb668080 100644 --- a/src/lib_injector/injector_functor.ml +++ b/src/lib_injector/injector_functor.ml @@ -66,7 +66,7 @@ module Make (Parameters : PARAMETERS) = struct module Tags_table = Hashtbl.Make (Parameters.Tag) module POperation = Parameters.Operation module Inj_operation = Injector_operation.Make (POperation) - module Request = Request (Inj_operation) + module Request = Request (Parameters.Tag) (Inj_operation) module Inj_proto = Injector_protocol.Make (Parameters) module type PROTOCOL_CLIENT = @@ -1210,6 +1210,24 @@ module Make (Parameters : PARAMETERS) = struct in return () + let clear state = + Injected_operations.clear state.injected.injected_operations ; + Injected_ophs.clear state.injected.injected_ophs ; + Included_operations.clear state.included.included_operations ; + Included_in_blocks.clear state.included.included_in_blocks ; + Op_queue.clear state.queue + + let remove_operations_with_tag tag state = + let open Lwt_result_syntax in + Op_queue.fold + (fun id op acc -> + if Parameters.Tag.equal (Parameters.operation_tag op.operation) tag then + let* () = Op_queue.remove state.queue id in + acc + else acc) + state.queue + return_unit + module Types = struct type nonrec state = state @@ -1260,11 +1278,19 @@ module Make (Parameters : PARAMETERS) = struct (r, request_error) Request.t -> (r, request_error) result Lwt.t = fun w request -> + let open Lwt_result_syntax in let state = Worker.state w in match request with (* The execution of the request handler is protected to avoid stopping the worker in case of an exception. *) | Request.Inject -> protect @@ fun () -> on_inject state + | Request.Clear tag -> ( + protect @@ fun () -> + match tag with + | Some tag -> remove_operations_with_tag tag state + | None -> + let*! () = clear state in + return_unit) type launch_error = error trace @@ -1306,7 +1332,9 @@ module Make (Parameters : PARAMETERS) = struct let*! () = Event.(emit3 request_failed) state request_view st errs in return_unit in - match r with Request.Inject -> emit_and_return_errors errs + match r with + | Request.Inject -> emit_and_return_errors errs + | Request.Clear _ -> emit_and_return_errors errs let on_completion w r _ st = let state = Worker.state w in @@ -1623,44 +1651,36 @@ module Make (Parameters : PARAMETERS) = struct [] workers + let handle_request_error worker_res = + let open Lwt_syntax in + let* worker_res in + match worker_res with + | Ok res -> return_ok res + | Error (Worker.Request_error errs) -> Lwt.return_error errs + | Error (Closed None) -> Lwt.return_error [Worker_types.Terminated] + | Error (Closed (Some errs)) -> Lwt.return_error errs + | Error (Any exn) -> Lwt.return_error [Exn exn] + + let put_request_and_wait w req = + Worker.Dropbox.put_request_and_wait w req |> handle_request_error + let clear_all_queues () = let workers = Worker.list table in - List.iter_p - (fun (_tags, w) -> - let state = Worker.state w in - Injected_operations.clear state.injected.injected_operations ; - Injected_ophs.clear state.injected.injected_ophs ; - Included_operations.clear state.included.included_operations ; - Included_in_blocks.clear state.included.included_in_blocks ; - Op_queue.clear state.queue) + List.iter_ep + (fun (_tags, w) -> put_request_and_wait w (Request.Clear None)) workers - let remove_operations_with_tag tag state = - let open Lwt_result_syntax in - Op_queue.fold - (fun id op acc -> - if Parameters.Tag.equal (Parameters.operation_tag op.operation) tag then - let* () = Op_queue.remove state.queue id in - acc - else acc) - state.queue - return_unit - let clear_queues ?tag () = let open Lwt_result_syntax in match tag with - | None -> - let*! () = clear_all_queues () in - return_unit + | None -> clear_all_queues () | Some tag -> let workers = Worker.list table in List.iter_ep (fun (tags, w) -> let to_clean = Tags.mem tag tags in if not to_clean then return_unit - else - let state = Worker.state w in - remove_operations_with_tag tag state) + else put_request_and_wait w (Request.Clear (Some tag))) workers let register_proto_client = Inj_proto.register diff --git a/src/lib_injector/injector_worker_types.ml b/src/lib_injector/injector_worker_types.ml index 4d2c2ebc835f..c181d4f475c0 100644 --- a/src/lib_injector/injector_worker_types.ml +++ b/src/lib_injector/injector_worker_types.ml @@ -26,8 +26,10 @@ open Injector_sigs -module Request (L1_operation : INJECTOR_OPERATION) = struct - type ('a, 'b) t = Inject : (unit, error trace) t +module Request (Tag : TAG) (L1_operation : INJECTOR_OPERATION) = struct + type ('a, 'b) t = + | Inject : (unit, error trace) t + | Clear : Tag.t option -> (unit, error trace) t type view = View : _ t -> view @@ -41,9 +43,19 @@ module Request (L1_operation : INJECTOR_OPERATION) = struct (Tag 2) ~title:"Inject" (obj1 (req "request" (constant "inject"))) - (function View Inject -> Some ()) + (function View Inject -> Some () | _ -> None) (fun () -> View Inject); + case + (Tag 3) + ~title:"Clear" + (obj2 (req "request" (constant "clear")) (opt "tag" Tag.encoding)) + (function View (Clear tag) -> Some ((), tag) | _ -> None) + (fun ((), tag) -> View (Clear tag)); ] - let pp ppf (View r) = match r with Inject -> Format.fprintf ppf "injection" + let pp ppf (View r) = + match r with + | Inject -> Format.fprintf ppf "injection" + | Clear tag -> + Format.fprintf ppf "clear %a" (Format.pp_print_option Tag.pp) tag end diff --git a/src/lib_injector/injector_worker_types.mli b/src/lib_injector/injector_worker_types.mli index 86f2b70165ee..e6c1066f35e5 100644 --- a/src/lib_injector/injector_worker_types.mli +++ b/src/lib_injector/injector_worker_types.mli @@ -26,8 +26,10 @@ open Injector_sigs -module Request (Inj_operation : INJECTOR_OPERATION) : sig - type ('a, 'b) t = Inject : (unit, error trace) t +module Request (Tag : TAG) (L1_operation : INJECTOR_OPERATION) : sig + type ('a, 'b) t = + | Inject : (unit, error trace) t + | Clear : Tag.t option -> (unit, error trace) t type view = View : _ t -> view -- GitLab