diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index a822453ea5af48bbd6fb28a09645f315fc49613f..3659a1743ae0a01c7048db7b2562ea1dd44738b9 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -26,6 +26,10 @@ you start using them, you probably want to use `octez-evm-node check config next available nonce found in the `tx_queue`. It also works for transactions that have been already forwarded to the upstream node but not yet confirmed. (!!16829) +- `tx_queue` limits the number of + transaction one user can submit. This limits is for pending + transactions the node has seen. configurable with + `tx_per_addr_limit`. (!16903) ## Version 0.19 (2025-03-10) diff --git a/etherlink/bin_node/config/configuration.ml b/etherlink/bin_node/config/configuration.ml index 09222346c1c975dd1f252ffe9959453f04f5246a..a59672b3b299c432f3c0860a8a76da4dd1a2d64b 100644 --- a/etherlink/bin_node/config/configuration.ml +++ b/etherlink/bin_node/config/configuration.ml @@ -75,22 +75,45 @@ type tx_queue = { max_size : int; max_transaction_batch_length : int option; max_lifespan_s : int; + tx_per_addr_limit : int64; } let default_tx_queue = - {max_size = 1000; max_transaction_batch_length = None; max_lifespan_s = 2} + { + max_size = 1000; + max_transaction_batch_length = None; + max_lifespan_s = 2; + tx_per_addr_limit = 16L; + } let tx_queue_encoding = let open Data_encoding in conv - (fun {max_size; max_transaction_batch_length; max_lifespan_s} -> - (max_size, max_transaction_batch_length, max_lifespan_s)) - (fun (max_size, max_transaction_batch_length, max_lifespan_s) -> - {max_size; max_transaction_batch_length; max_lifespan_s}) - (obj3 - (req "max_size" int31) - (opt "max_transaction_batch_length" int31) - (dft "max_lifespan" int31 default_tx_queue.max_lifespan_s)) + (fun { + max_size; + max_transaction_batch_length; + max_lifespan_s; + tx_per_addr_limit; + } -> + (max_size, max_transaction_batch_length, max_lifespan_s, tx_per_addr_limit)) + (fun ( max_size, + max_transaction_batch_length, + max_lifespan_s, + tx_per_addr_limit ) -> + { + max_size; + max_transaction_batch_length; + max_lifespan_s; + tx_per_addr_limit; + }) + (obj4 + (dft "max_size" int31 default_tx_queue.max_size) + (dft + "max_transaction_batch_length" + (option int31) + default_tx_queue.max_transaction_batch_length) + (dft "max_lifespan" int31 default_tx_queue.max_lifespan_s) + (dft "tx_per_addr_limit" int64 default_tx_queue.tx_per_addr_limit)) let tx_queue_opt_encoding = let open Data_encoding in diff --git a/etherlink/bin_node/config/configuration.mli b/etherlink/bin_node/config/configuration.mli index 3a6a2dfd5677e70ecc50f6097a989d2793650378..d2f7443241b910c77f10ad6d9d0693b63bb42d92 100644 --- a/etherlink/bin_node/config/configuration.mli +++ b/etherlink/bin_node/config/configuration.mli @@ -94,6 +94,7 @@ type tx_queue = { max_size : int; max_transaction_batch_length : int option; max_lifespan_s : int; + tx_per_addr_limit : int64; } (** Configuration settings for experimental features, with no backward diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 8dac9122fc85dda67598b7a6e11f82201074aa43..9b77ee040bccd58df9cce45f844134707a39222e 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -692,10 +692,12 @@ let dispatch_request (rpc : Configuration.rpc) | Ok (next_nonce, transaction_object) -> ( let* tx_hash = if Configuration.is_tx_queue_enabled config then - let* () = + let* res = Tx_queue.inject ~next_nonce transaction_object tx_raw in - return (Ok transaction_object.hash) + match res with + | Ok () -> return (Ok transaction_object.hash) + | Error errs -> return (Error errs) else Tx_pool.add transaction_object txn in match tx_hash with @@ -933,10 +935,12 @@ let dispatch_private_request (rpc : Configuration.rpc) let* tx_hash = if Configuration.is_tx_queue_enabled config then let transaction = Ethereum_types.hex_encode_string raw_txn in - let* () = + let* res = Tx_queue.inject ~next_nonce transaction_object transaction in - return @@ Ok transaction_object.hash + match res with + | Ok () -> return (Ok transaction_object.hash) + | Error errs -> return (Error errs) else Tx_pool.add transaction_object raw_txn in match tx_hash with diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index edaf3472eb6a09f498b86384ae69328aba048e50..563a1f8c6008f2042a5137719d42a6f7ee6672b1 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -309,11 +309,39 @@ module Pending_transactions = struct !dropped end +module Transactions_per_addr = struct + module S = String.Hashtbl + + type t = int64 S.t + + let empty ~start_size = S.create start_size + + let remove s (Ethereum_types.Address (Hex h)) = S.remove s h + + let find s (Ethereum_types.Address (Hex h)) = S.find s h + + let add s (Ethereum_types.Address (Hex h)) i = S.replace s h i + + let decrement s address = + let current = find s address in + match current with + | Some i when i <= 1L -> remove s address + | Some i -> add s address (Int64.pred i) + | None -> () + + let increment s address = + let current = find s address in + match current with + | Some i -> add s address (Int64.succ i) + | None -> add s address 1L +end + type state = { evm_node_endpoint : Uri.t; mutable queue : queue_request Queue.t; pending : Pending_transactions.t; tx_object : Tx_object.t; + tx_per_address : Transactions_per_addr.t; address_nonce : Address_nonce.t; config : Configuration.tx_queue; keep_alive : bool; @@ -339,7 +367,7 @@ end module Request = struct type ('a, 'b) t = - | Inject : request -> (unit, tztrace) t + | Inject : request -> ((unit, string) result, tztrace) t | Confirm : {txn_hash : Ethereum_types.hash} -> (unit, tztrace) t | Find : { txn_hash : Ethereum_types.hash; @@ -499,6 +527,28 @@ let send_transactions_batch ~evm_node_endpoint ~keep_alive transactions = assert (M.is_empty missed_callbacks) ; return_unit +(** clear values and keep the allocated space *) +let clear + ({ + queue; + pending; + tx_object; + tx_per_address; + address_nonce; + evm_node_endpoint = _; + config = _; + keep_alive = _; + } : + state) = + (* full matching so when a new element is added to the state it's not + forgotten to clear it. *) + String.Hashtbl.clear pending ; + String.Hashtbl.clear tx_object ; + String.Hashtbl.clear tx_per_address ; + String.Hashtbl.clear address_nonce ; + Queue.clear queue ; + () + module Handlers = struct open Request @@ -512,7 +562,7 @@ module Handlers = struct let open Lwt_result_syntax in let state = Worker.state self in match request with - | Inject {next_nonce; payload; tx_object; callback} -> + | Inject {next_nonce; payload; tx_object; callback} -> ( let (Address (Hex addr)) = tx_object.from in let (Qty tx_nonce) = tx_object.nonce in let pending_callback (reason : pending_variant) = @@ -541,6 +591,7 @@ module Handlers = struct | Ok () -> return_unit | Error errs -> Tx_queue_events.callback_error errs in + Transactions_per_addr.decrement state.tx_per_address tx_object.from ; Tx_object.remove state.tx_object tx_object.hash ; callback (reason :> all_variant) in @@ -555,6 +606,9 @@ module Handlers = struct pending_callback ; return_ok_unit | `Refused -> + Transactions_per_addr.decrement + state.tx_per_address + tx_object.from ; Tx_object.remove state.tx_object tx_object.hash ; return @@ Address_nonce.remove @@ -569,17 +623,33 @@ module Handlers = struct in callback (reason :> all_variant) in - Tx_object.add state.tx_object tx_object ; - let Ethereum_types.(Qty next_nonce) = next_nonce in - let*? () = - Address_nonce.add - state.address_nonce - ~addr - ~next_nonce - ~nonce:tx_nonce + let nb_txs_in_queue = + Transactions_per_addr.find state.tx_per_address tx_object.from in - Queue.add {payload; queue_callback} state.queue ; - return_unit + (* Check number of txs by user in tx_queue. *) + match nb_txs_in_queue with + | Some i when i >= state.config.tx_per_addr_limit -> + let*! () = + Tx_pool_events.txs_per_user_threshold_reached + ~address:(Ethereum_types.Address.to_string tx_object.from) + in + return + (Error + "Limit of transaction for a user was reached. Transaction is \ + rejected.") + | Some _ | None -> + Transactions_per_addr.increment state.tx_per_address tx_object.from ; + Tx_object.add state.tx_object tx_object ; + let Ethereum_types.(Qty next_nonce) = next_nonce in + let*? () = + Address_nonce.add + state.address_nonce + ~addr + ~next_nonce + ~nonce:tx_nonce + in + Queue.add {payload; queue_callback} state.queue ; + return (Ok ())) | Confirm {txn_hash} -> ( match Pending_transactions.pop state.pending txn_hash with | Some {pending_callback; _} -> @@ -630,10 +700,7 @@ module Handlers = struct Lwt.async (fun () -> pending_callback `Dropped)) txns | Clear -> - (* clear values and keep the allocated space *) - String.Hashtbl.clear state.pending ; - String.Hashtbl.clear state.tx_object ; - Queue.clear state.queue ; + clear state ; let*! () = Tx_queue_events.cleared () in return_unit | Nonce {next_nonce; address = Address (Hex addr)} -> @@ -660,6 +727,9 @@ module Handlers = struct (* start with /10 and let it grow if necessary to not allocate too much at start. It's expected to have less different addresses than transactions. *) + tx_per_address = Transactions_per_addr.empty ~start_size:500; + (* Provide an arbitrary size for the initial hash tables, to + be revisited if needs be. *) config; keep_alive; } @@ -738,7 +808,10 @@ let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce let open Lwt_syntax in let* () = Tx_queue_events.add_transaction tx_object.hash in let* worker = worker_promise in - push_request worker (Inject {next_nonce; payload = txn; tx_object; callback}) + Worker.Queue.push_request_and_wait + worker + (Inject {next_nonce; payload = txn; tx_object; callback}) + |> handle_request_error let confirm txn_hash = bind_worker @@ fun w -> push_request w (Confirm {txn_hash}) diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index 256fe5f73cb4835af95ae5a7c41d432a14b904ce..33ef943cffe062e52a1a9ab40ef2710f9be91c14 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -66,7 +66,7 @@ val inject : next_nonce:Ethereum_types.quantity -> Ethereum_types.legacy_transaction_object -> Ethereum_types.hex -> - unit tzresult Lwt.t + (unit, string) result tzresult Lwt.t (** [confirm hash] is to be called by an external component to advertise a transaction has been included in a blueprint. *) diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 507d9320dc9f5af71c4cd1c2bc689294d5977c27..c82344bd00be50048a7087a5182dbd5b2bfd6a38 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -1291,7 +1291,11 @@ let optional_json_put ~name v f json = (name, JSON.annotate ~origin:"evm_node.config_patch" @@ value_json) json -type tx_queue_config = {max_size : int; max_lifespan : int} +type tx_queue_config = { + max_size : int; + max_lifespan : int; + tx_per_addr_limit : int; +} let patch_config_with_experimental_feature ?(drop_duplicate_when_injection = false) @@ -1320,11 +1324,12 @@ let patch_config_with_experimental_feature enable_tx_queue ~name:"enable_tx_queue" (match tx_queue_config with - | Some {max_size; max_lifespan} -> + | Some {max_size; max_lifespan; tx_per_addr_limit} -> `O [ ("max_size", `Float (Float.of_int max_size)); ("max_lifespan", `Float (Float.of_int max_lifespan)); + ("tx_per_addr_limit", `String (string_of_int tx_per_addr_limit)); ] | None -> `Bool true) |> optional_json_put diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index 885c7889dfca55ed55cf53592a5d0c1e0c53ac67..eea08139e3141ed212c1d7c4dfda8565070046d8 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -302,7 +302,11 @@ val spawn_init_config_minimal : type rpc_server = Resto | Dream -type tx_queue_config = {max_size : int; max_lifespan : int} +type tx_queue_config = { + max_size : int; + max_lifespan : int; + tx_per_addr_limit : int; +} (** [patch_config_with_experimental_feature ?drop_duplicate_when_injection ?next_wasm_runtime ?rpc_server diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 7daf03cf662741f75eb313b5cd1dbe43254a65e7..aa7bbc51cc050e1e038fccc4330f4704f002ae21 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -10759,6 +10759,7 @@ let test_tx_queue = { max_size = 1000; max_lifespan = 100000 (* absurd value so no TX are dropped *); + tx_per_addr_limit = 100000; } () in @@ -11001,6 +11002,7 @@ let test_tx_queue_nonce = { max_size = 1000; max_lifespan = 100000 (* absurd value so no TX are dropped *); + tx_per_addr_limit = 100000; } () in @@ -11295,6 +11297,145 @@ let test_observer_init_from_snapshot = let*@ _block = Rpc.get_block_by_number ~block:"1" observer in unit +let test_tx_queue_limit = + register_all + ~tags:["observer"; "tx_queue"; "limit"] + ~time_between_blocks:Nothing + ~kernels:[Latest] (* node only test *) + ~use_threshold_encryption:Register_without_feature + ~use_dal:Register_without_feature + ~websockets:false + ~title: + "Submits transactions to an observer with a tx queue and make sure its \ + limit are respected." + @@ fun {sequencer; observer; _} _protocol -> + let* () = + let*@ _ = produce_block sequencer in + unit + and* () = Evm_node.wait_for_blueprint_applied observer 1 in + let* () = Evm_node.terminate observer in + + let max_number_of_txs = 10 in + let* () = + Evm_node.Config_file.update observer + @@ Evm_node.patch_config_with_experimental_feature + ~enable_tx_queue:true + ~tx_queue_config: + { + max_size = 1000; + max_lifespan = 100000 (* absurd value so no TX are dropped *); + tx_per_addr_limit = max_number_of_txs; + } + () + in + let* () = Evm_node.run observer in + + (* helper to craft a tx with given nonce. *) + let raw_tx ~nonce = + Cast.craft_tx + ~source_private_key:Eth_account.bootstrap_accounts.(0).private_key + ~chain_id:1337 + ~nonce + ~gas_price:1_000_000_000 + ~gas:23_300 + ~value:Wei.one + ~address:Eth_account.bootstrap_accounts.(1).address + () + in + + Log.info + "send %d txs, all are successfully added to the queue" + max_number_of_txs ; + let* () = + Lwt_list.iter_p + (fun i -> + let* raw_tx = raw_tx ~nonce:i in + let*@ _hash = Rpc.send_raw_transaction ~raw_tx observer in + unit) + (range 0 (max_number_of_txs - 1)) + in + + Log.info "Then send an additional txs, that fails" ; + let*@? _error = + let* raw_tx = raw_tx ~nonce:max_number_of_txs in + Rpc.send_raw_transaction ~raw_tx observer + in + + Log.info "Wait for all txs to be included by the sequencer" ; + let wait_for_all_tx_process ~nb_txs ~name ~waiter = + let rec aux total = + if total = nb_txs then ( + Log.info "All (%d) txs processed: \"%s\"." total name ; + unit) + else if total > nb_txs then + Test.fail + "more transaction where processed (%s) than expected, impossible" + name + else + let* nb = waiter () in + let total = total + nb in + Log.debug "Processed %d of txs. (%s)" total name ; + aux total + in + aux 0 + in + + (* Checks that all txs were included in a block by the sequencer *) + let sequencer_wait_tx_included () = + let waiter () = + let* _hash = Evm_node.wait_for_block_producer_tx_injected sequencer in + return 1 + in + wait_for_all_tx_process + ~nb_txs:max_number_of_txs + ~name:"tx included by sequencer" + ~waiter + in + + (* produce enough blocks to include all txs submited *) + let produce_block_until_all_included () = + let res = ref None in + let _p = + let* () = sequencer_wait_tx_included () in + res := Some () ; + unit + in + let result_f () = return !res in + bake_until + ~__LOC__ + ~bake:(fun () -> + let*@ _ = produce_block sequencer in + unit) + ~result_f + () + in + let observer_wait_tx_confirmed () = + let waiter () = + let* _ = Evm_node.wait_for_tx_queue_transaction_confirmed observer in + return 1 + in + wait_for_all_tx_process + ~nb_txs:max_number_of_txs + ~name:"tx confirmed in observer" + ~waiter + in + + let* () = produce_block_until_all_included () + and* () = observer_wait_tx_confirmed () in + + Log.info + "Resend %d txs, all are successfully added to the queue" + max_number_of_txs ; + let* () = + Lwt_list.iter_p + (fun i -> + let* raw_tx = raw_tx ~nonce:i in + let*@ _hash = Rpc.send_raw_transaction ~raw_tx observer in + unit) + (range max_number_of_txs ((2 * max_number_of_txs) - 1)) + in + unit + let protocols = Protocol.all let () = @@ -11446,4 +11587,5 @@ let () = test_tx_queue_clear [Alpha] ; test_spawn_rpc protocols ; test_observer_init_from_snapshot protocols ; - test_tx_queue_nonce [Alpha] + test_tx_queue_nonce [Alpha] ; + test_tx_queue_limit [Alpha] diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out index 89ea48088d0467356857219d98c6c229938d6f6d..10db62cb75713bd8b53512c095e7c07be4f7c172 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out @@ -180,9 +180,11 @@ /* Replace the observer tx pool by a tx queue */ /* tx queue configuration */ { /* Some */ - "max_size": integer ∈ [-2^30, 2^30], - "max_transaction_batch_length"?: integer ∈ [-2^30, 2^30], - "max_lifespan"?: integer ∈ [-2^30, 2^30] } + "max_size"?: integer ∈ [-2^30, 2^30], + "max_transaction_batch_length"?: + integer ∈ [-2^30, 2^30] /* Some */ || null /* None */, + "max_lifespan"?: integer ∈ [-2^30, 2^30], + "tx_per_addr_limit"?: $int64 } || null /* None */ || boolean