From 563f8b58321b526db3dea9a20c5583edd8d7963e Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Fri, 21 Feb 2025 10:46:58 +0100 Subject: [PATCH] evm/node: introduce tx_queue configuration --- etherlink/CHANGES_NODE.md | 2 + etherlink/bin_node/config/configuration.ml | 40 ++++++++++- etherlink/bin_node/config/configuration.mli | 6 +- etherlink/bin_node/lib_dev/observer.ml | 69 +++++++++---------- etherlink/bin_node/lib_dev/services.ml | 4 +- etherlink/bin_node/lib_dev/tx_queue.ml | 38 +++++----- etherlink/bin_node/lib_dev/tx_queue.mli | 6 +- etherlink/tezt/lib/evm_node.ml | 5 +- .../Alpha- Configuration RPC.out | 6 +- .../EVM Node- describe config.out | 11 ++- 10 files changed, 113 insertions(+), 74 deletions(-) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index f855d2b1b3de..e53ff3c5707a 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -17,6 +17,8 @@ - Observer `--init-from-snapshot` now also accepts a path to an existing snapshot. (!16963) +- **experimental feature** Adds a configuration for the `tx_queue`. 
+ (!16903) ### RPCs changes diff --git a/etherlink/bin_node/config/configuration.ml b/etherlink/bin_node/config/configuration.ml index ae254ffdc68b..4903183c81fd 100644 --- a/etherlink/bin_node/config/configuration.ml +++ b/etherlink/bin_node/config/configuration.ml @@ -64,6 +64,37 @@ let chain_id_encoding : Ethereum_types.chain_id Data_encoding.t = type l2_chain = {chain_id : Ethereum_types.chain_id} +type tx_queue = {max_size : int; max_transaction_batch_length : int option} + +let tx_queue_encoding = + let open Data_encoding in + conv + (fun {max_size; max_transaction_batch_length} -> + (max_size, max_transaction_batch_length)) + (fun (max_size, max_transaction_batch_length) -> + {max_size; max_transaction_batch_length}) + (obj2 (req "max_size" int31) (opt "max_transaction_batch_length" int31)) + +let default_tx_queue = {max_size = 1000; max_transaction_batch_length = None} + +let tx_queue_opt_encoding = + let open Data_encoding in + union + [ + case + ~title:"tx queue configuration" + Json_only + (option tx_queue_encoding) + (function Some tx_queue -> Some (Some tx_queue) | None -> Some None) + Fun.id; + case + ~title:"tx queue enable" + Json_only + bool + (function _ -> None) + (function true -> Some default_tx_queue | _ -> None); + ] + type experimental_features = { drop_duplicate_on_injection : bool; blueprints_publisher_order_enabled : bool; @@ -75,7 +106,7 @@ type experimental_features = { monitor_websocket_heartbeat : monitor_websocket_heartbeat option; spawn_rpc : int option; l2_chains : l2_chain list option; - enable_tx_queue : bool; + enable_tx_queue : tx_queue option; } type sequencer = { @@ -153,6 +184,9 @@ type t = { history_mode : history_mode option; } +let is_tx_queue_enabled {experimental_features = {enable_tx_queue; _}; _} = + Option.is_some enable_tx_queue + let default_filter_config ?max_nb_blocks ?max_nb_logs ?chunk_size () = { max_nb_blocks = Option.value ~default:100 max_nb_blocks; @@ -218,7 +252,7 @@ let default_experimental_features = 
monitor_websocket_heartbeat = default_monitor_websocket_heartbeat; spawn_rpc = None; l2_chains = default_l2_chains; - enable_tx_queue = false; + enable_tx_queue = None; } let default_data_dir = Filename.concat (Sys.getenv "HOME") ".octez-evm-node" @@ -980,7 +1014,7 @@ let experimental_features_encoding = (dft "enable_tx_queue" ~description:"Replace the observer tx pool by a tx queue" - bool + tx_queue_opt_encoding default_experimental_features.enable_tx_queue))) let proxy_encoding = diff --git a/etherlink/bin_node/config/configuration.mli b/etherlink/bin_node/config/configuration.mli index f1cbd6ed4a2b..bcf09762d7b7 100644 --- a/etherlink/bin_node/config/configuration.mli +++ b/etherlink/bin_node/config/configuration.mli @@ -87,6 +87,8 @@ val chain_id : supported_network -> Ethereum_types.chain_id type l2_chain = {chain_id : Ethereum_types.chain_id} +type tx_queue = {max_size : int; max_transaction_batch_length : int option} + (** Configuration settings for experimental features, with no backward compatibility guarantees. 
*) type experimental_features = { @@ -100,7 +102,7 @@ type experimental_features = { monitor_websocket_heartbeat : monitor_websocket_heartbeat option; spawn_rpc : int option; l2_chains : l2_chain list option; - enable_tx_queue : bool; + enable_tx_queue : tx_queue option; } type sequencer = { @@ -185,6 +187,8 @@ type t = { history_mode : history_mode option; } +val is_tx_queue_enabled : t -> bool + val history_mode_encoding : history_mode Data_encoding.t val pp_history_mode_debug : Format.formatter -> history_mode -> unit diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index aabe9e197251..34dda6e80786 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -196,41 +196,36 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync in let* () = - if config.experimental_features.enable_tx_queue then - let* () = - Tx_queue.start - ~relay_endpoint:evm_node_endpoint - ~max_transaction_batch_length:None - () - in - return_unit - else - let mode = - if config.finalized_view then - Tx_pool.Forward - { - injector = - (fun _ raw_tx -> - Injector.send_raw_transaction - ~keep_alive:config.keep_alive - ~base:evm_node_endpoint - ~raw_tx); - } - else Tx_pool.Relay - in - Tx_pool.start - { - backend = observer_backend; - smart_rollup_address = - Tezos_crypto.Hashed.Smart_rollup_address.to_b58check - smart_rollup_address; - mode; - tx_timeout_limit = config.tx_pool_timeout_limit; - tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; - tx_pool_tx_per_addr_limit = - Int64.to_int config.tx_pool_tx_per_addr_limit; - max_number_of_chunks = None; - } + match config.experimental_features.enable_tx_queue with + | Some tx_queue_config -> + Tx_queue.start ~evm_node_endpoint ~config:tx_queue_config () + | None -> + let mode = + if config.finalized_view then + Tx_pool.Forward + { + injector = + (fun _ raw_tx -> + Injector.send_raw_transaction + ~keep_alive:config.keep_alive + 
~base:evm_node_endpoint + ~raw_tx); + } + else Tx_pool.Relay + in + Tx_pool.start + { + backend = observer_backend; + smart_rollup_address = + Tezos_crypto.Hashed.Smart_rollup_address.to_b58check + smart_rollup_address; + mode; + tx_timeout_limit = config.tx_pool_timeout_limit; + tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; + tx_pool_tx_per_addr_limit = + Int64.to_int config.tx_pool_tx_per_addr_limit; + max_number_of_chunks = None; + } in Metrics.init ~mode:"observer" @@ -314,7 +309,7 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync let*! () = task in return_unit else - let ping_tx_pool = not config.experimental_features.enable_tx_queue in + let ping_tx_pool = not @@ Configuration.is_tx_queue_enabled config in let* () = Blueprints_follower.start ~ping_tx_pool @@ -323,7 +318,7 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync ~next_blueprint_number (on_new_blueprint evm_node_endpoint) and* () = - if config.experimental_features.enable_tx_queue then + if Configuration.is_tx_queue_enabled config then Tx_queue.beacon ~tick_interval:0.05 else return_unit in diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index c28f4beaeaab..2d864380d6a9 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -650,7 +650,7 @@ let dispatch_request (rpc : Configuration.rpc) rpc_error (Rpc_errors.transaction_rejected err None) | Ok transaction_object -> ( let* tx_hash = - if config.experimental_features.enable_tx_queue then + if Configuration.is_tx_queue_enabled config then let* () = Tx_queue.inject transaction_object tx_raw in return (Ok transaction_object.hash) else Tx_pool.add transaction_object txn @@ -875,7 +875,7 @@ let dispatch_private_request (rpc : Configuration.rpc) rpc_error (Rpc_errors.transaction_rejected err None) | Ok transaction_object -> ( let* tx_hash = - if config.experimental_features.enable_tx_queue then 
+ if Configuration.is_tx_queue_enabled config then let transaction = Ethereum_types.hex_encode_string raw_txn in let* () = Tx_queue.inject transaction_object transaction in return @@ Ok transaction_object.hash diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 69b256585024..8a8960e5c5db 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -10,10 +10,7 @@ open Tezos_workers let two_seconds = Ptime.Span.of_int_s 2 -type parameters = { - relay_endpoint : Uri.t; - max_transaction_batch_length : int option; -} +type parameters = {evm_node_endpoint : Uri.t; config : Configuration.tx_queue} type callback = [`Accepted of Ethereum_types.hash | `Confirmed | `Dropped | `Refused] -> @@ -29,7 +26,7 @@ module Pending_transactions = struct type t = pending S.t - let empty () = S.create 1000 + let empty ~start_size = S.create start_size let add htbl (Hash (Hex hash)) callback = S.add htbl hash ({callback; since = Time.System.now ()} : pending) @@ -56,10 +53,10 @@ module Pending_transactions = struct end type state = { - relay_endpoint : Uri.t; + evm_node_endpoint : Uri.t; mutable queue : request Queue.t; pending : Pending_transactions.t; - max_transaction_batch_length : int option; + config : Configuration.tx_queue; } module Types = struct @@ -140,7 +137,7 @@ type worker = Worker.infinite Worker.queue Worker.t let uuid_seed = Random.get_state () -let send_transactions_batch ~relay_endpoint transactions = +let send_transactions_batch ~evm_node_endpoint transactions = let open Lwt_result_syntax in let module M = Map.Make (String) in let module Srt = Rpc_encodings.Send_raw_transaction in @@ -171,7 +168,7 @@ let send_transactions_batch ~relay_endpoint transactions = let* responses = Rollup_services.call_service ~keep_alive:true - ~base:relay_endpoint + ~base:evm_node_endpoint (Batch.dispatch_batch_service ~path:Resto.Path.root) () () @@ -242,7 +239,7 @@ module Handlers = struct | Tick -> let 
all_transactions = Queue.to_seq state.queue in let* transactions_to_inject, remaining_transactions = - match state.max_transaction_batch_length with + match state.config.max_transaction_batch_length with | None -> return (all_transactions, Seq.empty) | Some max_transaction_batch_length -> let when_negative_length = @@ -267,7 +264,7 @@ module Handlers = struct let+ () = send_transactions_batch - ~relay_endpoint:state.relay_endpoint + ~evm_node_endpoint:state.evm_node_endpoint transactions_to_inject in @@ -278,15 +275,16 @@ module Handlers = struct type launch_error = tztrace - let on_launch _self () - ({relay_endpoint; max_transaction_batch_length} : parameters) = + let on_launch _self () ({evm_node_endpoint; config} : parameters) = let open Lwt_result_syntax in return { - relay_endpoint; + evm_node_endpoint; queue = Queue.create (); - pending = Pending_transactions.empty (); - max_transaction_batch_length; + pending = Pending_transactions.empty ~start_size:(config.max_size / 4); + (* start with /4 and let it grow if necessary to not allocate + too much at start. *) + config; } let on_error (type a b) _self _status_request (_r : (a, b) Request.t) @@ -360,14 +358,10 @@ let inject ?(callback = fun _ -> Lwt_syntax.return_unit) let confirm txn_hash = bind_worker @@ fun w -> push_request w (Confirm {txn_hash}) -let start ~relay_endpoint ~max_transaction_batch_length () = +let start ~config ~evm_node_endpoint () = let open Lwt_result_syntax in let* worker = - Worker.launch - table - () - {relay_endpoint; max_transaction_batch_length} - (module Handlers) + Worker.launch table () {evm_node_endpoint; config} (module Handlers) in Lwt.wakeup worker_waker worker ; let*! 
() = Tx_queue_events.is_ready () in diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index 7d3850ea3bf0..5a5df2d26e37 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -33,12 +33,12 @@ type callback = type request = {payload : Ethereum_types.hex; callback : callback} -(** [start ~relay_endpoint ~max_transaction_batch_length ()] starts +(** [start ~config ~evm_node_endpoint ()] starts the worker, meaning it is possible to call {!inject}, {!confirm} and {!beacon}. *) val start : - relay_endpoint:Uri.t -> - max_transaction_batch_length:int option -> + config:Configuration.tx_queue -> + evm_node_endpoint:Uri.t -> unit -> unit tzresult Lwt.t diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 80ce7706a4c1..7840137ef7b8 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -1288,7 +1288,10 @@ let patch_config_with_experimental_feature | Resto -> `String "resto" | Dream -> `String "dream") |> conditional_json_put enable_websocket ~name:"enable_websocket" (`Bool true) - |> conditional_json_put enable_tx_queue ~name:"enable_tx_queue" (`Bool true) + |> conditional_json_put + enable_tx_queue + ~name:"enable_tx_queue" + (`O [("max_size", `Float 1000.)]) |> optional_json_put max_websocket_message_length ~name:"max_websocket_message_length" diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out index efd8cdd8f843..5d71c4fd3f6b 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out @@ -24,7 +24,7 @@ }, "spawn_rpc": null, "l2_chains": null, - "enable_tx_queue": false + "enable_tx_queue": null }, "proxy": { "ignore_block_param": false @@ -84,7 +84,7 @@ }, "spawn_rpc": null, 
"l2_chains": null, - "enable_tx_queue": false + "enable_tx_queue": null }, "proxy": { "ignore_block_param": false @@ -137,7 +137,7 @@ }, "spawn_rpc": null, "l2_chains": null, - "enable_tx_queue": false + "enable_tx_queue": null }, "proxy": { "evm_node_endpoint": "hidden", diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out index 8ba82f7218c8..e3c10c5d782e 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out @@ -177,8 +177,15 @@ || null /* None */, "enable_tx_queue"?: - boolean - /* Replace the observer tx pool by a tx queue */ }, + /* Replace the observer tx pool by a tx queue */ + /* tx queue configuration */ + { /* Some */ + "max_size": integer ∈ [-2^30, 2^30], + "max_transaction_batch_length"?: integer ∈ [-2^30, 2^30] } + || null + /* None */ + || boolean + /* tx queue enable */ }, "proxy"?: { "finalized_view"?: boolean -- GitLab