diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 323fc6505ccdd862d11e87483fcbe0d94f6f55f8..445f7f7290e55caf9720580788c9db75d6a2742a 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -26,6 +26,12 @@ ### Internals +### Experimental + +- Adds a new experimental feature `enable_tx_queue` to replace the tx + pool by a tx queue to improve performance and simplify the + code. (!16812) + ## Version 0.17 (2025-02-14) This release addresses several bugs reported by partners, notably around the diff --git a/etherlink/bin_node/config/configuration.ml b/etherlink/bin_node/config/configuration.ml index cc7c7c3c566773821ca3973dbe3e72483207165a..4811ce1cf9598883221ad8c49a88e2ff6cab6576 100644 --- a/etherlink/bin_node/config/configuration.ml +++ b/etherlink/bin_node/config/configuration.ml @@ -74,6 +74,7 @@ type experimental_features = { max_websocket_message_length : int; monitor_websocket_heartbeat : monitor_websocket_heartbeat option; l2_chains : l2_chain list option; + enable_tx_queue : bool; } type sequencer = { @@ -215,6 +216,7 @@ let default_experimental_features = max_websocket_message_length = default_max_socket_message_length; monitor_websocket_heartbeat = default_monitor_websocket_heartbeat; l2_chains = default_l2_chains; + enable_tx_queue = false; } let default_data_dir = Filename.concat (Sys.getenv "HOME") ".octez-evm-node" @@ -844,6 +846,7 @@ let experimental_features_encoding = max_websocket_message_length; monitor_websocket_heartbeat; l2_chains : l2_chain list option; + enable_tx_queue; } -> ( ( drop_duplicate_on_injection, blueprints_publisher_order_enabled, @@ -855,7 +858,8 @@ let experimental_features_encoding = enable_websocket, max_websocket_message_length, monitor_websocket_heartbeat, - l2_chains ) )) + l2_chains, + enable_tx_queue ) )) (fun ( ( drop_duplicate_on_injection, blueprints_publisher_order_enabled, enable_send_raw_transaction, @@ -866,7 +870,8 @@ let experimental_features_encoding = enable_websocket, max_websocket_message_length, monitor_websocket_heartbeat, - l2_chains ) ) -> + l2_chains, + enable_tx_queue ) ) -> { drop_duplicate_on_injection; blueprints_publisher_order_enabled; @@ -877,6 +882,7 @@ let experimental_features_encoding = max_websocket_message_length; monitor_websocket_heartbeat; l2_chains; + enable_tx_queue; }) (merge_objs (obj6 @@ -927,7 +933,7 @@ let experimental_features_encoding = DEPRECATED: You should remove this option from your \ configuration file." bool)) - (obj5 + (obj6 (dft "rpc_server" ~description: @@ -959,7 +965,12 @@ let experimental_features_encoding = \ If not set, the node will adopt a single \ chain behaviour." (option (list l2_chain_encoding)) - default_l2_chains))) + default_l2_chains) + (dft + "enable_tx_queue" + ~description:"Replace the observer tx pool by a tx queue" + bool + default_experimental_features.enable_tx_queue))) let proxy_encoding = let open Data_encoding in diff --git a/etherlink/bin_node/config/configuration.mli b/etherlink/bin_node/config/configuration.mli index e1cf612ced605e1cb5091e99c7cd3891faead906..e0be742d8f8951ce961d1cccfc76d2947559d28e 100644 --- a/etherlink/bin_node/config/configuration.mli +++ b/etherlink/bin_node/config/configuration.mli @@ -99,6 +99,7 @@ type experimental_features = { max_websocket_message_length : int; monitor_websocket_heartbeat : monitor_websocket_heartbeat option; l2_chains : l2_chain list option; + enable_tx_queue : bool; } type sequencer = { diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index 5b8a065a789019b868e84dcfd2c2aa2c9c5cfada..97a87275ca1223aab67eff7a97b83a0b171b09d1 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -79,6 +79,7 @@ let install_finalizer_observer ~rollup_node_tracking finalizer_public_server Misc.unwrap_error_monad @@ fun () -> let open Lwt_result_syntax in let* () = Tx_pool.shutdown () in + let* () = Tx_queue.shutdown () in let* () = Evm_context.shutdown () in when_ rollup_node_tracking @@ fun () -> Evm_events_follower.shutdown () @@ -176,34 +177,42 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync Evm_ro_context.ro_backend ro_ctxt config ~evm_node_endpoint in - let mode = - if config.finalized_view then - Tx_pool.Forward + let* () = + if config.experimental_features.enable_tx_queue then + let* () = + Tx_queue.start + ~relay_endpoint:evm_node_endpoint + ~max_transaction_batch_length:None + () + in + return_unit + else + let mode = + if config.finalized_view then + Tx_pool.Forward + { + injector = + (fun _ raw_tx -> + Injector.send_raw_transaction + ~keep_alive:config.keep_alive + ~base:evm_node_endpoint + ~raw_tx); + } + else Tx_pool.Relay + in + Tx_pool.start { - injector = - (fun _ raw_tx -> - Injector.send_raw_transaction - ~keep_alive:config.keep_alive - ~base:evm_node_endpoint - ~raw_tx); + backend = observer_backend; + smart_rollup_address = + Tezos_crypto.Hashed.Smart_rollup_address.to_b58check + smart_rollup_address; + mode; + tx_timeout_limit = config.tx_pool_timeout_limit; + tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; + tx_pool_tx_per_addr_limit = + Int64.to_int config.tx_pool_tx_per_addr_limit; + max_number_of_chunks = None; } - else Tx_pool.Relay - in - - let* () = - Tx_pool.start - { - backend = observer_backend; - smart_rollup_address = - Tezos_crypto.Hashed.Smart_rollup_address.to_b58check - smart_rollup_address; - mode; - tx_timeout_limit = config.tx_pool_timeout_limit; - tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; - tx_pool_tx_per_addr_limit = - Int64.to_int config.tx_pool_tx_per_addr_limit; - max_number_of_chunks = None; - } in Metrics.init ~mode:"observer" @@ -267,8 +276,17 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync let*! () = task in return_unit else - Blueprints_follower.start - ~time_between_blocks - ~evm_node_endpoint - ~next_blueprint_number - (on_new_blueprint evm_node_endpoint) + let ping_tx_pool = not config.experimental_features.enable_tx_queue in + let* () = + Blueprints_follower.start + ~ping_tx_pool + ~time_between_blocks + ~evm_node_endpoint + ~next_blueprint_number + (on_new_blueprint evm_node_endpoint) + and* () = + if config.experimental_features.enable_tx_queue then + Tx_queue.beacon ~tick_interval:0.05 + else return_unit + in + return_unit diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index f8fb59670eab86d65fcdfc7bf44942675974959e..fbfe7c8a9f87caa13c7f1666c8af2879a2103bf3 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -643,7 +643,12 @@ let dispatch_request (rpc : Configuration.rpc) in rpc_error (Rpc_errors.transaction_rejected err None) | Ok transaction_object -> ( - let* tx_hash = Tx_pool.add transaction_object txn in + let* tx_hash = + if config.experimental_features.enable_tx_queue then + let* () = Tx_queue.inject transaction_object tx_raw in + return (Ok transaction_object.hash) + else Tx_pool.add transaction_object txn + in match tx_hash with | Ok tx_hash -> rpc_ok tx_hash | Error reason -> @@ -794,7 +799,7 @@ let dispatch_request (rpc : Configuration.rpc) Lwt.return JSONRPC.{value; id} let dispatch_private_request (rpc : Configuration.rpc) - (_config : Configuration.t) + (config : Configuration.t) ((module Backend_rpc : Services_backend_sig.S), _) ~block_production ({method_; parameters; id} : JSONRPC.request) : JSONRPC.response Lwt.t = let open Lwt_syntax in @@ -859,13 +864,17 @@ let dispatch_private_request (rpc : Configuration.rpc) in match is_valid with | Error err -> - let*! () = - let transaction = Ethereum_types.hex_encode_string raw_txn in - Tx_pool_events.invalid_transaction ~transaction - in + let transaction = Ethereum_types.hex_encode_string raw_txn in + let*! () = Tx_pool_events.invalid_transaction ~transaction in rpc_error (Rpc_errors.transaction_rejected err None) | Ok transaction_object -> ( - let* tx_hash = Tx_pool.add transaction_object raw_txn in + let* tx_hash = + if config.experimental_features.enable_tx_queue then + let transaction = Ethereum_types.hex_encode_string raw_txn in + let* () = Tx_queue.inject transaction_object transaction in + return @@ Ok transaction_object.hash + else Tx_pool.add transaction_object raw_txn + in match tx_hash with | Ok tx_hash -> rpc_ok tx_hash | Error reason -> diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml new file mode 100644 index 0000000000000000000000000000000000000000..69b256585024d60fb2217774c460633354dcc346 --- /dev/null +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -0,0 +1,381 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Functori *) +(* Copyright (c) 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +open Tezos_workers + +let two_seconds = Ptime.Span.of_int_s 2 + +type parameters = { + relay_endpoint : Uri.t; + max_transaction_batch_length : int option; +} + +type callback = + [`Accepted of Ethereum_types.hash | `Confirmed | `Dropped | `Refused] -> + unit Lwt.t + +type request = {payload : Ethereum_types.hex; callback : callback} + +type pending = {callback : callback; since : Ptime.t} + +module Pending_transactions = struct + open Ethereum_types + module S = String.Hashtbl + + type t = pending S.t + + let empty () = S.create 1000 + + let add htbl (Hash (Hex hash)) callback = + S.add htbl hash ({callback; since = Time.System.now ()} : pending) + + let pop htbl (Hash (Hex hash)) = + match S.find htbl hash with + | Some pending -> + S.remove htbl hash ; + Some pending + | None -> None + + let drop htbl = + let now = Time.System.now () in + let dropped = ref [] in + S.filter_map_inplace + (fun _hash pending -> + let lifespan = Ptime.diff now pending.since in + if Ptime.Span.compare lifespan two_seconds > 0 then ( + dropped := pending :: !dropped ; + None) + else Some pending) + htbl ; + !dropped +end + +type state = { + relay_endpoint : Uri.t; + mutable queue : request Queue.t; + pending : Pending_transactions.t; + max_transaction_batch_length : int option; +} + +module Types = struct + type nonrec state = state + + type nonrec parameters = parameters +end + +module Name = struct + type t = unit + + let encoding = Data_encoding.unit + + let base = ["evm_node_worker"; "tx_queue"] + + let pp _fmt () = () + + let equal () () = true +end + +module Request = struct + type ('a, 'b) t = + | Inject : { + payload : Ethereum_types.hex; + callback : callback; + } + -> (unit, tztrace) t + | Confirm : {txn_hash : Ethereum_types.hash} -> (unit, tztrace) t + | Tick : (unit, tztrace) t + + type view = View : _ t -> view + + let view req = View req + + let encoding = + let open Data_encoding in + (* This encoding is used to encode only *) + union + [ + case + Json_only + ~title:"Inject" + (obj2 + (req "request" (constant "inject")) + (req "payload" Ethereum_types.hex_encoding)) + (function + | View (Inject {payload; _}) -> Some ((), payload) | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Confirm" + (obj2 + (req "request" (constant "confirm")) + (req "transaction_hash" Ethereum_types.hash_encoding)) + (function + | View (Confirm {txn_hash}) -> Some ((), txn_hash) | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Tick" + (obj1 (req "request" (constant "tick"))) + (function View Tick -> Some () | _ -> None) + (fun _ -> assert false); + ] + + let pp fmt (View r) = + let open Format in + match r with + | Inject {payload = Hex txn; _} -> fprintf fmt "Inject %s" txn + | Confirm {txn_hash = Hash (Hex txn_hash)} -> + fprintf fmt "Confirm %s" txn_hash + | Tick -> fprintf fmt "Tick" +end + +module Worker = Worker.MakeSingle (Name) (Request) (Types) + +type worker = Worker.infinite Worker.queue Worker.t + +let uuid_seed = Random.get_state () + +let send_transactions_batch ~relay_endpoint transactions = + let open Lwt_result_syntax in + let module M = Map.Make (String) in + let module Srt = Rpc_encodings.Send_raw_transaction in + if Seq.is_empty transactions then return_unit + else + let rev_batch, callbacks = + Seq.fold_left + (fun (rev_batch, callbacks) {payload; callback} -> + let req_id = Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) in + let txn = + Rpc_encodings.JSONRPC. + { + method_ = Srt.method_; + parameters = + Some (Data_encoding.Json.construct Srt.input_encoding payload); + id = Some (Id_string req_id); + } + in + + (txn :: rev_batch, M.add req_id callback callbacks)) + ([], M.empty) + transactions + in + let batch = List.rev rev_batch in + + let*! () = Tx_queue_events.injecting_transactions (List.length batch) in + + let* responses = + Rollup_services.call_service + ~keep_alive:true + ~base:relay_endpoint + (Batch.dispatch_batch_service ~path:Resto.Path.root) + () + () + (Batch batch) + in + + let responses = + match responses with Singleton r -> [r] | Batch rs -> rs + in + + let* missed_callbacks = + List.fold_left_es + (fun callbacks (response : Rpc_encodings.JSONRPC.response) -> + match response with + | {id = Some (Id_string req); value} -> ( + match (value, M.find_opt req callbacks) with + | value, Some callback -> + let* () = + match value with + | Ok res -> + let hash = + Data_encoding.Json.destruct Srt.output_encoding res + in + Lwt_result.ok (callback (`Accepted hash)) + | Error error -> + let*! () = Tx_queue_events.rpc_error error in + Lwt_result.ok (callback `Refused) + in + return (M.remove req callbacks) + | _ -> return callbacks) + | _ -> failwith "Inconsistent response from the server") + callbacks + responses + in + + assert (M.is_empty missed_callbacks) ; + return_unit + +module Handlers = struct + open Request + + type self = worker + + let on_request : + type r request_error. + worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t + = + fun self request -> + let open Lwt_result_syntax in + let state = Worker.state self in + match request with + | Inject {payload; callback} -> + let instrumented_callback reason = + (match reason with + | `Accepted hash -> + Pending_transactions.add state.pending hash callback + | _ -> ()) ; + callback reason + in + Queue.add {payload; callback = instrumented_callback} state.queue ; + return_unit + | Confirm {txn_hash} -> ( + match Pending_transactions.pop state.pending txn_hash with + | Some {callback; _} -> + Lwt.async (fun () -> callback `Confirmed) ; + return_unit + | None -> return_unit) + | Tick -> + let all_transactions = Queue.to_seq state.queue in + let* transactions_to_inject, remaining_transactions = + match state.max_transaction_batch_length with + | None -> return (all_transactions, Seq.empty) + | Some max_transaction_batch_length -> + let when_negative_length = + TzTrace.make + (Exn (Failure "Negative max_transaction_batch_length")) + in + let*? transactions_to_inject = + Seq.take + ~when_negative_length + max_transaction_batch_length + all_transactions + in + let*? remaining_transactions = + Seq.drop + ~when_negative_length + max_transaction_batch_length + all_transactions + in + return (transactions_to_inject, remaining_transactions) + in + state.queue <- Queue.of_seq remaining_transactions ; + + let+ () = + send_transactions_batch + ~relay_endpoint:state.relay_endpoint + transactions_to_inject + in + + let txns = Pending_transactions.drop state.pending in + List.iter + (fun {callback; _} -> Lwt.async (fun () -> callback `Dropped)) + txns + + type launch_error = tztrace + + let on_launch _self () + ({relay_endpoint; max_transaction_batch_length} : parameters) = + let open Lwt_result_syntax in + return + { + relay_endpoint; + queue = Queue.create (); + pending = Pending_transactions.empty (); + max_transaction_batch_length; + } + + let on_error (type a b) _self _status_request (_r : (a, b) Request.t) + (_errs : b) : [`Continue | `Shutdown] tzresult Lwt.t = + Lwt_result_syntax.return `Continue + + let on_completion _ _ _ _ = Lwt.return_unit + + let on_no_request _ = Lwt.return_unit + + let on_close _ = Lwt.return_unit +end + +let table = Worker.create_table Queue + +let worker_promise, worker_waker = Lwt.task () + +type error += No_worker + +type error += Tx_queue_is_closed + +let () = + register_error_kind + `Permanent + ~id:"tx_queue_is_closed" + ~title:"Tx_queue_is_closed" + ~description:"Failed to add a request to the Tx queue, it's closed." + Data_encoding.unit + (function Tx_queue_is_closed -> Some () | _ -> None) + (fun () -> Tx_queue_is_closed) + +let worker = + lazy + (match Lwt.state worker_promise with + | Lwt.Return worker -> Ok worker + | Lwt.Fail e -> Result_syntax.tzfail (error_of_exn e) + | Lwt.Sleep -> Result_syntax.tzfail No_worker) + +let bind_worker f = + let open Lwt_result_syntax in + let res = Lazy.force worker in + match res with + | Error [No_worker] -> + (* There is no worker, nothing to do *) + return_unit + | Error errs -> fail errs + | Ok w -> f w + +let push_request worker request = + let open Lwt_result_syntax in + let*! (pushed : bool) = Worker.Queue.push_request worker request in + if not pushed then tzfail Tx_queue_is_closed else return_unit + +let tick () = bind_worker @@ fun w -> push_request w Tick + +let rec beacon ~tick_interval = + let open Lwt_result_syntax in + let* () = tick () in + let*! () = Lwt_unix.sleep tick_interval in + beacon ~tick_interval + +let inject ?(callback = fun _ -> Lwt_syntax.return_unit) + (tx_object : Ethereum_types.legacy_transaction_object) txn = + (* tx_object uses only for it's hash for now. This will be revisited + in a follow-up *) + let open Lwt_syntax in + let* () = Tx_queue_events.add_transaction tx_object.hash in + let* worker = worker_promise in + push_request worker (Inject {payload = txn; callback}) + +let confirm txn_hash = + bind_worker @@ fun w -> push_request w (Confirm {txn_hash}) + +let start ~relay_endpoint ~max_transaction_batch_length () = + let open Lwt_result_syntax in + let* worker = + Worker.launch + table + () + {relay_endpoint; max_transaction_batch_length} + (module Handlers) + in + Lwt.wakeup worker_waker worker ; + let*! () = Tx_queue_events.is_ready () in + return_unit + +let shutdown () = + let open Lwt_result_syntax in + bind_worker @@ fun w -> + let*! () = Tx_queue_events.shutdown () in + let*! () = Worker.shutdown w in + return_unit diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli new file mode 100644 index 0000000000000000000000000000000000000000..7d3850ea3bf02f4476b00fd082368bd1b4672935 --- /dev/null +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -0,0 +1,65 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Functori *) +(* Copyright (c) 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(** The Tx_queue is a worker allowing to batch raw transactions in a single + [eth_sendRawTransaction] at a regular interval. It provides a non-blocking + interface based on the use of callbacks. *) + +(** A [callback] is called by the [Tx_queue] at various stages of a + submitted transaction's life. + + The next tick after its insertion in the queue, a transaction is submitted + to the relay node within a batch of [eth_sendRawTransaction] requests. + + {ul + {li Depending on the result of the RPC, its [callback] is called with + either [`Accepted hash] (where [hash] is the hash of the raw + transaction) or [`Refused]).} + {li As soon as the transaction appears in a blueprint, its callback is + called with [`Confirmed]. If this does not happen before 2s, the + [callback] is called with [`Dropped].}} *) +type callback = + [`Accepted of Ethereum_types.hash | `Confirmed | `Dropped | `Refused] -> + unit Lwt.t + +(** A [request] submitted to the [Tx_queue] consists in a payload + (that is, a raw transaction) and a {!type:callback} that will be + used to advertise the transaction's life cycle. *) + +type request = {payload : Ethereum_types.hex; callback : callback} + +(** [start ~relay_endpoint ~max_transaction_batch_length ()] starts + the worker, meaning it is possible to call {!inject}, {!confirm} + and {!beacon}. *) +val start : + relay_endpoint:Uri.t -> + max_transaction_batch_length:int option -> + unit -> + unit tzresult Lwt.t + +(** [shutdown ()] stops the tx queue, waiting for the ongoing request + to be processed. *) +val shutdown : unit -> unit tzresult Lwt.t + +(** [inject ?callback tx_object raw_txn] pushes the raw transaction + [raw_txn] to the worker queue. + + {b Note:} The promise will be sleeping until at least {!start} is called. *) +val inject : + ?callback:callback -> + Ethereum_types.legacy_transaction_object -> + Ethereum_types.hex -> + unit tzresult Lwt.t + +(** [confirm hash] is to be called by an external component to advertise a + transaction has been included in a blueprint. *) +val confirm : Ethereum_types.hash -> unit tzresult Lwt.t + +(** [beacon ~tick_interval] is a never fulfilled promise which triggers a tick + in the [Tx_queue] every [tick_interval] seconds. *) +val beacon : tick_interval:float -> unit tzresult Lwt.t diff --git a/etherlink/bin_node/lib_dev/tx_queue_events.ml b/etherlink/bin_node/lib_dev/tx_queue_events.ml new file mode 100644 index 0000000000000000000000000000000000000000..3e17dcda574a092b8982a1d56554bdb5779c247d --- /dev/null +++ b/etherlink/bin_node/lib_dev/tx_queue_events.ml @@ -0,0 +1,61 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* Copyright (c) 2025 Functori *) +(* *) +(*****************************************************************************) +open Internal_event.Simple + +let section = Events.section @ ["tx_queue"] + +let is_ready = + declare_0 + ~section + ~name:"tx_queue_is_ready" + ~level:Notice + ~msg:"tx queue is ready" + () + +let shutdown = + declare_0 + ~section + ~name:"shutting_down_tx_queue" + ~msg:"stopping the tx queue" + ~level:Notice + () + +let injecting_transactions = + declare_1 + ~name:"tx_queue_injecting_transaction" + ~msg:"injecting {n} transactions" + ~level:Info + ("n", Data_encoding.int31) + +let rpc_error = + declare_2 + ~section + ~name:"tx_queue_rpc_error" + ~msg:"an RPC produced the error :\n\tcode:{code},\n\tmessage:{message}" + ~level:Error + ("code", Data_encoding.int32) + ("message", Data_encoding.string) + +let add_transaction = + declare_1 + ~name:"tx_queue_add_transaction" + ~msg:"transaction {tx_hash} received" + ~level:Debug + ~pp1:(fun fmt Ethereum_types.(Hash (Hex h)) -> Format.fprintf fmt "%10s" h) + ("tx_hash", Ethereum_types.hash_encoding) + +let is_ready () = emit is_ready () + +let shutdown () = emit shutdown () + +let injecting_transactions n = emit injecting_transactions n + +let add_transaction tx = emit add_transaction tx + +let rpc_error (error : Rpc_encodings.JSONRPC.error) = + emit rpc_error (Int32.of_int error.code, error.message) diff --git a/etherlink/bin_node/lib_dev/tx_queue_events.mli b/etherlink/bin_node/lib_dev/tx_queue_events.mli new file mode 100644 index 0000000000000000000000000000000000000000..305ce9aa3276c3d58eaa2164e4641e1e0c8e82d7 --- /dev/null +++ b/etherlink/bin_node/lib_dev/tx_queue_events.mli @@ -0,0 +1,25 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* Copyright (c) 2025 Functori *) +(* *) +(*****************************************************************************) + +(** [is_ready ()] advertises that the [Tx_queue] is ready to receive + transactions. *) +val is_ready : unit -> unit Lwt.t + +(** [shutdown ()] advertises that the [Tx_queue] is shutting down. *) +val shutdown : unit -> unit Lwt.t + +(** [injecting_transactions nb] advertises [nb] transactions are about to be + injected to the relay endpoint with a batch of [eth_sendRawTransaction]. *) +val injecting_transactions : int -> unit Lwt.t + +(** [add_transaction tx_hash] Advertises [tx_hash] was added to the tx + queue. *) +val add_transaction : Ethereum_types.hash -> unit Lwt.t + +(** [rpc_error error] advertises an RPC produced the error [error]. *) +val rpc_error : Rpc_encodings.JSONRPC.error -> unit Lwt.t diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 352d391f7b3a79fc228d69f9da422192aee77195..9788ca4eaef1c44eef5d5b0e8e9f0235e16eb974 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -556,6 +556,14 @@ let wait_for_tx_pool_add_transaction ?timeout evm_node = wait_for_event ?timeout evm_node ~event:"tx_pool_add_transaction.v0" @@ JSON.as_string_opt +let wait_for_tx_queue_add_transaction ?timeout evm_node = + wait_for_event ?timeout evm_node ~event:"tx_queue_add_transaction.v0" + @@ fun json -> JSON.(json |> as_string |> Option.some) + +let wait_for_tx_queue_injecting_transaction ?timeout evm_node = + wait_for_event ?timeout evm_node ~event:"tx_queue_injecting_transaction.v0" + @@ fun json -> JSON.(json |> as_int |> Option.some) + let wait_for_split ?level evm_node = wait_for_event evm_node ~event:"evm_context_gc_split.v0" @@ fun json -> let event_level = JSON.(json |-> "level" |> as_int) in @@ -1250,7 +1258,8 @@ let optional_json_put ~name v f json = let patch_config_with_experimental_feature ?(drop_duplicate_when_injection = false) ?(blueprints_publisher_order_enabled = false) ?(next_wasm_runtime = true) - ?rpc_server ?(enable_websocket = false) ?max_websocket_message_length () = + ?rpc_server ?(enable_websocket = false) ?max_websocket_message_length + ?(enable_tx_queue = false) () = JSON.update "experimental_features" @@ fun json -> conditional_json_put drop_duplicate_when_injection @@ -1269,6 +1278,7 @@ let patch_config_with_experimental_feature | Resto -> `String "resto" | Dream -> `String "dream") |> conditional_json_put enable_websocket ~name:"enable_websocket" (`Bool true) + |> conditional_json_put enable_tx_queue ~name:"enable_tx_queue" (`Bool true) |> optional_json_put max_websocket_message_length ~name:"max_websocket_message_length" diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index 1c9ab477bd7981ae1e040074a2cab0f9e4700442..b1df76eff13b84747d774294e6768509963ce557 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -306,6 +306,7 @@ val patch_config_with_experimental_feature : ?rpc_server:rpc_server -> ?enable_websocket:bool -> ?max_websocket_message_length:int -> + ?enable_tx_queue:bool -> unit -> JSON.t -> JSON.t @@ -391,6 +392,16 @@ val wait_for_rollup_node_ahead : t -> int Lwt.t hash. *) val wait_for_tx_pool_add_transaction : ?timeout:float -> t -> string Lwt.t +(** [wait_for_tx_queue_add_transaction ?timeout evm_node] waits for the event + [tx_queue_add_transaction.v0] using {!wait_for} and returns the transaction + hash. *) +val wait_for_tx_queue_add_transaction : ?timeout:float -> t -> string Lwt.t + +(** [wait_for_tx_queue_injecting_transaction ?timeout evm_node] waits + for the event [tx_queue_injecting_transaction.v0] using + {!wait_for} and returns the number of transactions injected. *) +val wait_for_tx_queue_injecting_transaction : ?timeout:float -> t -> int Lwt.t + (** [wait_for_shutdown ?can_terminate evm_node] waits until a node terminates and return its status. If the node is not running, make the test fail. If [can_terminate] is `true` and the node was already terminated, returns diff --git a/etherlink/tezt/lib/setup.ml b/etherlink/tezt/lib/setup.ml index 336fec316e94bb41ce8941eb4c178e4f27a342c7..8a4561ef8254aa6c257746955068a0c14fa21bd5 100644 --- a/etherlink/tezt/lib/setup.ml +++ b/etherlink/tezt/lib/setup.ml @@ -205,7 +205,7 @@ let setup_sequencer ?max_delayed_inbox_blueprint_length ?next_wasm_runtime ?(drop_duplicate_when_injection = true) ?(blueprints_publisher_order_enabled = true) ?rollup_history_mode ~enable_dal ?dal_slots ~enable_multichain ?rpc_server ?websockets - ?history_mode protocol = + ?history_mode ?enable_tx_queue protocol = let* node, client = setup_l1 ?commitment_period @@ -285,13 +285,14 @@ let setup_sequencer ?max_delayed_inbox_blueprint_length ?next_wasm_runtime | Some p -> Some p | None -> Some (Port.fresh ()) in - let patch_config = + let patch_config ?enable_tx_queue = Evm_node.patch_config_with_experimental_feature ~drop_duplicate_when_injection ~blueprints_publisher_order_enabled ?next_wasm_runtime ?rpc_server ?enable_websocket:websockets + ?enable_tx_queue (* When adding new experimental feature please make sure it's a good idea to activate it for all test or not. *) () @@ -346,14 +347,14 @@ let setup_sequencer ?max_delayed_inbox_blueprint_length ?next_wasm_runtime let* sequencer = Evm_node.init ?rpc_port:sequencer_rpc_port - ~patch_config + ~patch_config:(patch_config ~enable_tx_queue:false) ~mode:sequencer_mode ?history_mode (Sc_rollup_node.endpoint sc_rollup_node) in let* observer = run_new_observer_node - ~patch_config + ~patch_config:(patch_config ?enable_tx_queue) ~sc_rollup_node ?rpc_server ?websockets @@ -395,8 +396,8 @@ let register_test ~__FILE__ ?max_delayed_inbox_blueprint_length ?(threshold_encryption = false) ?(uses = uses) ?(additional_uses = []) ?rollup_history_mode ~enable_dal ?(dal_slots = if enable_dal then Some [0; 1; 2; 3] else None) - ~enable_multichain ?rpc_server ?websockets ?history_mode body ~title ~tags - protocols = + ~enable_multichain ?rpc_server ?websockets ?history_mode ?enable_tx_queue + body ~title ~tags protocols = let kernel_tag, kernel_use = Kernel.to_uses_and_tags kernel in let tags = kernel_tag :: tags in let additional_uses = @@ -449,6 +450,7 @@ let register_test ~__FILE__ ?max_delayed_inbox_blueprint_length ?dal_slots ~enable_multichain ?rpc_server + ?enable_tx_queue protocol in body sequencer_setup protocol @@ -493,7 +495,7 @@ let register_test_for_kernels ~__FILE__ ?max_delayed_inbox_blueprint_length ?enable_fa_bridge ?rollup_history_mode ?commitment_period ?challenge_window ?additional_uses ~threshold_encryption ~enable_dal ?dal_slots ~enable_multichain ?rpc_server ?websockets ?enable_fast_withdrawal - ?history_mode ~title ~tags body protocols = + ?history_mode ?enable_tx_queue ~title ~tags body protocols = List.iter (fun kernel -> register_test @@ -533,6 +535,7 @@ let register_test_for_kernels ~__FILE__ ?max_delayed_inbox_blueprint_length ~enable_dal ?dal_slots ~enable_multichain + ?enable_tx_queue ~title ~tags body diff --git a/etherlink/tezt/lib/setup.mli b/etherlink/tezt/lib/setup.mli index 9c704292ca0183e69890abed7087a08b874c8007..ad781ef2d086ca1bc76015d1c87c6567cb1c14ee 100644 --- a/etherlink/tezt/lib/setup.mli +++ b/etherlink/tezt/lib/setup.mli @@ -95,6 +95,7 @@ val register_test : ?rpc_server:Evm_node.rpc_server -> ?websockets:bool -> ?history_mode:Evm_node.history_mode -> + ?enable_tx_queue:bool -> (sequencer_setup -> Protocol.t -> unit Lwt.t) -> title:string -> tags:string list -> @@ -141,6 +142,7 @@ val register_test_for_kernels : ?websockets:bool -> ?enable_fast_withdrawal:bool -> ?history_mode:Evm_node.history_mode -> + ?enable_tx_queue:bool -> title:string -> tags:string list -> (sequencer_setup -> Protocol.t -> unit Lwt.t) -> @@ -186,5 +188,6 @@ val setup_sequencer : ?rpc_server:Evm_node.rpc_server -> ?websockets:bool -> ?history_mode:Evm_node.history_mode -> + ?enable_tx_queue:bool -> Protocol.t -> sequencer_setup Lwt.t diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index b53a5b14300f9a73ef59cdba4ad0cae5bc2f64d2..a96474da7dc2678148e4781d3bf0c8214c9b0348 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -252,8 +252,8 @@ let register_all ?max_delayed_inbox_blueprint_length ?sequencer_rpc_port ?websockets ?enable_fast_withdrawal ?history_mode ?(use_threshold_encryption = default_threshold_encryption_registration) ?(use_dal = default_dal_registration) - ?(use_multichain = default_multichain_registration) ~title ~tags body - protocols = + ?(use_multichain = default_multichain_registration) ?enable_tx_queue ~title + ~tags body protocols = let dal_cases = match use_dal with | Register_both {extra_tags_with; extra_tags_without} -> @@ -319,6 +319,7 @@ let register_all ?max_delayed_inbox_blueprint_length ?sequencer_rpc_port ?rollup_history_mode ~enable_dal ~enable_multichain + ?enable_tx_queue ~title ~tags:(te_tags @ dal_tags @ multichain_tags @ tags) body @@ -10014,6 +10015,55 @@ let test_apply_from_full_history_mode = let*@ _ = Rpc.get_block_by_number ~block:"1" observer in unit +let test_tx_queue = + register_all + ~tags:["observer"; "tx_queue"] + ~time_between_blocks:Nothing + ~kernels:[Latest] (* node only test *) + ~use_threshold_encryption:Register_without_feature + ~use_dal:Register_without_feature + ~enable_tx_queue:true + ~websockets:false + ~title:"Submits a transaction to an observer with a tx queue." + @@ fun {sequencer; observer; _} _protocol -> + let* () = + let*@ _ = produce_block sequencer in + unit + and* () = Evm_node.wait_for_blueprint_applied observer 1 in + + let observer_wait_tx_added = + Evm_node.wait_for_tx_queue_add_transaction observer + in + let sequencer_wait_tx_added = + Evm_node.wait_for_tx_pool_add_transaction sequencer + in + let observer_wait_tx_injected = + Evm_node.wait_for_tx_queue_injecting_transaction observer + in + let* raw_tx = + Cast.craft_tx + ~source_private_key:Eth_account.bootstrap_accounts.(0).private_key + ~chain_id:1337 + ~nonce:0 + ~gas_price:1_000_000_000 + ~gas:23_300 + ~value:Wei.one + ~address:Eth_account.bootstrap_accounts.(1).address + () + in + let* () = + let*@ _hash = Rpc.send_raw_transaction ~raw_tx observer in + unit + and* _ = observer_wait_tx_added + and* observer_wait_tx_injected + and* _ = sequencer_wait_tx_added in + Check.( + (observer_wait_tx_injected = 1) + ~__LOC__ + int + ~error_msg:"Expected %r transaction injected found %l.") ; + unit + let protocols = Protocol.all let () = @@ -10154,4 +10204,5 @@ let () = test_rpc_getLogs_with_earliest_fail protocols ; test_eip2930_transaction_object [Alpha] ; test_eip1559_transaction_object [Alpha] ; - test_apply_from_full_history_mode protocols + test_apply_from_full_history_mode protocols ; + test_tx_queue [Alpha] diff --git a/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out b/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out index 8513ce84954f9f49b7c24815ff19392c71b68eed..b0fe8a68c2c6ab0cefbd6be01e4f72cb480993fb 100644 --- a/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out +++ b/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out @@ -1383,6 +1383,39 @@ tx_pool_started: { /* tx_pool_started version 0 */ "tx_pool_started.v0": any } +tx_queue_rpc_error: + description: an RPC produced the error : + code:{code}, + message:{message} + level: error + section: evm_node.dev.tx_queue + json format: + { /* tx_queue_rpc_error version 0 */ + "tx_queue_rpc_error.v0": + { "code": integer ∈ [-2^31-1, 2^31], + "message": $unistring } } + $unistring: + /* Universal string representation + Either a plain UTF8 string, or a sequence of bytes for strings that + contain invalid byte sequences. */ + string || { "invalid_utf8_string": [ integer ∈ [0, 255] ... ] } + +shutting_down_tx_queue: + description: stopping the tx queue + level: notice + section: evm_node.dev.tx_queue + json format: + { /* shutting_down_tx_queue version 0 */ + "shutting_down_tx_queue.v0": any } + +tx_queue_is_ready: + description: tx queue is ready + level: notice + section: evm_node.dev.tx_queue + json format: + { /* tx_queue_is_ready version 0 */ + "tx_queue_is_ready.v0": any } + counter_inc_assertion_warning: description: assertion failed while updating a Counter metric: the increment ({increment} occuring at {label}) is negative; the value will not be updated level: warning @@ -2243,6 +2276,23 @@ tx_pool_transaction_injection_failed: { /* tx_pool_transaction_injection_failed version 0 */ "tx_pool_transaction_injection_failed.v0": any } +tx_queue_rpc_error: + description: an RPC produced the error : + code:{code}, + message:{message} + level: error + section: evm_node.dev.tx_queue + json format: + { /* tx_queue_rpc_error version 0 */ + "tx_queue_rpc_error.v0": + { "code": integer ∈ [-2^31-1, 2^31], + "message": $unistring } } + $unistring: + /* Universal string representation + Either a plain UTF8 string, or a sequence of bytes for strings that + contain invalid byte sequences. */ + string || { "invalid_utf8_string": [ integer ∈ [0, 255] ... ] } + node_da_fees: description: internal: node gives {node_da_fees} DA fees, whereas kernel gives {kernel_da_fees} on block {block_number} with {call} level: fatal diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out index d3e2849a6daf37839d7a95844496385486d491b5..9886e1b094cbd5749999f694de76b281c90f1570 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out @@ -22,7 +22,8 @@ "ping_interval": 5, "ping_timeout": 30 }, - "l2_chains": null + "l2_chains": null, + "enable_tx_queue": false }, "proxy": { "ignore_block_param": false @@ -80,7 +81,8 @@ "ping_interval": 5, "ping_timeout": 30 }, - "l2_chains": null + "l2_chains": null, + "enable_tx_queue": false }, "proxy": { "ignore_block_param": false @@ -131,7 +133,8 @@ "ping_interval": 5, "ping_timeout": 30 }, - "l2_chains": null + "l2_chains": null, + "enable_tx_queue": false }, "proxy": { "evm_node_endpoint": "hidden", diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out index 943d89ffe1ab25d6ff06eb99360fdc488f15ce60..8f81bba32ff748c2d9e5cbe2c16e086ad827d503 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out @@ -169,7 +169,10 @@ [ { "chain_id": $bignum /* The id of the l2 chain */ } ... ] /* Some */ || null - /* None */ }, + /* None */, + "enable_tx_queue"?: + boolean + /* Replace the observer tx pool by a tx queue */ }, "proxy"?: { "finalized_view"?: boolean