From 1208281b67cd62f90c6f2afa0f262d760aa7a429 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Thu, 25 Jul 2024 21:35:11 +0200 Subject: [PATCH 1/5] shell/prevalidator: refactor context fetching --- src/lib_shell/prevalidator.ml | 50 ++++++++++++++--------------------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 0bcdb0b93ba4..e042d5d34c75 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -1002,6 +1002,25 @@ module Make Prevalidation_t.config_encoding pv.config + let fetch_context pv = + let open Lwt_result_syntax in + let*! ctxt_e = + let* context = + Prevalidation_t.get_context + pv.shell.parameters.chain_store + ~predecessor:pv.shell.predecessor + ~timestamp:(Time.System.to_protocol pv.shell.timestamp) + in + Proto.Plugin.get_context + context + ~head:(Store.Block.header pv.shell.predecessor).shell + in + match ctxt_e with + | Error errs -> + let*! () = Events.(emit pending_operation_context_error) errs in + Lwt.return_none + | Ok ctxt -> Lwt.return_some ctxt + let filter_validation_passes allowed_validation_passes (op : protocol_operation) = match allowed_validation_passes with @@ -1116,36 +1135,7 @@ module Make | Error errs, _ | _, Error errs -> Tezos_rpc.Answer.fail errs | Ok sources, Ok ophs -> let* ctxt = - if sources = [] then Lwt.return_none - else - let* context = - (* prevalidation_t.t contains the context, get_context returns it *) - Prevalidation_t.get_context - pv.shell.parameters.chain_store - ~predecessor:pv.shell.predecessor - ~timestamp:(Time.System.to_protocol pv.shell.timestamp) - in - match context with - | Error errs -> - let* () = - Events.(emit pending_operation_context_error) errs - in - Lwt.return_none - | Ok context -> ( - let* ctxt = - Proto.Plugin.get_context - context - ~head: - (Store.Block.header pv.shell.predecessor).shell - in - match ctxt with - | Error errs -> - let* () = - Events.(emit 
pending_operation_context_error) - errs - in - Lwt.return_none - | Ok ctxt -> Lwt.return_some ctxt) + if sources = [] then Lwt.return_none else fetch_context pv in let filter oph protocol res = let* is_in_sources = filter_sources ctxt sources protocol in -- GitLab From 642e87bf521b7ae243966425197abaadeede5ba1 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Thu, 25 Jul 2024 22:18:41 +0200 Subject: [PATCH 2/5] shell/prevalidator: add source filter to monitor_operations RPC --- src/lib_shell/prevalidator.ml | 197 ++++++++++++---------- src/lib_shell_services/block_services.ml | 17 +- src/lib_shell_services/block_services.mli | 4 +- 3 files changed, 126 insertions(+), 92 deletions(-) diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index e042d5d34c75..8856efa9938c 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -1242,96 +1242,117 @@ module Make Tezos_rpc.Path.open_root) (fun pv params () -> Lwt_mutex.with_lock pv.lock @@ fun () -> - let op_stream, stopper = - Lwt_watcher.create_stream pv.operation_stream - in - (* First call : retrieve the current set of op from the mempool *) - let validated_seq = - if params#validated then - Classification.Sized_map.to_map - pv.shell.classification.validated - |> Operation_hash.Map.to_seq - |> Seq.map (fun (hash, {protocol; _}) -> - ((hash, protocol), None)) - else Seq.empty - in - let process_error_map map = - let open Operation_hash in - map |> Map.to_seq - |> Seq.map (fun (hash, (op, error)) -> - ((hash, op.protocol), Some error)) - in - let refused_seq = - if params#refused then - process_error_map - (Classification.map pv.shell.classification.refused) - else Seq.empty - in - let branch_refused_seq = - if params#branch_refused then - process_error_map - (Classification.map pv.shell.classification.branch_refused) - else Seq.empty - in - let branch_delayed_seq = - if params#branch_delayed then - process_error_map - (Classification.map 
pv.shell.classification.branch_delayed) - else Seq.empty - in - let outdated_seq = - if params#outdated then - process_error_map - (Classification.map pv.shell.classification.outdated) - else Seq.empty - in - let filter ((_, op), _) = - filter_validation_passes params#validation_passes op - in - let current_mempool = - Seq.append outdated_seq branch_delayed_seq - |> Seq.append branch_refused_seq - |> Seq.append refused_seq |> Seq.append validated_seq - |> Seq.filter filter |> List.of_seq - in - let current_mempool = ref (Some current_mempool) in - let filter_result = function - | `Validated -> params#validated - | `Refused _ -> params#refused - | `Outdated _ -> params#outdated - | `Branch_refused _ -> params#branch_refused - | `Branch_delayed _ -> params#branch_delayed + let open Lwt_syntax in + let sources = + List.map_e Signature.Public_key_hash.of_b58check params#sources in - let rec next () = - let open Lwt_syntax in - match !current_mempool with - | Some mempool -> - current_mempool := None ; - Lwt.return_some (params#version, mempool) - | None -> ( - let* o = Lwt_stream.get op_stream in - match o with - | Some (kind, op) - when filter_result kind - && filter_validation_passes + match sources with + | Error errs -> Tezos_rpc.Answer.fail errs + | Ok sources -> + let op_stream, stopper = + Lwt_watcher.create_stream pv.operation_stream + in + (* First call : retrieve the current set of op from the mempool *) + let validated_seq = + if params#validated then + Classification.Sized_map.to_map + pv.shell.classification.validated + |> Operation_hash.Map.to_seq + |> Seq.map (fun (hash, {protocol; _}) -> + ((hash, protocol), None)) + else Seq.empty + in + let process_error_map map = + let open Operation_hash in + map |> Map.to_seq + |> Seq.map (fun (hash, (op, error)) -> + ((hash, op.protocol), Some error)) + in + let refused_seq = + if params#refused then + process_error_map + (Classification.map pv.shell.classification.refused) + else Seq.empty + in + let 
branch_refused_seq = + if params#branch_refused then + process_error_map + (Classification.map + pv.shell.classification.branch_refused) + else Seq.empty + in + let branch_delayed_seq = + if params#branch_delayed then + process_error_map + (Classification.map + pv.shell.classification.branch_delayed) + else Seq.empty + in + let outdated_seq = + if params#outdated then + process_error_map + (Classification.map pv.shell.classification.outdated) + else Seq.empty + in + let* ctxt = + if sources = [] then Lwt.return_none else fetch_context pv + in + let filter ((_, op), _) = + let* is_in_sources = filter_sources ctxt sources op in + return + (filter_validation_passes params#validation_passes op + && is_in_sources) + in + let* current_mempool = + Seq.append outdated_seq branch_delayed_seq + |> Seq.append branch_refused_seq + |> Seq.append refused_seq |> Seq.append validated_seq + |> Lwt_seq.of_seq |> Lwt_seq.filter_s filter + |> Lwt_seq.to_list + in + let current_mempool = ref (Some current_mempool) in + let filter_result = function + | `Validated -> params#validated + | `Refused _ -> params#refused + | `Outdated _ -> params#outdated + | `Branch_refused _ -> params#branch_refused + | `Branch_delayed _ -> params#branch_delayed + in + let rec next () = + match !current_mempool with + | Some mempool -> + current_mempool := None ; + Lwt.return_some (params#version, mempool) + | None -> ( + let* o = Lwt_stream.get op_stream in + match o with + | Some (kind, op) -> + let* is_in_sources = + filter_sources ctxt sources op.protocol + in + if + filter_validation_passes params#validation_passes - op.protocol -> - let errors = - match kind with - | `Validated -> None - | `Branch_delayed errors - | `Branch_refused errors - | `Refused errors - | `Outdated errors -> - Some errors - in - Lwt.return_some - (params#version, [((op.hash, op.protocol), errors)]) - | Some _ -> next () - | None -> Lwt.return_none) - in - let shutdown () = Lwt_watcher.shutdown stopper in - 
Tezos_rpc.Answer.return_stream {next; shutdown}) ; + op.protocol + && is_in_sources && filter_result kind + then + let errors = + match kind with + | `Validated -> None + | `Branch_delayed errors + | `Branch_refused errors + | `Refused errors + | `Outdated errors -> + Some errors + in + Lwt.return_some + ( params#version, + [((op.hash, op.protocol), errors)] ) + else next () + | None -> Lwt.return_none) + in + let shutdown () = Lwt_watcher.shutdown stopper in + Tezos_rpc.Answer.return_stream {next; shutdown}) ; !dir) (** Module implementing the events at the {!Worker} level. Contrary diff --git a/src/lib_shell_services/block_services.ml b/src/lib_shell_services/block_services.ml index 295af8f00b3e..c18d10993956 100644 --- a/src/lib_shell_services/block_services.ml +++ b/src/lib_shell_services/block_services.ml @@ -1395,7 +1395,7 @@ module Make (Proto : PROTO) (Next_proto : PROTO) = struct let monitor_operations_versions = mk_version_1_informations () - let mempool_query = + let monitor_operations_query = let open Tezos_rpc.Query in query (fun @@ -1406,6 +1406,7 @@ module Make (Proto : PROTO) (Next_proto : PROTO) = struct branch_refused branch_delayed validation_passes + sources -> object method version = version @@ -1421,6 +1422,8 @@ module Make (Proto : PROTO) (Next_proto : PROTO) = struct method branch_delayed = branch_delayed method validation_passes = validation_passes + + method sources = sources end) |+ field "version" @@ -1463,6 +1466,11 @@ module Make (Proto : PROTO) (Next_proto : PROTO) = struct "validation_pass" Tezos_rpc.Arg.int (fun t -> t#validation_passes) + |+ multi_field + ~descr:"Include operations filtered by sources (all by default)" + "sources" + Tezos_rpc.Arg.string + (fun t -> t#sources) |> seal (* We extend the object so that the fields of 'next_operation' @@ -1484,7 +1492,7 @@ module Make (Proto : PROTO) (Next_proto : PROTO) = struct let monitor_operations path = Tezos_rpc.Service.get_service ~description:"Monitor the mempool operations." 
- ~query:mempool_query + ~query:monitor_operations_query ~output:processed_operation_encoding Tezos_rpc.Path.(path / "monitor_operations") @@ -1885,7 +1893,8 @@ module Make (Proto : PROTO) (Next_proto : PROTO) = struct let monitor_operations ctxt ?(chain = `Main) ?(version = S.Mempool.monitor_operations_versions.default) ?(validated = true) ?(branch_delayed = true) ?(branch_refused = false) - ?(refused = false) ?(outdated = false) ?(validation_passes = []) () = + ?(refused = false) ?(outdated = false) ?(validation_passes = []) + ?(sources = []) () = let open Lwt_result_syntax in let s = S.Mempool.monitor_operations (mempool_path chain_path) in let* stream, stopper = @@ -1907,6 +1916,8 @@ module Make (Proto : PROTO) (Next_proto : PROTO) = struct method branch_delayed = branch_delayed method validation_passes = validation_passes + + method sources = sources end) () in diff --git a/src/lib_shell_services/block_services.mli b/src/lib_shell_services/block_services.mli index d42e96cef8b8..e8375fae3ed8 100644 --- a/src/lib_shell_services/block_services.mli +++ b/src/lib_shell_services/block_services.mli @@ -463,6 +463,7 @@ module Make (Proto : PROTO) (Next_proto : PROTO) : sig ?refused:bool -> ?outdated:bool -> ?validation_passes:int list -> + ?sources:string list -> unit -> (((Operation_hash.t * Next_proto.operation) * error trace option) list Lwt_stream.t @@ -781,7 +782,8 @@ module Make (Proto : PROTO) (Next_proto : PROTO) : sig ; branch_refused : bool ; refused : bool ; outdated : bool - ; validation_passes : int list >, + ; validation_passes : int list + ; sources : string list >, unit, version * ((Operation_hash.t * Next_proto.operation) * error trace option) -- GitLab From 0e187d17e9380a1f2d7c7168e71eb64f51bd01fa Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Tue, 30 Jul 2024 15:27:32 +0200 Subject: [PATCH 3/5] tezt/rpc: add source argument for monitor_operations RPC --- tezt/lib_tezos/RPC.ml | 4 +++- tezt/lib_tezos/RPC.mli | 1 + 2 files changed, 4 
insertions(+), 1 deletion(-) diff --git a/tezt/lib_tezos/RPC.ml b/tezt/lib_tezos/RPC.ml index abd6c6de119f..b3a5b249a4c9 100644 --- a/tezt/lib_tezos/RPC.ml +++ b/tezt/lib_tezos/RPC.ml @@ -503,7 +503,8 @@ let get_chain_mempool_pending_operations ?(chain = "main") ?version ?validated Fun.id let get_chain_mempool_monitor_operations ?(chain = "main") ?version ?validated - ?branch_delayed ?branch_refused ?refused ?outdated ?validation_passes () = + ?branch_delayed ?branch_refused ?refused ?outdated ?validation_passes + ?sources () = let query_string = Query_arg.opt "version" Fun.id version @ Query_arg.opt_bool "validated" validated @@ -515,6 +516,7 @@ let get_chain_mempool_monitor_operations ?(chain = "main") ?version ?validated "validation_pass" (fun name vp -> (name, string_of_int vp)) validation_passes + @ Query_arg.opt_list "sources" (fun name source -> (name, source)) sources in make ~query_string diff --git a/tezt/lib_tezos/RPC.mli b/tezt/lib_tezos/RPC.mli index 5d4bdeb938d0..ba63db681699 100644 --- a/tezt/lib_tezos/RPC.mli +++ b/tezt/lib_tezos/RPC.mli @@ -496,6 +496,7 @@ val get_chain_mempool_monitor_operations : ?refused:bool -> ?outdated:bool -> ?validation_passes:int list -> + ?sources:string list -> unit -> JSON.t t -- GitLab From 2389d5c8133ae0fb6044195ad71f5e18f5f4ea05 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Tue, 30 Jul 2024 15:28:53 +0200 Subject: [PATCH 4/5] tezt/prevalidator: add a test for sources filter in monitor_operation RPC --- tezt/tests/prevalidator.ml | 142 +++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/tezt/tests/prevalidator.ml b/tezt/tests/prevalidator.ml index 794f2dbfaff5..1422b12a3cd3 100644 --- a/tezt/tests/prevalidator.ml +++ b/tezt/tests/prevalidator.ml @@ -123,6 +123,22 @@ module Revamped = struct ?unprocessed mempool) + (* Calls the [/mempool/monitor_operations] RPC. 
+ The returned promise resolves with the list of streamed + operations when monitoring completes (usually upon flush). *) + let monitor_operations ?(sources = []) node = + let uri = + RPC_core.make_uri + (Node.as_rpc_endpoint node) + (RPC.get_chain_mempool_monitor_operations ~sources ()) + |> Uri.to_string + in + let runnable = Curl.get_raw uri in + let* stdout = Runnable.run runnable in + String.split_on_char '\n' stdout + |> List.filter_map (JSON.parse_opt ~origin:"monitor_operations") + |> List.map JSON.as_list |> List.flatten |> return + (** {2 Tests } *) (** This test injects some transfer operations and checks that the mempool does @@ -3088,6 +3104,131 @@ module Revamped = struct in log_step 13 "Check that node1 validates op5 but refuses op6." ; check_mempool ~validated:[oph3; oph5] ~refused:[oph2; oph4; oph6] client1 + + (** This test injects manager operations for several sources and checks that + the [sources] filter of the [monitor_operations] RPC correctly + filters operations by sources. *) + let test_filter_monitor_operations_by_sources = + Protocol.register_test + ~__FILE__ + ~title:"Filter monitor_operations by sources" + ~tags:[team; "mempool"; "rpc"; "monitor_operations"; "sources"] + @@ fun protocol -> + log_step 1 "Initialize a node and a client." 
; + let* node, client = + Client.init_with_protocol + ~nodes_args:[Synchronisation_threshold 0] + ~protocol + `Client + () + in + let bootstrap1, bootstrap2, bootstrap3, bootstrap4, bootstrap5 = + Constant.(bootstrap1, bootstrap2, bootstrap3, bootstrap4, bootstrap5) + in + + log_step 2 "starting monitoring for %s operations" bootstrap1.alias ; + let monitoring = + monitor_operations ~sources:[bootstrap1.public_key_hash] node + in + + log_step 3 "bake_for_and_wait" ; + let* () = Client.bake_for_and_wait ~node client in + + log_step 4 "check that no operations were returned through the monitoring" ; + let* ops = monitoring in + let ophs = List.map JSON.(fun json -> json |-> "hash" |> as_string) ops in + Check.( + ([] = ophs) (list string) ~error_msg:"Expected operations %L, got %R") ; + + log_step + 5 + "inject operations from %s, %s and %s" + bootstrap1.alias + bootstrap2.alias + bootstrap3.alias ; + let* (`OpHash oph1) = + Operation.Manager.(inject [make ~source:bootstrap1 (transfer ())]) client + in + let* (`OpHash oph2) = + Operation.Manager.(inject [make ~source:bootstrap2 (transfer ())] client) + in + let* (`OpHash _) = + Operation.Manager.(inject [make ~source:bootstrap3 (transfer ())]) client + in + + log_step + 6 + "starting monitoring for %s, %s and %s operations" + bootstrap1.alias + bootstrap2.alias + bootstrap4.alias ; + let monitoring = + monitor_operations + ~sources: + [ + bootstrap1.public_key_hash; + bootstrap2.public_key_hash; + bootstrap4.public_key_hash; + ] + node + in + + log_step + 7 + "inject operations from %s and %s" + bootstrap4.alias + bootstrap5.alias ; + let* (`OpHash oph4) = + Operation.Manager.(inject [make ~source:bootstrap4 (transfer ())]) client + in + let* (`OpHash _) = + Operation.Manager.(inject [make ~source:bootstrap5 (transfer ())] client) + in + + log_step 8 "bake_for_and_wait" ; + let* () = Client.bake_for_and_wait ~node client in + + log_step + 9 + "check that only %s, %s and %s operations are returned through the \ + 
monitoring" + bootstrap1.alias + bootstrap2.alias + bootstrap4.alias ; + let* ops = monitoring in + let ophs = List.map JSON.(fun json -> json |-> "hash" |> as_string) ops in + Check.( + ([oph2; oph1; oph4] = ophs) + (list string) + ~error_msg:"Expected operations %L, got %R") ; + + log_step 10 "starting monitoring for %s operations" bootstrap1.alias ; + let monitoring = + monitor_operations ~sources:[bootstrap1.public_key_hash] node + in + + log_step + 11 + "inject operations from %s and %s" + bootstrap2.alias + bootstrap3.alias ; + let* (`OpHash _) = + Operation.Manager.(inject [make ~source:bootstrap2 (transfer ())]) client + in + let* (`OpHash _) = + Operation.Manager.(inject [make ~source:bootstrap3 (transfer ())]) client + in + + log_step 12 "bake_for_and_wait" ; + let* () = Client.bake_for_and_wait ~node client in + + log_step 13 "check that no operations were returned through the monitoring" ; + let* ops = monitoring in + let ophs = List.map JSON.(fun json -> json |-> "hash" |> as_string) ops in + Check.( + ([] = ophs) (list string) ~error_msg:"Expected operations %L, got %R") ; + + unit end let check_operation_is_in_validated_mempool ops oph = @@ -4199,6 +4340,7 @@ let register ~protocols = Revamped.propagation_future_attestation protocols ; Revamped.test_mempool_config_operation_filtering protocols ; Revamped.test_filter_mempool_operations_by_hash protocols ; + Revamped.test_filter_monitor_operations_by_sources protocols ; forge_pre_filtered_operation protocols ; refetch_failed_operation protocols ; ban_operation_and_check_validated protocols ; -- GitLab From 4279e9459610b23b2d2c54a943d5b0bab820a532 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Tue, 30 Jul 2024 19:28:08 +0200 Subject: [PATCH 5/5] Changelog: add entry for monitor_operations RPC new argument sources Co-authored-by: Victor Allombert --- CHANGES.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 42d5dcff4fe0..0469d5fcfac6 100644 --- 
a/CHANGES.rst +++ b/CHANGES.rst @@ -43,6 +43,10 @@ Node /chains//mempool/pending_operations`` which allows operations to be filtered by hash. (MR :gl:`!13977`) +- Add a ``sources`` argument to ``GET + /chains/<chain>/mempool/monitor_operations`` which allows operations + to be filtered by source. (MR :gl:`!14284`) + - Add an RPC ``/chains//blocks//context/smart_rollups/smart_rollup//consumed_outputs/`` that returns the consumed output's indexes for the given outbox -- GitLab