From d95551b1e6678cc207525f7152c341a9d52d5f36 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 5 Mar 2025 11:13:14 +0100 Subject: [PATCH 1/3] evm/node: limit the number of transactions per user in the tx_queue --- etherlink/CHANGES_NODE.md | 4 + etherlink/bin_node/config/configuration.ml | 41 +++-- etherlink/bin_node/config/configuration.mli | 1 + etherlink/bin_node/lib_dev/services.ml | 12 +- etherlink/bin_node/lib_dev/tx_queue.ml | 107 ++++++++++--- etherlink/bin_node/lib_dev/tx_queue.mli | 2 +- etherlink/tezt/lib/evm_node.ml | 9 +- etherlink/tezt/lib/evm_node.mli | 6 +- etherlink/tezt/tests/evm_sequencer.ml | 144 +++++++++++++++++- .../EVM Node- describe config.out | 8 +- 10 files changed, 296 insertions(+), 38 deletions(-) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index a822453ea5af..3659a1743ae0 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -26,6 +26,10 @@ you start using them, you probably want to use `octez-evm-node check config next available nonce found in the `tx_queue`. It also works for transactions that have been already forwarded to the upstream node but not yet confirmed. (!!16829) +- `tx_queue` limits the number of + transaction one user can submit. This limits is for pending + transactions the node has seen. configurable with + `tx_per_addr_limit`. (!16903) ## Version 0.19 (2025-03-10) diff --git a/etherlink/bin_node/config/configuration.ml b/etherlink/bin_node/config/configuration.ml index 09222346c1c9..a59672b3b299 100644 --- a/etherlink/bin_node/config/configuration.ml +++ b/etherlink/bin_node/config/configuration.ml @@ -75,22 +75,45 @@ type tx_queue = { max_size : int; max_transaction_batch_length : int option; max_lifespan_s : int; + tx_per_addr_limit : int64; } let default_tx_queue = - {max_size = 1000; max_transaction_batch_length = None; max_lifespan_s = 2} + { + max_size = 1000; + max_transaction_batch_length = None; + max_lifespan_s = 2; + tx_per_addr_limit = 16L; + } let tx_queue_encoding = let open Data_encoding in conv - (fun {max_size; max_transaction_batch_length; max_lifespan_s} -> - (max_size, max_transaction_batch_length, max_lifespan_s)) - (fun (max_size, max_transaction_batch_length, max_lifespan_s) -> - {max_size; max_transaction_batch_length; max_lifespan_s}) - (obj3 - (req "max_size" int31) - (opt "max_transaction_batch_length" int31) - (dft "max_lifespan" int31 default_tx_queue.max_lifespan_s)) + (fun { + max_size; + max_transaction_batch_length; + max_lifespan_s; + tx_per_addr_limit; + } -> + (max_size, max_transaction_batch_length, max_lifespan_s, tx_per_addr_limit)) + (fun ( max_size, + max_transaction_batch_length, + max_lifespan_s, + tx_per_addr_limit ) -> + { + max_size; + max_transaction_batch_length; + max_lifespan_s; + tx_per_addr_limit; + }) + (obj4 + (dft "max_size" int31 default_tx_queue.max_size) + (dft + "max_transaction_batch_length" + (option int31) + default_tx_queue.max_transaction_batch_length) + (dft "max_lifespan" int31 default_tx_queue.max_lifespan_s) + (dft "tx_per_addr_limit" int64 default_tx_queue.tx_per_addr_limit)) let tx_queue_opt_encoding = let open Data_encoding in diff --git a/etherlink/bin_node/config/configuration.mli b/etherlink/bin_node/config/configuration.mli index 3a6a2dfd5677..d2f7443241b9 100644 --- a/etherlink/bin_node/config/configuration.mli +++ b/etherlink/bin_node/config/configuration.mli @@ -94,6 +94,7 @@ type tx_queue = { max_size : int; max_transaction_batch_length : int option; max_lifespan_s : int; + tx_per_addr_limit : int64; } (** Configuration settings for experimental features, with no backward diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 8dac9122fc85..9b77ee040bcc 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -692,10 +692,12 @@ let dispatch_request (rpc : Configuration.rpc) | Ok (next_nonce, transaction_object) -> ( let* tx_hash = if Configuration.is_tx_queue_enabled config then - let* () = + let* res = Tx_queue.inject ~next_nonce transaction_object tx_raw in - return (Ok transaction_object.hash) + match res with + | Ok () -> return (Ok transaction_object.hash) + | Error errs -> return (Error errs) else Tx_pool.add transaction_object txn in match tx_hash with @@ -933,10 +935,12 @@ let dispatch_private_request (rpc : Configuration.rpc) let* tx_hash = if Configuration.is_tx_queue_enabled config then let transaction = Ethereum_types.hex_encode_string raw_txn in - let* () = + let* res = Tx_queue.inject ~next_nonce transaction_object transaction in - return @@ Ok transaction_object.hash + match res with + | Ok () -> return (Ok transaction_object.hash) + | Error errs -> return (Error errs) else Tx_pool.add transaction_object raw_txn in match tx_hash with diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index edaf3472eb6a..563a1f8c6008 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -309,11 +309,39 @@ module Pending_transactions = struct !dropped end +module Transactions_per_addr = struct + module S = String.Hashtbl + + type t = int64 S.t + + let empty ~start_size = S.create start_size + + let remove s (Ethereum_types.Address (Hex h)) = S.remove s h + + let find s (Ethereum_types.Address (Hex h)) = S.find s h + + let add s (Ethereum_types.Address (Hex h)) i = S.replace s h i + + let decrement s address = + let current = find s address in + match current with + | Some i when i <= 1L -> remove s address + | Some i -> add s address (Int64.pred i) + | None -> () + + let increment s address = + let current = find s address in + match current with + | Some i -> add s address (Int64.succ i) + | None -> add s address 1L +end + type state = { evm_node_endpoint : Uri.t; mutable queue : queue_request Queue.t; pending : Pending_transactions.t; tx_object : Tx_object.t; + tx_per_address : Transactions_per_addr.t; address_nonce : Address_nonce.t; config : Configuration.tx_queue; keep_alive : bool; @@ -339,7 +367,7 @@ end module Request = struct type ('a, 'b) t = - | Inject : request -> (unit, tztrace) t + | Inject : request -> ((unit, string) result, tztrace) t | Confirm : {txn_hash : Ethereum_types.hash} -> (unit, tztrace) t | Find : { txn_hash : Ethereum_types.hash; @@ -499,6 +527,28 @@ let send_transactions_batch ~evm_node_endpoint ~keep_alive transactions = assert (M.is_empty missed_callbacks) ; return_unit +(** clear values and keep the allocated space *) +let clear + ({ + queue; + pending; + tx_object; + tx_per_address; + address_nonce; + evm_node_endpoint = _; + config = _; + keep_alive = _; + } : + state) = + (* full matching so when a new element is added to the state it's not + forgotten to clear it. *) + String.Hashtbl.clear pending ; + String.Hashtbl.clear tx_object ; + String.Hashtbl.clear tx_per_address ; + String.Hashtbl.clear address_nonce ; + Queue.clear queue ; + () + module Handlers = struct open Request @@ -512,7 +562,7 @@ module Handlers = struct let open Lwt_result_syntax in let state = Worker.state self in match request with - | Inject {next_nonce; payload; tx_object; callback} -> + | Inject {next_nonce; payload; tx_object; callback} -> ( let (Address (Hex addr)) = tx_object.from in let (Qty tx_nonce) = tx_object.nonce in let pending_callback (reason : pending_variant) = @@ -541,6 +591,7 @@ module Handlers = struct | Ok () -> return_unit | Error errs -> Tx_queue_events.callback_error errs in + Transactions_per_addr.decrement state.tx_per_address tx_object.from ; Tx_object.remove state.tx_object tx_object.hash ; callback (reason :> all_variant) in @@ -555,6 +606,9 @@ module Handlers = struct pending_callback ; return_ok_unit | `Refused -> + Transactions_per_addr.decrement + state.tx_per_address + tx_object.from ; Tx_object.remove state.tx_object tx_object.hash ; return @@ Address_nonce.remove @@ -569,17 +623,33 @@ module Handlers = struct in callback (reason :> all_variant) in - Tx_object.add state.tx_object tx_object ; - let Ethereum_types.(Qty next_nonce) = next_nonce in - let*? () = - Address_nonce.add - state.address_nonce - ~addr - ~next_nonce - ~nonce:tx_nonce + let nb_txs_in_queue = + Transactions_per_addr.find state.tx_per_address tx_object.from in - Queue.add {payload; queue_callback} state.queue ; - return_unit + (* Check number of txs by user in tx_queue. *) + match nb_txs_in_queue with + | Some i when i >= state.config.tx_per_addr_limit -> + let*! () = + Tx_pool_events.txs_per_user_threshold_reached + ~address:(Ethereum_types.Address.to_string tx_object.from) + in + return + (Error + "Limit of transaction for a user was reached. Transaction is \ + rejected.") + | Some _ | None -> + Transactions_per_addr.increment state.tx_per_address tx_object.from ; + Tx_object.add state.tx_object tx_object ; + let Ethereum_types.(Qty next_nonce) = next_nonce in + let*? () = + Address_nonce.add + state.address_nonce + ~addr + ~next_nonce + ~nonce:tx_nonce + in + Queue.add {payload; queue_callback} state.queue ; + return (Ok ())) | Confirm {txn_hash} -> ( match Pending_transactions.pop state.pending txn_hash with | Some {pending_callback; _} -> @@ -630,10 +700,7 @@ module Handlers = struct Lwt.async (fun () -> pending_callback `Dropped)) txns | Clear -> - (* clear values and keep the allocated space *) - String.Hashtbl.clear state.pending ; - String.Hashtbl.clear state.tx_object ; - Queue.clear state.queue ; + clear state ; let*! () = Tx_queue_events.cleared () in return_unit | Nonce {next_nonce; address = Address (Hex addr)} -> @@ -660,6 +727,9 @@ module Handlers = struct (* start with /10 and let it grow if necessary to not allocate too much at start. It's expected to have less different addresses than transactions. *) + tx_per_address = Transactions_per_addr.empty ~start_size:500; + (* Provide an arbitrary size for the initial hash tables, to + be revisited if needs be. *) config; keep_alive; } @@ -738,7 +808,10 @@ let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce let open Lwt_syntax in let* () = Tx_queue_events.add_transaction tx_object.hash in let* worker = worker_promise in - push_request worker (Inject {next_nonce; payload = txn; tx_object; callback}) + Worker.Queue.push_request_and_wait + worker + (Inject {next_nonce; payload = txn; tx_object; callback}) + |> handle_request_error let confirm txn_hash = bind_worker @@ fun w -> push_request w (Confirm {txn_hash}) diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index 256fe5f73cb4..33ef943cffe0 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -66,7 +66,7 @@ val inject : next_nonce:Ethereum_types.quantity -> Ethereum_types.legacy_transaction_object -> Ethereum_types.hex -> - unit tzresult Lwt.t + (unit, string) result tzresult Lwt.t (** [confirm hash] is to be called by an external component to advertise a transaction has been included in a blueprint. *) diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 507d9320dc9f..c82344bd00be 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -1291,7 +1291,11 @@ let optional_json_put ~name v f json = (name, JSON.annotate ~origin:"evm_node.config_patch" @@ value_json) json -type tx_queue_config = {max_size : int; max_lifespan : int} +type tx_queue_config = { + max_size : int; + max_lifespan : int; + tx_per_addr_limit : int; +} let patch_config_with_experimental_feature ?(drop_duplicate_when_injection = false) @@ -1320,11 +1324,12 @@ let patch_config_with_experimental_feature enable_tx_queue ~name:"enable_tx_queue" (match tx_queue_config with - | Some {max_size; max_lifespan} -> + | Some {max_size; max_lifespan; tx_per_addr_limit} -> `O [ ("max_size", `Float (Float.of_int max_size)); ("max_lifespan", `Float (Float.of_int max_lifespan)); + ("tx_per_addr_limit", `String (string_of_int tx_per_addr_limit)); ] | None -> `Bool true) |> optional_json_put diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index 885c7889dfca..eea08139e314 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -302,7 +302,11 @@ val spawn_init_config_minimal : type rpc_server = Resto | Dream -type tx_queue_config = {max_size : int; max_lifespan : int} +type tx_queue_config = { + max_size : int; + max_lifespan : int; + tx_per_addr_limit : int; +} (** [patch_config_with_experimental_feature ?drop_duplicate_when_injection ?next_wasm_runtime ?rpc_server diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 7daf03cf6627..aa7bbc51cc05 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -10759,6 +10759,7 @@ let test_tx_queue = { max_size = 1000; max_lifespan = 100000 (* absurd value so no TX are dropped *); + tx_per_addr_limit = 100000; } () in @@ -11001,6 +11002,7 @@ let test_tx_queue_nonce = { max_size = 1000; max_lifespan = 100000 (* absurd value so no TX are dropped *); + tx_per_addr_limit = 100000; } () in @@ -11295,6 +11297,145 @@ let test_observer_init_from_snapshot = let*@ _block = Rpc.get_block_by_number ~block:"1" observer in unit +let test_tx_queue_limit = + register_all + ~tags:["observer"; "tx_queue"; "limit"] + ~time_between_blocks:Nothing + ~kernels:[Latest] (* node only test *) + ~use_threshold_encryption:Register_without_feature + ~use_dal:Register_without_feature + ~websockets:false + ~title: + "Submits transactions to an observer with a tx queue and make sure its \ + limit are respected." + @@ fun {sequencer; observer; _} _protocol -> + let* () = + let*@ _ = produce_block sequencer in + unit + and* () = Evm_node.wait_for_blueprint_applied observer 1 in + let* () = Evm_node.terminate observer in + + let max_number_of_txs = 10 in + let* () = + Evm_node.Config_file.update observer + @@ Evm_node.patch_config_with_experimental_feature + ~enable_tx_queue:true + ~tx_queue_config: + { + max_size = 1000; + max_lifespan = 100000 (* absurd value so no TX are dropped *); + tx_per_addr_limit = max_number_of_txs; + } + () + in + let* () = Evm_node.run observer in + + (* helper to craft a tx with given nonce. *) + let raw_tx ~nonce = + Cast.craft_tx + ~source_private_key:Eth_account.bootstrap_accounts.(0).private_key + ~chain_id:1337 + ~nonce + ~gas_price:1_000_000_000 + ~gas:23_300 + ~value:Wei.one + ~address:Eth_account.bootstrap_accounts.(1).address + () + in + + Log.info + "send %d txs, all are successfully added to the queue" + max_number_of_txs ; + let* () = + Lwt_list.iter_p + (fun i -> + let* raw_tx = raw_tx ~nonce:i in + let*@ _hash = Rpc.send_raw_transaction ~raw_tx observer in + unit) + (range 0 (max_number_of_txs - 1)) + in + + Log.info "Then send an additional txs, that fails" ; + let*@? _error = + let* raw_tx = raw_tx ~nonce:max_number_of_txs in + Rpc.send_raw_transaction ~raw_tx observer + in + + Log.info "Wait for all txs to be included by the sequencer" ; + let wait_for_all_tx_process ~nb_txs ~name ~waiter = + let rec aux total = + if total = nb_txs then ( + Log.info "All (%d) txs processed: \"%s\"." total name ; + unit) + else if total > nb_txs then + Test.fail + "more transaction where processed (%s) than expected, impossible" + name + else + let* nb = waiter () in + let total = total + nb in + Log.debug "Processed %d of txs. (%s)" total name ; + aux total + in + aux 0 + in + + (* Checks that all txs were included in a block by the sequencer *) + let sequencer_wait_tx_included () = + let waiter () = + let* _hash = Evm_node.wait_for_block_producer_tx_injected sequencer in + return 1 + in + wait_for_all_tx_process + ~nb_txs:max_number_of_txs + ~name:"tx included by sequencer" + ~waiter + in + + (* produce enough blocks to include all txs submited *) + let produce_block_until_all_included () = + let res = ref None in + let _p = + let* () = sequencer_wait_tx_included () in + res := Some () ; + unit + in + let result_f () = return !res in + bake_until + ~__LOC__ + ~bake:(fun () -> + let*@ _ = produce_block sequencer in + unit) + ~result_f + () + in + let observer_wait_tx_confirmed () = + let waiter () = + let* _ = Evm_node.wait_for_tx_queue_transaction_confirmed observer in + return 1 + in + wait_for_all_tx_process + ~nb_txs:max_number_of_txs + ~name:"tx confirmed in observer" + ~waiter + in + + let* () = produce_block_until_all_included () + and* () = observer_wait_tx_confirmed () in + + Log.info + "Resend %d txs, all are successfully added to the queue" + max_number_of_txs ; + let* () = + Lwt_list.iter_p + (fun i -> + let* raw_tx = raw_tx ~nonce:i in + let*@ _hash = Rpc.send_raw_transaction ~raw_tx observer in + unit) + (range max_number_of_txs ((2 * max_number_of_txs) - 1)) + in + unit + let protocols = Protocol.all let () = @@ -11446,4 +11587,5 @@ let () = test_tx_queue_clear [Alpha] ; test_spawn_rpc protocols ; test_observer_init_from_snapshot protocols ; - test_tx_queue_nonce [Alpha] + test_tx_queue_nonce [Alpha] ; + test_tx_queue_limit [Alpha] diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out index 89ea48088d04..10db62cb7571 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out @@ -180,9 +180,11 @@ /* Replace the observer tx pool by a tx queue */ /* tx queue configuration */ { /* Some */ - "max_size": integer ∈ [-2^30, 2^30], - "max_transaction_batch_length"?: integer ∈ [-2^30, 2^30], - "max_lifespan"?: integer ∈ [-2^30, 2^30] } + "max_size"?: integer ∈ [-2^30, 2^30], + "max_transaction_batch_length"?: + integer ∈ [-2^30, 2^30] /* Some */ || null /* None */, + "max_lifespan"?: integer ∈ [-2^30, 2^30], + "tx_per_addr_limit"?: $int64 } || null /* None */ || boolean -- GitLab From bf9697824ddb48b39ab8577e6f8f8e9ba3c3010e Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Fri, 7 Mar 2025 12:58:08 +0100 Subject: [PATCH 2/3] evm/node: remove evm_node_endpoint from tx_queue state --- etherlink/CHANGES_NODE.md | 9 +++-- etherlink/bin_node/lib_dev/observer.ml | 8 +--- etherlink/bin_node/lib_dev/tx_queue.ml | 53 +++++++++++-------------- etherlink/bin_node/lib_dev/tx_queue.mli | 15 +++---- 4 files changed, 39 insertions(+), 46 deletions(-) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 3659a1743ae0..1b974fe3638c 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -26,10 +26,11 @@ you start using them, you probably want to use `octez-evm-node check config next available nonce found in the `tx_queue`. It also works for transactions that have been already forwarded to the upstream node but not yet confirmed. (!!16829) -- `tx_queue` limits the number of - transaction one user can submit. This limits is for pending - transactions the node has seen. configurable with - `tx_per_addr_limit`. (!16903) +- `tx_queue` limits the number of transaction one user can submit. + This limits is for pending transactions the node has seen. + configurable with `tx_per_addr_limit`. (!16903) +- An sequencer EVM node can uses the tx_queue to speed the inclusion + of transaction. (!17134) ## Version 0.19 (2025-03-10) diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index 485fe72977d7..05b181c9887a 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -154,11 +154,7 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync let* () = match config.experimental_features.enable_tx_queue with | Some tx_queue_config -> - Tx_queue.start - ~evm_node_endpoint - ~config:tx_queue_config - ~keep_alive:config.keep_alive - () + Tx_queue.start ~config:tx_queue_config ~keep_alive:config.keep_alive () | None -> let mode = if config.finalized_view then @@ -289,7 +285,7 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync Drift_monitor.run ~evm_node_endpoint Evm_context.next_blueprint_number and* () = if Configuration.is_tx_queue_enabled config then - Tx_queue.beacon ~tick_interval:0.05 + Tx_queue.beacon ~evm_node_endpoint ~tick_interval:0.05 else return_unit in return_unit diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 563a1f8c6008..24b16633c347 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -8,11 +8,7 @@ open Tezos_workers -type parameters = { - evm_node_endpoint : Uri.t; - config : Configuration.tx_queue; - keep_alive : bool; -} +type parameters = {config : Configuration.tx_queue; keep_alive : bool} type queue_variant = [`Accepted | `Refused] @@ -337,7 +333,6 @@ module Transactions_per_addr = struct end type state = { - evm_node_endpoint : Uri.t; mutable queue : queue_request Queue.t; pending : Pending_transactions.t; tx_object : Tx_object.t; @@ -378,7 +373,7 @@ module Request = struct address : Ethereum_types.address; } -> (Ethereum_types.quantity, tztrace) t - | Tick : (unit, tztrace) t + | Tick : {evm_node_endpoint : Uri.t} -> (unit, tztrace) t | Clear : (unit, tztrace) t type view = View : _ t -> view @@ -411,8 +406,13 @@ module Request = struct case Json_only ~title:"Tick" - (obj1 (req "request" (constant "tick"))) - (function View Tick -> Some () | _ -> None) + (obj2 + (req "request" (constant "tick")) + (req "evm_node_endpoint" string)) + (function + | View (Tick {evm_node_endpoint}) -> + Some ((), Uri.to_string evm_node_endpoint) + | _ -> None) (fun _ -> assert false); case Json_only @@ -449,7 +449,7 @@ module Request = struct | Confirm {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Confirm %s" txn_hash | Find {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Find %s" txn_hash - | Tick -> fprintf fmt "Tick" + | Tick _ -> fprintf fmt "Tick" | Clear -> fprintf fmt "Clear" | Nonce {next_nonce = _; address = Address (Hex address)} -> fprintf fmt "Nonce %s" address @@ -535,7 +535,6 @@ let clear tx_object; tx_per_address; address_nonce; - evm_node_endpoint = _; config = _; keep_alive = _; } : @@ -657,7 +656,7 @@ module Handlers = struct return_unit | None -> return_unit) | Find {txn_hash} -> return @@ Tx_object.find state.tx_object txn_hash - | Tick -> + | Tick {evm_node_endpoint} -> let all_transactions = Queue.to_seq state.queue in let* transactions_to_inject, remaining_transactions = match state.config.max_transaction_batch_length with @@ -686,7 +685,7 @@ module Handlers = struct let+ () = send_transactions_batch ~keep_alive:state.keep_alive - ~evm_node_endpoint:state.evm_node_endpoint + ~evm_node_endpoint transactions_to_inject in @@ -712,12 +711,10 @@ module Handlers = struct type launch_error = tztrace - let on_launch _self () ({evm_node_endpoint; config; keep_alive} : parameters) - = + let on_launch _self () ({config; keep_alive} : parameters) = let open Lwt_result_syntax in return { - evm_node_endpoint; queue = Queue.create (); pending = Pending_transactions.empty ~start_size:(config.max_size / 4); (* start with /4 and let it grow if necessary to not allocate @@ -795,13 +792,17 @@ let push_request worker request = let*! (pushed : bool) = Worker.Queue.push_request worker request in if not pushed then tzfail Tx_queue_is_closed else return_unit -let tick () = bind_worker @@ fun w -> push_request w Tick +let tick ~evm_node_endpoint = + bind_worker @@ fun w -> push_request w (Tick {evm_node_endpoint}) -let rec beacon ~tick_interval = +let beacon ~evm_node_endpoint ~tick_interval = let open Lwt_result_syntax in - let* () = tick () in - let*! () = Lwt_unix.sleep tick_interval in - beacon ~tick_interval + let rec loop () = + let* () = tick ~evm_node_endpoint in + let*! () = Lwt_unix.sleep tick_interval in + loop () + in + loop () let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce (tx_object : Ethereum_types.legacy_transaction_object) txn = @@ -816,15 +817,9 @@ let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce let confirm txn_hash = bind_worker @@ fun w -> push_request w (Confirm {txn_hash}) -let start ~config ~evm_node_endpoint ~keep_alive () = +let start ~config ~keep_alive () = let open Lwt_result_syntax in - let* worker = - Worker.launch - table - () - {evm_node_endpoint; config; keep_alive} - (module Handlers) - in + let* worker = Worker.launch table () {config; keep_alive} (module Handlers) in Lwt.wakeup worker_waker worker ; let*! () = Tx_queue_events.is_ready () in return_unit diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index 33ef943cffe0..273aa3445da2 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -24,12 +24,11 @@ [callback] is called with [`Dropped].}} *) type callback = [`Accepted | `Confirmed | `Dropped | `Refused] -> unit Lwt.t -(** [start ~evm_node_endpoint ~max_transaction_batch_length ()] starts - the worker, meaning it is possible to call {!inject}, {!confirm} - and {!beacon}. *) +(** [start ~config ~max_transaction_batch_length ()] starts the + worker, meaning it is possible to call {!inject}, {!confirm} and + {!beacon}. *) val start : config:Configuration.tx_queue -> - evm_node_endpoint:Uri.t -> keep_alive:bool -> unit -> unit tzresult Lwt.t @@ -72,9 +71,11 @@ val inject : transaction has been included in a blueprint. *) val confirm : Ethereum_types.hash -> unit tzresult Lwt.t -(** [beacon ~tick_interval] is a never fulfilled promise which triggers a tick - in the [Tx_queue] every [tick_interval] seconds. *) -val beacon : tick_interval:float -> unit tzresult Lwt.t +(** [beacon ~evm_node_endpoint ~tick_interval] is a never fulfilled + promise which triggers a tick in the [Tx_queue] every + [tick_interval] seconds. *) +val beacon : + evm_node_endpoint:Uri.t -> tick_interval:float -> unit tzresult Lwt.t (** [find hash] returns the transaction associated with that hash if it's found in the tx_queue. *) -- GitLab From ffb721fd51e1b29f1efd03de2d53fe118302ef8f Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 6 Mar 2025 09:24:22 +0100 Subject: [PATCH 3/3] evm/node: add locking logic to tx_queue --- etherlink/CHANGES_NODE.md | 2 +- etherlink/bin_node/lib_dev/tx_queue.ml | 47 +++++++++++++++++++++++++ etherlink/bin_node/lib_dev/tx_queue.mli | 12 +++++++ 3 files changed, 60 insertions(+), 1 deletion(-) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 1b974fe3638c..27747f1fe215 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -30,7 +30,7 @@ you start using them, you probably want to use `octez-evm-node check config This limits is for pending transactions the node has seen. configurable with `tx_per_addr_limit`. (!16903) - An sequencer EVM node can uses the tx_queue to speed the inclusion - of transaction. (!17134) + of transaction. (!17134 !17100) ## Version 0.19 (2025-03-10) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 24b16633c347..f942d5b16af5 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -340,6 +340,7 @@ type state = { address_nonce : Address_nonce.t; config : Configuration.tx_queue; keep_alive : bool; + mutable locked : bool; } module Types = struct @@ -375,6 +376,9 @@ module Request = struct -> (Ethereum_types.quantity, tztrace) t | Tick : {evm_node_endpoint : Uri.t} -> (unit, tztrace) t | Clear : (unit, tztrace) t + | Lock_transactions : (unit, tztrace) t + | Unlock_transactions : (unit, tztrace) t + | Is_locked : (bool, tztrace) t type view = View : _ t -> view @@ -440,6 +444,24 @@ module Request = struct Some ((), next_nonce, address) | _ -> None) (fun _ -> assert false); + case + Json_only + ~title:"Lock_transactions" + (obj1 (req "request" (constant "lock_transactions"))) + (function View Lock_transactions -> Some () | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Unlock_transactions" + (obj1 (req "request" (constant "unlock_transactions"))) + (function View Unlock_transactions -> Some () | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Is_locked" + (obj1 (req "request" (constant "is_locked"))) + (function View Is_locked -> Some () | _ -> None) + (fun _ -> assert false); ] let pp fmt (View r) = @@ -453,6 +475,9 @@ module Request = struct | Clear -> fprintf fmt "Clear" | Nonce {next_nonce = _; address = Address (Hex address)} -> fprintf fmt "Nonce %s" address + | Lock_transactions -> Format.fprintf fmt "Locking the transactions" + | Unlock_transactions -> Format.fprintf fmt "Unlocking the transactions" + | Is_locked -> Format.fprintf fmt "Checking if the tx queue is locked" end module Worker = Worker.MakeSingle (Name) (Request) (Types) @@ -537,6 +562,7 @@ let clear address_nonce; config = _; keep_alive = _; + locked = _; } : state) = (* full matching so when a new element is added to the state it's not @@ -548,6 +574,12 @@ let clear Queue.clear queue ; () +let lock_transactions state = state.locked <- true + +let unlock_transactions state = state.locked <- false + +let is_locked state = state.locked + module Handlers = struct open Request @@ -708,6 +740,9 @@ module Handlers = struct Address_nonce.next_gap state.address_nonce ~addr ~next_nonce in return @@ Ethereum_types.Qty next_gap + | Lock_transactions -> return (lock_transactions state) + | Unlock_transactions -> return (unlock_transactions state) + | Is_locked -> return (is_locked state) type launch_error = tztrace @@ -729,6 +764,7 @@ module Handlers = struct be revisited if needs be. *) config; keep_alive; + locked = false; } let on_error (type a b) _self _status_request (_r : (a, b) Request.t) @@ -847,6 +883,17 @@ let shutdown () = let*! () = Worker.shutdown w in return_unit +let lock_transactions () = + bind_worker @@ fun w -> push_request w Lock_transactions + +let unlock_transactions () = + bind_worker @@ fun w -> push_request w Unlock_transactions + +let is_locked () = + let open Lwt_result_syntax in + let*? worker = Lazy.force worker in + Worker.Queue.push_request_and_wait worker Is_locked |> handle_request_error + module Internal_for_tests = struct module Nonce_bitset = Nonce_bitset module Address_nonce = Address_nonce diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index 273aa3445da2..8c06fbbf044b 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -91,6 +91,18 @@ val nonce : Ethereum_types.address -> Ethereum_types.quantity tzresult Lwt.t +(** [lock_transactions] locks the transactions in the queue, new + transactions can be added but nothing can be retrieved with + {!pop_transactions}. *) +val lock_transactions : unit -> unit tzresult Lwt.t + +(** [unlock_transactions] unlocks the transactions if it was locked by + {!lock_transactions}. *) +val unlock_transactions : unit -> unit tzresult Lwt.t + +(** [is_locked] checks if the queue is locked. *) +val is_locked : unit -> bool tzresult Lwt.t + (**/*) module Internal_for_tests : sig -- GitLab