From 0b48d85908b4fe12c479d2d4eea31c38f8feb05b Mon Sep 17 00:00:00 2001 From: Thomas Plisson Date: Wed, 5 Mar 2025 17:20:41 +0100 Subject: [PATCH] EVM: clear tx_queue after a delayed inbox flush --- etherlink/CHANGES_NODE.md | 2 + etherlink/bin_node/lib_dev/evm_context.ml | 5 ++ etherlink/bin_node/lib_dev/tx_queue.ml | 20 +++++ etherlink/bin_node/lib_dev/tx_queue.mli | 3 + etherlink/bin_node/lib_dev/tx_queue_events.ml | 10 +++ .../bin_node/lib_dev/tx_queue_events.mli | 3 + etherlink/tezt/lib/evm_node.ml | 4 + etherlink/tezt/lib/evm_node.mli | 3 + etherlink/tezt/tests/evm_sequencer.ml | 77 +++++++++++++++++++ .../EVM node- list events regression.out | 8 ++ 10 files changed, 135 insertions(+) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 0fda9b9a12f0..38a9a7a1e174 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -55,6 +55,8 @@ - **experimental feature** The `tx_queue` respect the configuration field `keep_alive` for it's RPC. (!16894) +- **experimental feature** `tx_queue` clears itself when a delayed +inbox flush has happened. (!17091) ### Storage changes diff --git a/etherlink/bin_node/lib_dev/evm_context.ml b/etherlink/bin_node/lib_dev/evm_context.ml index 5fecb62a028a..5fbd91b571f1 100644 --- a/etherlink/bin_node/lib_dev/evm_context.ml +++ b/etherlink/bin_node/lib_dev/evm_context.ml @@ -442,6 +442,11 @@ module State = struct (* Find the [l2_level] evm_state. *) let*! context = Irmin_context.checkout_exn ctxt.index checkpoint in let*! evm_state = Irmin_context.PVMState.get context in + (* Clear the TX queue if needed, to preserve its invariants about nonces always increasing. *) + let* () = + when_ (Configuration.is_tx_queue_enabled ctxt.configuration) + @@ Tx_queue.clear + in (* Clear the store. *) let* () = Evm_store.reset_after conn ~l2_level in let* pending_upgrade = Evm_store.Kernel_upgrades.find_latest_pending conn in diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index f221f656c713..19b2aa4f9e18 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -136,6 +136,7 @@ module Request = struct } -> (Ethereum_types.legacy_transaction_object option, tztrace) t | Tick : (unit, tztrace) t + | Clear : (unit, tztrace) t type view = View : _ t -> view @@ -178,6 +179,12 @@ module Request = struct (req "transaction_hash" Ethereum_types.hash_encoding)) (function View (Find {txn_hash}) -> Some ((), txn_hash) | _ -> None) (fun _ -> assert false); + case + Json_only + ~title:"Clear" + (obj1 (req "request" (constant "clear"))) + (function View Clear -> Some () | _ -> None) + (fun _ -> assert false); ] let pp fmt (View r) = @@ -188,6 +195,7 @@ module Request = struct fprintf fmt "Confirm %s" txn_hash | Find {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Find %s" txn_hash | Tick -> fprintf fmt "Tick" + | Clear -> fprintf fmt "Clear" end module Worker = Worker.MakeSingle (Name) (Request) (Types) @@ -347,6 +355,13 @@ module Handlers = struct (fun {pending_callback; _} -> Lwt.async (fun () -> pending_callback `Dropped)) txns + | Clear -> + (* clear values and keep the allocated space *) + String.Hashtbl.clear state.pending ; + String.Hashtbl.clear state.tx_object ; + Queue.clear state.queue ; + let*! () = Tx_queue_events.cleared () in + return_unit type launch_error = tztrace @@ -462,6 +477,11 @@ let find txn_hash = let*? w = Lazy.force worker in Worker.Queue.push_request_and_wait w (Find {txn_hash}) |> handle_request_error +let clear () = + let open Lwt_result_syntax in + let*? w = Lazy.force worker in + Worker.Queue.push_request_and_wait w Clear |> handle_request_error + let shutdown () = let open Lwt_result_syntax in bind_worker @@ fun w -> diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index 1a752434ccea..14b7f2d20bf0 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -38,6 +38,9 @@ val start : to be processed. *) val shutdown : unit -> unit tzresult Lwt.t +(** [clear ()] removes the tx queue data but keeps the allocated space *) +val clear : unit -> unit tzresult Lwt.t + (** [inject ?callback tx_object raw_txn] pushes the raw transaction [raw_txn] to the worker queue. diff --git a/etherlink/bin_node/lib_dev/tx_queue_events.ml b/etherlink/bin_node/lib_dev/tx_queue_events.ml index f87890652692..004f9b0d005f 100644 --- a/etherlink/bin_node/lib_dev/tx_queue_events.ml +++ b/etherlink/bin_node/lib_dev/tx_queue_events.ml @@ -25,6 +25,14 @@ let shutdown = ~level:Notice () +let cleared = + declare_0 + ~section + ~name:"tx_queue_cleared" + ~msg:"cleared the tx queue" + ~level:Notice + () + let injecting_transactions = declare_1 ~name:"tx_queue_injecting_transaction" @@ -69,6 +77,8 @@ let is_ready () = emit is_ready () let shutdown () = emit shutdown () +let cleared () = emit cleared () + let injecting_transactions n = emit injecting_transactions n let add_transaction tx = emit add_transaction tx diff --git a/etherlink/bin_node/lib_dev/tx_queue_events.mli b/etherlink/bin_node/lib_dev/tx_queue_events.mli index 87b58ccea2a5..d85061a5e24b 100644 --- a/etherlink/bin_node/lib_dev/tx_queue_events.mli +++ b/etherlink/bin_node/lib_dev/tx_queue_events.mli @@ -13,6 +13,9 @@ val is_ready : unit -> unit Lwt.t (** [shutdown ()] advertises that the [Tx_queue] is shutting down. *) val shutdown : unit -> unit Lwt.t +(** [cleared ()] advertises that the [Tx_queue] finished clearing. *) +val cleared : unit -> unit Lwt.t + (** [injecting_transactions nb] advertises [nb] transactions are about to be injected to the relay endpoint with a batch of [eth_sendRawTransaction]. *) val injecting_transactions : int -> unit Lwt.t diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 0e6c7899284e..3a1b9a7a3f94 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -582,6 +582,10 @@ let wait_for_tx_queue_injecting_transaction ?timeout evm_node = wait_for_event ?timeout evm_node ~event:"tx_queue_injecting_transaction.v0" @@ fun json -> JSON.(json |> as_int |> Option.some) +let wait_for_tx_queue_cleared ?timeout evm_node = + wait_for_event ?timeout evm_node ~event:"tx_queue_cleared.v0" + @@ Fun.const (Some ()) + let wait_for_split ?level evm_node = wait_for_event evm_node ~event:"evm_context_gc_split.v0" @@ fun json -> let event_level = JSON.(json |-> "level" |> as_int) in diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index c46001337c9e..9fbd528bdd7d 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -422,6 +422,9 @@ val wait_for_tx_queue_transaction_confirmed : {!wait_for} and returns the number of transactions injected. *) val wait_for_tx_queue_injecting_transaction : ?timeout:float -> t -> int Lwt.t +(** [wait_for_tx_queue_cleared ?timeout evm_node] waits for the [tx_queue_cleared.v0]. *) +val wait_for_tx_queue_cleared : ?timeout:float -> t -> unit Lwt.t + (** [wait_for_shutdown ?can_terminate evm_node] waits until a node terminates and return its status. If the node is not running, make the test fail. If [can_terminate] is `true` and the node was already terminated, returns diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index acb61f1c9282..a0122b01114c 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -10843,6 +10843,82 @@ let test_tx_queue = in unit +let test_tx_queue_clear = + register_all + ~title:"Tx_queue clears after delayed inbox flush" + ~tags:["evm"; "tx_queue"; "clear"; "delayed_inbox"; "flush"] + ~enable_tx_queue:true + ~time_between_blocks:Nothing + ~delayed_inbox_timeout:0 + ~delayed_inbox_min_levels:1 + ~kernels:[Latest] + ~use_dal:Register_without_feature + ~use_threshold_encryption:Register_without_feature + @@ fun { + client; + l1_contracts; + sc_rollup_address; + sc_rollup_node; + sequencer; + observer; + proxy; + _; + } + _protocol -> + let* () = Evm_node.terminate observer in + let* () = + Evm_node.run ~extra_arguments:["--dont-track-rollup-node"] observer + in + let* () = bake_until_sync ~sc_rollup_node ~proxy ~client ~sequencer () in + + let* raw_tx = + Cast.craft_tx + ~source_private_key:Eth_account.bootstrap_accounts.(0).private_key + ~chain_id:1337 + ~nonce:0 + ~gas_price:1_000_000_000 + ~gas:23_300 + ~value:(Wei.of_eth_int 1) + ~address:Eth_account.bootstrap_accounts.(1).address + () + in + let* _ = + send_raw_transaction_to_delayed_inbox + ~sc_rollup_node + ~client + ~l1_contracts + ~sc_rollup_address + ~sender:Constant.bootstrap3 + raw_tx + in + + (* Mark at which level the delayed inbox item was added. *) + let* add_level = Client.level client in + let wait_for_processed_l1_level_add = + Evm_node.wait_for_processed_l1_level ~level:add_level sequencer + in + let* _ = next_rollup_node_level ~sc_rollup_node ~client in + + (* Mark at which level the delayed inbox was flushed. *) + let* flushed_level = Client.level client in + let wait_for_processed_l1_level_flushed = + Evm_node.wait_for_processed_l1_level ~level:flushed_level sequencer + in + let* _ = next_rollup_node_level ~sc_rollup_node ~client + and* _ = wait_for_processed_l1_level_add in + + (* Produce one L2 block. The sequencer is aware of the delayed inbox + item but refuses to include it. *) + let wait_for_flush = Evm_node.wait_for_flush_delayed_inbox sequencer in + let wait_for_clear = Evm_node.wait_for_tx_queue_cleared observer in + let*@ _ = Rpc.produce_block ~with_delayed_transactions:false sequencer in + + let* _ = next_rollup_node_level ~sc_rollup_node ~client + and* _ = wait_for_flush + and* _ = wait_for_processed_l1_level_flushed + and* () = wait_for_clear in + unit + let test_spawn_rpc = let fresh_port = Port.fresh () in register_all @@ -11072,5 +11148,6 @@ let () = test_eip1559_transaction_object [Alpha] ; test_apply_from_full_history_mode protocols ; test_tx_queue [Alpha] ; + test_tx_queue_clear [Alpha] ; test_spawn_rpc protocols ; test_observer_init_from_snapshot protocols diff --git a/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out b/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out index c30ff84fa2e4..2cef532ae801 100644 --- a/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out +++ b/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out @@ -1400,6 +1400,14 @@ tx_queue_rpc_error: contain invalid byte sequences. */ string || { "invalid_utf8_string": [ integer ∈ [0, 255] ... ] } +tx_queue_cleared: + description: cleared the tx queue + level: notice + section: evm_node.dev.tx_queue + json format: + { /* tx_queue_cleared version 0 */ + "tx_queue_cleared.v0": any } + shutting_down_tx_queue: description: stopping the tx queue level: notice -- GitLab