From 8b7f1641cde57237a5be4d6aeaf153608cec811c Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 5 Dec 2024 13:28:36 +0100 Subject: [PATCH 1/4] rollup/node: mv rpc query into into appropriate module --- src/lib_smart_rollup/rollup_node_services.ml | 42 ++++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/lib_smart_rollup/rollup_node_services.ml b/src/lib_smart_rollup/rollup_node_services.ml index afa58d4efe7d..e45717e33629 100644 --- a/src/lib_smart_rollup/rollup_node_services.ml +++ b/src/lib_smart_rollup/rollup_node_services.ml @@ -847,6 +847,13 @@ module Arg = struct Operation_kind.of_string s |> Option.to_result ~none:"Invalid operation kind") () + + let z = + Resto.Arg.make + ~name:"z" + ~destruct:(fun s -> Ok (Z.of_string s)) + ~construct:Z.to_string + () end module Query = struct @@ -863,6 +870,19 @@ module Query = struct query (fun tag -> tag) |+ opt_field "tag" Arg.operation_kind (fun k -> k) |> seal + + let order_and_drop_duplicate_query = + let open Tezos_rpc.Query in + query (fun order drop_duplicate -> + object + method order = order + + method drop_duplicate = drop_duplicate + end) + |+ opt_field "order" Arg.z (fun q -> q#order) + |+ field "drop_duplicate" Tezos_rpc.Arg.bool false (fun q -> + q#drop_duplicate) + |> seal end module type PREFIX = sig @@ -988,29 +1008,9 @@ module Local = struct (path / "gc_info") let injection = - let z_rpc_arg = - Resto.Arg.make - ~name:"z" - ~destruct:(fun s -> Ok (Z.of_string s)) - ~construct:Z.to_string - () - in - let query = - let open Tezos_rpc.Query in - query (fun order drop_duplicate -> - object - method order = order - - method drop_duplicate = drop_duplicate - end) - |+ opt_field "order" z_rpc_arg (fun q -> q#order) - |+ field "drop_duplicate" Tezos_rpc.Arg.bool false (fun q -> - q#drop_duplicate) - |> seal - in Tezos_rpc.Service.post_service ~description:"Inject messages in the batcher's queue" - ~query + ~query:Query.order_and_drop_duplicate_query ~input: Data_encoding.( def -- GitLab From fe5912d91305e65036209b95ab7f1041c93d54d6 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 5 Dec 2024 13:29:21 +0100 Subject: [PATCH 2/4] rollup/node: injector clear rpc can now clear based on order --- CHANGES.rst | 13 ++++ src/lib_injector/injector_functor.ml | 68 ++++++++++------ src/lib_injector/injector_sigs.ml | 14 +++- src/lib_injector/injector_worker_types.ml | 82 ++++++++++++++++++-- src/lib_injector/injector_worker_types.mli | 16 +++- src/lib_smart_rollup/rollup_node_services.ml | 17 +++- src/lib_smart_rollup_node/rpc_directory.ml | 7 +- 7 files changed, 180 insertions(+), 37 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index feeeb9ad61e5..a698f2f26394 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -75,6 +75,19 @@ Smart Rollup node received chunk with the following priority order: First chunks with ascending order then chunks by order of arrival. (MR :gl:`!15672`) +- updated RPC ``DELETE /admin/injector/queues`` with new query to + clear injector queues based on priority order. The RPC can takes two + optional arguments: + + + ``order_below``: an integer that filters out all operations with + order strictly inferior to it. + + + ``drop_no_order``: a boolean that if true remove all operations + that has no order specified. ``false`` by default. + + When ``tag`` is specified only operation of that type will be + considered, else all operations are considered.(MR :gl:`!15929`) + - Updated RPC ``/local/batcher/injection`` with a new query argument possibility. When the rpc contains ``"drop_duplicate": true`` then the batcher will drop the messages that were already injected with a diff --git a/src/lib_injector/injector_functor.ml b/src/lib_injector/injector_functor.ml index 46a1955fa28c..630a39fcc9aa 100644 --- a/src/lib_injector/injector_functor.ml +++ b/src/lib_injector/injector_functor.ml @@ -1214,11 +1214,23 @@ module Make (Parameters : PARAMETERS) = struct Included_in_blocks.clear state.included.included_in_blocks ; Op_heap.clear state.heap - let remove_operations_with_tag tag state = - Op_heap.remove_predicate - (fun op -> - Parameters.Tag.equal (Parameters.operation_tag op.operation) tag) - state.heap + let remove_operations removal_criteria {heap; _} = + let predicate_f (op : Inj_operation.t) = + let is_op_tag tag = + Parameters.Tag.equal (Parameters.operation_tag op.operation) tag + in + let is_order_below ~drop_no_order order_below = + Option.map (fun op_order -> Z.lt op_order order_below) op.order + |> Option.value ~default:drop_no_order + in + match removal_criteria with + | Request.Operation_tag tag -> is_op_tag tag + | Order_below (order_below, drop_no_order) -> + is_order_below ~drop_no_order order_below + | Tag_and_order_below (tag, order_below, drop_no_order) -> + is_op_tag tag && is_order_below ~drop_no_order order_below + in + Op_heap.remove_predicate predicate_f heap module Types = struct type nonrec state = state @@ -1276,10 +1288,10 @@ module Make (Parameters : PARAMETERS) = struct (* The execution of the request handler is protected to avoid stopping the worker in case of an exception. *) | Request.Inject -> protect @@ fun () -> on_inject state - | Request.Clear tag -> ( + | Request.Clear removal_criteria -> ( protect @@ fun () -> - match tag with - | Some tag -> remove_operations_with_tag tag state + match removal_criteria with + | Some removal_criteria -> remove_operations removal_criteria state | None -> let*! () = clear state in return_unit) @@ -1656,24 +1668,30 @@ module Make (Parameters : PARAMETERS) = struct let put_request_and_wait w req = Worker.Dropbox.put_request_and_wait w req |> handle_request_error - let clear_all_queues () = - let workers = Worker.list table in - List.iter_ep - (fun (_tags, w) -> put_request_and_wait w (Request.Clear None)) - workers - - let clear_queues ?tag () = + let clear_queues ?(drop_no_order = false) ?order_below ?tag () = let open Lwt_result_syntax in - match tag with - | None -> clear_all_queues () - | Some tag -> - let workers = Worker.list table in - List.iter_ep - (fun (tags, w) -> - let to_clean = Tags.mem tag tags in - if not to_clean then return_unit - else put_request_and_wait w (Request.Clear (Some tag))) - workers + let removal_criteria : Request.removal_criteria option = + match (tag, order_below) with + | Some tag, None -> Some (Operation_tag tag) + | None, Some order_below -> + Some (Order_below (order_below, drop_no_order)) + | Some tag, Some order_below -> + Some (Tag_and_order_below (tag, order_below, drop_no_order)) + | None, None -> None + in + let clear_worker_queue (tags, w) = + match removal_criteria with + | Some (Operation_tag tag) | Some (Tag_and_order_below (tag, _, _)) -> + (*If a tag is specified in the removal_criteria only clear + the worker that inject such operation tag *) + if Tags.mem tag tags then + put_request_and_wait w (Request.Clear removal_criteria) + else return_unit + | Some (Order_below _) | None -> + put_request_and_wait w (Request.Clear removal_criteria) + in + let workers = Worker.list table in + List.iter_ep clear_worker_queue workers let register_proto_client = Inj_proto.register end diff --git a/src/lib_injector/injector_sigs.ml b/src/lib_injector/injector_sigs.ml index 417c7a3b48db..5add827d565b 100644 --- a/src/lib_injector/injector_sigs.ml +++ b/src/lib_injector/injector_sigs.ml @@ -369,9 +369,17 @@ module type S = sig tag. *) val get_queues : ?tag:tag -> unit -> (tag list * Inj_operation.t list) list - (** Clears the injectors queues completely. If [tag] is provided, only queues - for the injector which handles this tag is cleared. *) - val clear_queues : ?tag:tag -> unit -> unit tzresult Lwt.t + (** Clears the injectors queues completely. If [tag] is provided, + only queues for the injector which handles this tag is + cleared. If [order_below], only operation with order below that + value are cleared, and if [drop_no_order] is true, clear all + operation that has no order specified. *) + val clear_queues : + ?drop_no_order:bool -> + ?order_below:Z.t -> + ?tag:tag -> + unit -> + unit tzresult Lwt.t (** Register a protocol client for a specific protocol to be used by the injector. This function {b must} be called for all protocols that the diff --git a/src/lib_injector/injector_worker_types.ml b/src/lib_injector/injector_worker_types.ml index c181d4f475c0..8fc407e58b3a 100644 --- a/src/lib_injector/injector_worker_types.ml +++ b/src/lib_injector/injector_worker_types.ml @@ -27,9 +27,72 @@ open Injector_sigs module Request (Tag : TAG) (L1_operation : INJECTOR_OPERATION) = struct + type removal_criteria = + | Operation_tag of Tag.t + | Order_below of (Z.t * bool) + | Tag_and_order_below of (Tag.t * Z.t * bool) + + let removal_criteria_encoding = + let open Data_encoding in + union + [ + case + (Tag 0) + ~title:"Operation_tag" + (obj2 (req "request" (constant "only_tag")) (req "tag" Tag.encoding)) + (function Operation_tag tag -> Some ((), tag) | _ -> None) + (fun ((), tag) -> Operation_tag tag); + case + (Tag 1) + ~title:"Order_below" + (obj3 + (req "request" (constant "only_order_below")) + (req "order_below" z) + (dft "drop_no_order" bool false)) + (function + | Order_below (order_below, drop_no_order) -> + Some ((), order_below, drop_no_order) + | _ -> None) + (fun ((), order_below, drop_no_order) -> + Order_below (order_below, drop_no_order)); + case + (Tag 2) + ~title:"Tag_and_order_below" + (obj4 + (req "request" (constant "tag_and_order")) + (req "tag" Tag.encoding) + (req "order_below" z) + (dft "drop_no_order" bool false)) + (function + | Tag_and_order_below (tag, order_below, drop_no_order) -> + Some ((), tag, order_below, drop_no_order) + | _ -> None) + (fun ((), tag, order_below, drop_no_order) -> + Tag_and_order_below (tag, order_below, drop_no_order)); + ] + + let pp_removal_criteria fmt = function + | Operation_tag tag -> Format.fprintf fmt "only tag %a" Tag.pp tag + | Order_below (order_below, drop_no_order) -> + Format.fprintf + fmt + "Only order below %a, and drop no order: %b" + Z.pp_print + order_below + drop_no_order + | Tag_and_order_below (tag, order_below, drop_no_order) -> + Format.fprintf + fmt + "Operation_tag %a, order below %a, and drop no order: %b" + Tag.pp + tag + Z.pp_print + order_below + drop_no_order + type ('a, 'b) t = | Inject : (unit, error trace) t - | Clear : Tag.t option -> (unit, error trace) t + | Clear : removal_criteria option -> (unit, error trace) t type view = View : _ t -> view @@ -48,14 +111,21 @@ module Request (Tag : TAG) (L1_operation : INJECTOR_OPERATION) = struct case (Tag 3) ~title:"Clear" - (obj2 (req "request" (constant "clear")) (opt "tag" Tag.encoding)) - (function View (Clear tag) -> Some ((), tag) | _ -> None) - (fun ((), tag) -> View (Clear tag)); + (obj2 + (req "request" (constant "clear")) + (opt "criteria" removal_criteria_encoding)) + (function + | View (Clear predicate) -> Some ((), predicate) | _ -> None) + (fun ((), predicate) -> View (Clear predicate)); ] let pp ppf (View r) = match r with | Inject -> Format.fprintf ppf "injection" - | Clear tag -> - Format.fprintf ppf "clear %a" (Format.pp_print_option Tag.pp) tag + | Clear predicate -> + Format.fprintf + ppf + "clear %a" + (Format.pp_print_option pp_removal_criteria) + predicate end diff --git a/src/lib_injector/injector_worker_types.mli b/src/lib_injector/injector_worker_types.mli index e6c1066f35e5..e9f927287b51 100644 --- a/src/lib_injector/injector_worker_types.mli +++ b/src/lib_injector/injector_worker_types.mli @@ -27,9 +27,23 @@ open Injector_sigs module Request (Tag : TAG) (L1_operation : INJECTOR_OPERATION) : sig + (** Criteria to specify which operation to remove from the + injector. *) + type removal_criteria = + | Operation_tag of Tag.t + (** [Operation_tag kind] Clear all operation of type [kind] *) + | Order_below of (Z.t * bool) + (** [Order_below (order, drop_no_order] Clear all operation + that have an order specified below [order]. If + [drop_no_order] also clear all operation that has no order + specified. *) + | Tag_and_order_below of (Tag.t * Z.t * bool) + (** [Tag_and_order_below (kind, order, drop_no_order)] Apply + {!Order_below} but only for operation of type [kind]. *) + type ('a, 'b) t = | Inject : (unit, error trace) t - | Clear : Tag.t option -> (unit, error trace) t + | Clear : removal_criteria option -> (unit, error trace) t type view = View : _ t -> view diff --git a/src/lib_smart_rollup/rollup_node_services.ml b/src/lib_smart_rollup/rollup_node_services.ml index e45717e33629..bf95c6eedfdf 100644 --- a/src/lib_smart_rollup/rollup_node_services.ml +++ b/src/lib_smart_rollup/rollup_node_services.ml @@ -871,6 +871,21 @@ module Query = struct |+ opt_field "tag" Arg.operation_kind (fun k -> k) |> seal + let operation_tag_and_order_below_query = + let open Tezos_rpc.Query in + query (fun order operation_tag drop_no_order -> + object + method order = order + + method operation_tag = operation_tag + + method drop_no_order = drop_no_order + end) + |+ opt_field "order_below" Arg.z (fun q -> q#order) + |+ opt_field "tag" Arg.operation_kind (fun q -> q#operation_tag) + |+ field "drop_no_order" Tezos_rpc.Arg.bool false (fun q -> q#drop_no_order) + |> seal + let order_and_drop_duplicate_query = let open Tezos_rpc.Query in query (fun order drop_duplicate -> @@ -1186,7 +1201,7 @@ module Admin = struct let clear_injector_queues = Tezos_rpc.Service.delete_service ~description:"Clear operation queues of injectors" - ~query:Query.operation_tag_query + ~query:Query.operation_tag_and_order_below_query ~output:Data_encoding.unit (path / "injector" / "queues") diff --git a/src/lib_smart_rollup_node/rpc_directory.ml b/src/lib_smart_rollup_node/rpc_directory.ml index 0ec7c507da03..821c4b7b935e 100644 --- a/src/lib_smart_rollup_node/rpc_directory.ml +++ b/src/lib_smart_rollup_node/rpc_directory.ml @@ -679,7 +679,12 @@ let () = let () = Admin_directory.register0 Rollup_node_services.Admin.clear_injector_queues - @@ fun _node_ctxt tag () -> Injector.clear_queues ?tag () + @@ fun _node_ctxt query () -> + Injector.clear_queues + ~drop_no_order:query#drop_no_order + ?order_below:query#order + ?tag:query#operation_tag + () let () = Admin_directory.register0 Rollup_node_services.Admin.cancel_gc -- GitLab From 0433862cd9a4d4c2a780521b675b6c4b82046433 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 5 Dec 2024 13:39:00 +0100 Subject: [PATCH 3/4] rollup/node: add rpc to clear the heap of the batcher (with condition) --- CHANGES.rst | 12 ++++++ src/lib_smart_rollup/rollup_node_services.ml | 19 ++++++++ src/lib_smart_rollup_node/batcher.ml | 43 ++++++++++++++++++- src/lib_smart_rollup_node/batcher.mli | 5 +++ .../batcher_worker_types.ml | 39 ++++++++++++++--- .../batcher_worker_types.mli | 18 ++++++-- src/lib_smart_rollup_node/rpc_directory.ml | 12 ++++++ 7 files changed, 138 insertions(+), 10 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index a698f2f26394..cf7e3d843b75 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -88,6 +88,18 @@ Smart Rollup node When ``tag`` is specified only operation of that type will be considered, else all operations are considered.(MR :gl:`!15929`) +- Added RPC ``DELETE /admin/batcher/queue``, which can take two optional + arguments: + + + ``order_below``: an integer that filters all messages with order + inferior to it. + + + ``drop_no_order``: a boolean that if true remove all messages that + has no order specified. ``false` by default. If no ``order_below`` + is specified it completely clear the queue. + + (MR :gl:`!15929`) + - Updated RPC ``/local/batcher/injection`` with a new query argument possibility. When the rpc contains ``"drop_duplicate": true`` then the batcher will drop the messages that were already injected with a diff --git a/src/lib_smart_rollup/rollup_node_services.ml b/src/lib_smart_rollup/rollup_node_services.ml index bf95c6eedfdf..e1797ba1767f 100644 --- a/src/lib_smart_rollup/rollup_node_services.ml +++ b/src/lib_smart_rollup/rollup_node_services.ml @@ -871,6 +871,18 @@ module Query = struct |+ opt_field "tag" Arg.operation_kind (fun k -> k) |> seal + let order_below_query = + let open Tezos_rpc.Query in + query (fun order drop_no_order -> + object + method order = order + + method drop_no_order = drop_no_order + end) + |+ opt_field "order_below" Arg.z (fun q -> q#order) + |+ field "drop_no_order" Tezos_rpc.Arg.bool false (fun q -> q#drop_no_order) + |> seal + let operation_tag_and_order_below_query = let open Tezos_rpc.Query in query (fun order operation_tag drop_no_order -> @@ -1211,4 +1223,11 @@ module Admin = struct ~query:Tezos_rpc.Query.empty ~output:Data_encoding.bool (path / "cancel_gc") + + let clear_batcher_queues = + Tezos_rpc.Service.delete_service + ~description:"Clear operation queues of injectors" + ~query:Query.order_below_query + ~output:Data_encoding.unit + (path / "batcher" / "queue") end diff --git a/src/lib_smart_rollup_node/batcher.ml b/src/lib_smart_rollup_node/batcher.ml index ac2aacf71741..e34982682334 100644 --- a/src/lib_smart_rollup_node/batcher.ml +++ b/src/lib_smart_rollup_node/batcher.ml @@ -243,6 +243,22 @@ let on_register ?order ~drop_duplicate state (messages : string list) = let on_new_head state = produce_batches state ~only_full:false +let clear_queues {messages_heap; batched; _} = + Message_heap.clear messages_heap ; + Batched_messages.clear batched + +let remove_messages + ({drop_no_order; order_below} : Batcher_worker_types.order_request) + {messages_heap; _} = + let predicate_f message = + Option.map + (fun op_order -> Z.leq op_order order_below) + (L2_message.order message) + |> Option.value ~default:drop_no_order + in + let _ids = Message_heap.remove_predicate predicate_f messages_heap in + () + let init_batcher_state plugin node_ctxt = { node_ctxt; @@ -290,6 +306,14 @@ module Handlers = struct | Request.Register {order; messages; drop_duplicate} -> protect @@ fun () -> on_register ?order ~drop_duplicate state messages | Request.Produce_batches -> protect @@ fun () -> on_new_head state + | Request.Clear_queues -> + protect @@ fun () -> + clear_queues state ; + Lwt_result_syntax.return_unit + | Request.Remove_messages order_request -> + protect @@ fun () -> + remove_messages order_request state ; + Lwt_result_syntax.return_unit type launch_error = error trace @@ -311,10 +335,13 @@ module Handlers = struct match r with | Request.Register _ -> emit_and_return_errors errs | Request.Produce_batches -> emit_and_return_errors errs + | Request.Clear_queues -> emit_and_return_errors errs + | Request.Remove_messages _ -> emit_and_return_errors errs let on_completion _w r _ st = match Request.view r with - | Request.View (Register _ | Produce_batches) -> + | Request.View + (Register _ | Produce_batches | Clear_queues | Remove_messages _) -> Batcher_events.(emit Worker.request_completed_debug) (Request.view r, st) let on_no_request _ = Lwt.return_unit @@ -419,6 +446,20 @@ let produce_batches () = Worker.Queue.push_request_and_wait w Request.Produce_batches |> handle_request_error +let clean_queue ?order_request () = + let open Lwt_result_syntax in + let*? w = worker () in + let* () = + (match order_request with + | None -> Worker.Queue.push_request_and_wait w Request.Clear_queues + | Some order_request -> + Worker.Queue.push_request_and_wait + w + (Request.Remove_messages order_request)) + |> handle_request_error + in + return_unit + let shutdown () = match worker () with | Error _ -> diff --git a/src/lib_smart_rollup_node/batcher.mli b/src/lib_smart_rollup_node/batcher.mli index 239cb431dc58..fe8f3385f243 100644 --- a/src/lib_smart_rollup_node/batcher.mli +++ b/src/lib_smart_rollup_node/batcher.mli @@ -75,3 +75,8 @@ val message_status : L2_message.id -> (status * string) option tzresult (** Returns the status of the publisher worker *) val worker_status : unit -> [`Running | `Not_running | `Crashed of exn] + +val clean_queue : + ?order_request:Batcher_worker_types.order_request -> + unit -> + unit tzresult Lwt.t diff --git a/src/lib_smart_rollup_node/batcher_worker_types.ml b/src/lib_smart_rollup_node/batcher_worker_types.ml index a55b1a9a1f59..e96e982615ea 100644 --- a/src/lib_smart_rollup_node/batcher_worker_types.ml +++ b/src/lib_smart_rollup_node/batcher_worker_types.ml @@ -23,6 +23,8 @@ (* *) (*****************************************************************************) +type order_request = {drop_no_order : bool; order_below : Z.t} + module Request = struct type ('a, 'b) t = | Register : { @@ -32,6 +34,8 @@ module Request = struct } -> (L2_message.id list, error trace) t | Produce_batches : (unit, error trace) t + | Clear_queues : (unit, error trace) t + | Remove_messages : order_request -> (unit, error trace) t type view = View : _ t -> view @@ -61,6 +65,25 @@ module Request = struct (obj1 (req "request" (constant "produce_batches"))) (function View Produce_batches -> Some () | _ -> None) (fun () -> View Produce_batches); + case + (Tag 2) + ~title:"Clear_queues" + (obj1 (req "request" (constant "clear_queues"))) + (function View Clear_queues -> Some () | _ -> None) + (fun () -> View Clear_queues); + case + (Tag 3) + ~title:"Remove_messages" + (obj3 + (req "request" (constant "remove_operations")) + (req "drop_no_order" bool) + (req "order_below" n)) + (function + | View (Remove_messages {drop_no_order; order_below}) -> + Some ((), drop_no_order, order_below) + | _ -> None) + (fun ((), drop_no_order, order_below) -> + View (Remove_messages {drop_no_order; order_below})); ] let pp ppf (View r) = @@ -68,11 +91,17 @@ module Request = struct | Register {order = _; messages; drop_duplicate} -> Format.fprintf ppf - "register %d new L2 message%a" + "register %d new L2 message%s" (List.length messages) - (fun fmt () -> - if drop_duplicate then Format.pp_print_string fmt "" - else Format.fprintf fmt ", checking if message was already injected") - () + (if drop_duplicate then ", checking if messages were already injected" + else "") | Produce_batches -> Format.fprintf ppf "Producing messages batches." + | Clear_queues -> Format.fprintf ppf "Clear queues." + | Remove_messages {drop_no_order; order_below} -> + Format.fprintf + ppf + "Remove messages with order inferior to %a%s." + Z.pp_print + order_below + (if drop_no_order then ",and drop messages with node order" else "") end diff --git a/src/lib_smart_rollup_node/batcher_worker_types.mli b/src/lib_smart_rollup_node/batcher_worker_types.mli index 7d70fa678991..b9983751b8bd 100644 --- a/src/lib_smart_rollup_node/batcher_worker_types.mli +++ b/src/lib_smart_rollup_node/batcher_worker_types.mli @@ -26,6 +26,8 @@ (** This module contains the parameters for the worker (see {!Worker}) used by the batcher. *) +type order_request = {drop_no_order : bool; order_below : Z.t} + module Request : sig (** Type of requests accepted by the batcher worker. *) type ('a, 'b) t = @@ -36,12 +38,20 @@ module Request : sig } -> (L2_message.id list, error trace) t (** Request to register new L2 messages in the queue. if - [drop_duplicate] is [true], then the elements of - [messages] already processed by the batcher, with - [drop_duplicate = true], are dropped. *) + [drop_duplicate] is [true], then the elements of + [messages] already processed by the batcher, with + [drop_duplicate = true], are dropped. *) | Produce_batches : (unit, error trace) t (** Request to produce new messages batches and - submit them to the injector. *) + submit them to the injector. *) + | Clear_queues : (unit, error trace) t + (** Request to clear all queues, the heap and the batched + messages. *) + | Remove_messages : order_request -> (unit, error trace) t + (** Request to remove messages in the heap and batched with + order inferior to [order_below]. If [drop_no_order] is + set, remove also all messages that have no order + specified. *) type view = View : _ t -> view diff --git a/src/lib_smart_rollup_node/rpc_directory.ml b/src/lib_smart_rollup_node/rpc_directory.ml index 821c4b7b935e..1241a51c5a83 100644 --- a/src/lib_smart_rollup_node/rpc_directory.ml +++ b/src/lib_smart_rollup_node/rpc_directory.ml @@ -686,6 +686,18 @@ let () = ?tag:query#operation_tag () +let () = + Admin_directory.register0 Rollup_node_services.Admin.clear_batcher_queues + @@ fun _node_ctxt query () -> + Batcher.clean_queue + ?order_request: + (Option.map + (fun order_below -> + Batcher_worker_types. + {drop_no_order = query#drop_no_order; order_below}) + query#order) + () + let () = Admin_directory.register0 Rollup_node_services.Admin.cancel_gc @@ fun node_ctxt () () -> -- GitLab From 306f43358c294e2ed28b7eeaf0d56f50912e0c52 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 5 Dec 2024 14:30:32 +0100 Subject: [PATCH 4/4] rollup/tezt: test clear injector and batcher rpc --- tezt/lib_tezos/sc_rollup_rpc.ml | 77 ++++++++++++++++ tezt/lib_tezos/sc_rollup_rpc.mli | 20 ++++ tezt/tests/sc_rollup.ml | 153 ++++++++++++++++++++++++++++++- 3 files changed, 249 insertions(+), 1 deletion(-) diff --git a/tezt/lib_tezos/sc_rollup_rpc.ml b/tezt/lib_tezos/sc_rollup_rpc.ml index 55b39d2268fd..55effe888d09 100644 --- a/tezt/lib_tezos/sc_rollup_rpc.ml +++ b/tezt/lib_tezos/sc_rollup_rpc.ml @@ -351,3 +351,80 @@ let get_local_outbox_pending_executable () = let get_local_outbox_pending_unexecutable () = make GET ["local"; "outbox"; "pending"; "unexecutable"] outbox_list_of_json + +let delete_admin_batcher_queue ?order_below ?drop_no_order () = + let query_string = + let order_below_query = + match order_below with + | Some order_below -> [("order_below", string_of_int order_below)] + | None -> [] + in + let drop_no_order_query = + match drop_no_order with + | Some drop_no_order -> [("drop_no_order", string_of_bool drop_no_order)] + | None -> [] + in + order_below_query @ drop_no_order_query + in + make ~query_string DELETE ["admin"; "batcher"; "queue"] (fun json -> + let empty_list = JSON.as_object json in + if List.length empty_list = 0 then () + else + failwith + (sf "invalid json %s, expected empty json \"{}\"." (JSON.encode json))) + +let injectors_queue_json json = + let open JSON in + let queue_json json = + let tags = json |-> "tags" |> as_list |> List.map as_string in + let queue = json |-> "queue" |> as_list in + (tags, queue) + in + json |> as_list |> List.map queue_json + +let get_admin_injector_queues ?tag () = + let query_string = Option.map (fun tag -> [("tag", tag)]) tag in + make GET ?query_string ["admin"; "injector"; "queues"] injectors_queue_json + +let total_injector_json json = + let open JSON in + let queue_json json = + let tags = json |-> "tags" |> as_list |> List.map as_string in + let size = json |-> "queue_size" |> as_int in + (tags, size) + in + let queues = json |-> "injectors" |> as_list |> List.map queue_json in + let total = json |-> "total" |> as_int in + (queues, total) + +let get_admin_injector_queues_total ?tag () = + let query_string = Option.map (fun tag -> [("tag", tag)]) tag in + make + GET + ?query_string + ["admin"; "injector"; "queues"; "total"] + total_injector_json + +let delete_admin_injector_queues ?operation_tag ?order_below ?drop_no_order () = + let query_string = + let order_below_query = + match order_below with + | Some order_below -> [("order_below", string_of_int order_below)] + | None -> [] + in + let drop_no_order_query = + match drop_no_order with + | Some drop_no_order -> [("drop_no_order", string_of_bool drop_no_order)] + | None -> [] + in + let operation_tag = + match operation_tag with Some tag -> [("tag", tag)] | None -> [] + in + order_below_query @ drop_no_order_query @ operation_tag + in + make ~query_string DELETE ["admin"; "injector"; "queues"] (fun json -> + let empty_list = JSON.as_object json in + if List.length empty_list = 0 then () + else + failwith + (sf "invalid json %s, expected empty json \"{}\"." (JSON.encode json))) diff --git a/tezt/lib_tezos/sc_rollup_rpc.mli b/tezt/lib_tezos/sc_rollup_rpc.mli index cf32a2773409..ff1c03a2a1b3 100644 --- a/tezt/lib_tezos/sc_rollup_rpc.mli +++ b/tezt/lib_tezos/sc_rollup_rpc.mli @@ -208,3 +208,23 @@ val get_local_outbox_pending_executable : (** RPC: [GET /local/outbox/pending/unexecutable] *) val get_local_outbox_pending_unexecutable : unit -> (int * outbox_msg list) list RPC_core.t + +(** RPC: [DELETE /admin/batcher/queue] *) +val delete_admin_batcher_queue : + ?order_below:int -> ?drop_no_order:bool -> unit -> unit RPC_core.t + +(** RPC: [GET /admin/injector/queues] *) +val get_admin_injector_queues : + ?tag:string -> unit -> (string list * JSON.t list) list RPC_core.t + +(** RPC: [GET /admin/injector/queues/total] *) +val get_admin_injector_queues_total : + ?tag:string -> unit -> ((string list * int) list * int) RPC_core.t + +(** RPC: [DELETE /admin/injector/queues] *) +val delete_admin_injector_queues : + ?operation_tag:string -> + ?order_below:int -> + ?drop_no_order:bool -> + unit -> + unit RPC_core.t diff --git a/tezt/tests/sc_rollup.ml b/tezt/tests/sc_rollup.ml index a13e369e479e..674891991f3a 100644 --- a/tezt/tests/sc_rollup.ml +++ b/tezt/tests/sc_rollup.ml @@ -5926,6 +5926,33 @@ let test_batcher_order_msgs ~kind = unit in + let check_injector_queues ~__LOC__ ~expected_add_message_op = + let* injector_queues = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.get_admin_injector_queues () + in + match injector_queues with + | [(injector_tag, injector_op_queue)] -> + Check.(list_mem string "add_messages" injector_tag ~__LOC__) + ~error_msg:"Injector queue %L operation tag does not contains tags %R" ; + let map_assert_only_add_messages op = + let kind = JSON.(op |-> "kind" |> as_string) in + Check.(("add_messages" = kind) string ~__LOC__) + ~error_msg:"Injector queue %L operation tag is not %R" ; + JSON.( + op |-> "message" |> as_list |> List.map JSON.as_string + |> List.map (fun s -> int_of_string @@ Hex.to_string (`Hex s))) + in + let add_messages_op = + List.map map_assert_only_add_messages injector_op_queue + in + Check.( + (add_messages_op = expected_add_message_op) (list (list int)) ~__LOC__) + ~error_msg: + "Injector queues add_messages %L was found but %R was expected" ; + unit + | _ -> failwith "invalid number of injector queues, only one expected" + in let bake_then_check_included_msgs ~__LOC__ ~expected_messages = let* level = Node.get_level node in let wait_for_injected = @@ -5953,7 +5980,14 @@ let test_batcher_order_msgs ~kind = in let* level = Node.get_level node in - let* () = Sc_rollup_node.run ~event_level:`Debug rollup_node rollup_addr [] in + let* () = + Sc_rollup_node.run + ~event_level:`Debug + rollup_node + rollup_addr + [Injector_retention_period 0] + (*so injector included queued messages is not entraiving us *) + in let* _ = Sc_rollup_node.wait_for_level rollup_node level in (* To make sure we are all bootstrapped, might be unused *) @@ -6046,6 +6080,123 @@ let test_batcher_order_msgs ~kind = [messages_order_1; messages_order_2; messages_no_order] in let* () = bake_then_check_included_msgs ~__LOC__ ~expected_messages in + Log.info "Testing the batcher clear RPCs." ; + + Log.info "Clearing messages with order <= 2 in the batcher queue." ; + let* hash_1 = inject_int_of_string ~order:1 ~drop_duplicate:true [1] in + let* hash_2 = inject_int_of_string ~order:2 ~drop_duplicate:true [2] in + let* hash_3 = inject_int_of_string ~order:3 ~drop_duplicate:true [3] in + let expected_queued_hashes = hash_1 @ hash_2 @ hash_3 in + let expected_messages = [1; 2; 3] in + let* () = check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages in + let* () = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.delete_admin_batcher_queue + ~drop_no_order:false + ~order_below:2 + () + in + let expected_queued_hashes = hash_3 in + let expected_messages = [3] in + let* () = check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages in + + Log.info + "Clearing messages with order <= 2 in the batcher queue or no order \ + specified." ; + let* hash_1 = inject_int_of_string ~order:1 ~drop_duplicate:true [1] in + let* hash_2 = inject_int_of_string ~order:2 ~drop_duplicate:true [2] in + (* [3] is still in the batcher *) + let* hash_4 = inject_int_of_string (* no order *) ~drop_duplicate:true [4] in + let expected_queued_hashes = hash_1 @ hash_2 @ hash_3 @ hash_4 in + let expected_messages = [1; 2; 3; 4] in + let* () = check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages in + let* () = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.delete_admin_batcher_queue + ~drop_no_order:true + ~order_below:2 + () + in + let expected_queued_hashes = hash_3 in + let expected_messages = [3] in + let* () = check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages in + + Log.info "Clearing all the messages in the batcher queue." ; + let* hashes = + inject_int_of_string ~order:1 ~drop_duplicate:true + @@ List.init half_min_batch Fun.id + in + let expected_queued_hashes = hashes in + let expected_messages = List.init half_min_batch Fun.id in + let* () = check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages in + + let* () = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.delete_admin_batcher_queue () + in + let* () = + check_queue ~__LOC__ ~expected_queued_hashes:[] ~expected_messages:[] + in + + Log.info "Testing the injector clear RPCs." ; + + Log.info "Clearing all operations in the injector queue." ; + (*enough messages to create an operations *) + let messages = List.init min_batch_elements Fun.id in + let* _hashes_1 = inject_int_of_string messages in + + (*nothing left in the batcher*) + let* () = + check_queue ~__LOC__ ~expected_queued_hashes:[] ~expected_messages:[] + in + let expected_add_message_op = [messages] in + let* () = check_injector_queues ~__LOC__ ~expected_add_message_op in + let* () = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.delete_admin_injector_queues () + in + let* () = check_injector_queues ~__LOC__ ~expected_add_message_op:[] in + + Log.info "Clearing operations with order <= 1 ." ; + let messages_1 = messages in + let messages_2 = + List.init min_batch_elements (fun i -> min_batch_elements + i) + in + let messages_3 = + List.init min_batch_elements (fun i -> (2 * min_batch_elements) + i) + in + + let* _hashes = inject_int_of_string ~order:0 messages_1 in + let* _hashes = inject_int_of_string ~order:1 messages_2 in + let* _hashes = inject_int_of_string ~order:2 messages_3 in + + let expected_add_message_op = [messages_1; messages_2; messages_3] in + let* () = check_injector_queues ~__LOC__ ~expected_add_message_op in + let* () = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.delete_admin_injector_queues ~order_below:1 () + in + let expected_add_message_op = + (* strictly below order messages are cleared *) [messages_2; messages_3] + in + let* () = check_injector_queues ~__LOC__ ~expected_add_message_op in + + Log.info "Clearing operations with order <= 1 and no order specified ." ; + let messages_no_order = messages in + let* _hashes_1 = inject_int_of_string @@ messages_no_order in + + let expected_add_message_op = [messages_2; messages_3; messages_no_order] in + let* () = check_injector_queues ~__LOC__ ~expected_add_message_op in + let* () = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.delete_admin_injector_queues + ~order_below:2 + ~drop_no_order:true + () + in + let expected_add_message_op = [messages_3] in + + let* () = check_injector_queues ~__LOC__ ~expected_add_message_op in unit let test_injector_order_operations_by_kind ~kind = -- GitLab