From d3247dce7b432ccef34f72d2a7ee4dcfe5494531 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Mon, 14 Apr 2025 11:05:25 +0200 Subject: [PATCH 1/9] EVM node/Tx_queue: remove dead or redundant code --- etherlink/bin_node/lib_dev/tx_queue.ml | 22 -------- etherlink/bin_node/lib_dev/tx_queue.mli | 50 ------------------- .../fa-bridge-watchtower/etherlink_monitor.ml | 15 +++++- 3 files changed, 13 insertions(+), 74 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 2dc4eb60f233..db4a449b2255 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -376,7 +376,6 @@ end module Request = struct type ('a, 'b) t = | Inject : request -> ((unit, string) result, tztrace) t - | Confirm : {txn_hash : Ethereum_types.hash} -> (unit, tztrace) t | Find : { txn_hash : Ethereum_types.hash; } @@ -432,15 +431,6 @@ module Request = struct (function | View (Inject {payload; _}) -> Some ((), payload) | _ -> None) (fun _ -> assert false); - case - Json_only - ~title:"Confirm" - (obj2 - (req "request" (constant "confirm")) - (req "transaction_hash" Ethereum_types.hash_encoding)) - (function - | View (Confirm {txn_hash}) -> Some ((), txn_hash) | _ -> None) - (fun _ -> assert false); case Json_only ~title:"Tick" @@ -530,8 +520,6 @@ module Request = struct let open Format in match r with | Inject {payload = Hex txn; _} -> fprintf fmt "Inject %s" txn - | Confirm {txn_hash = Hash (Hex txn_hash)} -> - fprintf fmt "Confirm %s" txn_hash | Find {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Find %s" txn_hash | Tick _ -> fprintf fmt "Tick" | Clear -> fprintf fmt "Clear" @@ -819,13 +807,6 @@ module Handlers = struct else return (Error "Transaction limit was reached. Transaction is rejected.") - | Confirm {txn_hash} -> ( - protect @@ fun () -> - match Pending_transactions.pop state.pending txn_hash with - | Some {pending_callback; _} -> - let*! () = pending_callback `Confirmed in - return_unit - | None -> return_unit) | Find {txn_hash} -> protect @@ fun () -> return @@ Transaction_objects.find state.tx_object txn_hash @@ -1064,9 +1045,6 @@ let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce (Inject {next_nonce; payload = txn; tx_object; callback}) |> handle_request_error -let confirm txn_hash = - bind_worker @@ fun w -> push_request w (Confirm {txn_hash}) - let start ~config ~keep_alive () = let open Lwt_result_syntax in let* worker = Worker.launch table () {config; keep_alive} (module Handlers) in diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index c3cd244f4e68..3e74ba4d5c51 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -43,36 +43,6 @@ val shutdown : unit -> unit tzresult Lwt.t (** [clear ()] removes the tx queue data but keeps the allocated space *) val clear : unit -> unit tzresult Lwt.t -(** [inject ?callback ~next_nonce tx_object raw_txn] pushes the - transaction [raw_txn] to the worker queue. - - The [tx_object] is stored until the transaction is confirmed or - dropped, so the transaction can be retrieved with [find]. - - [next_nonce] is the next nonce expected by the kernel for the - address [tx_object.from]. [next_nonce] must always be increasing - for any given [tx_object.from], else if the tx_queue already - contains a transaction for [tx_object.from] it will raise an error - for that request. The increasing order of [next_nonce] is enforced - by the kernel execution where transaction must be executed in - order. - - Any transaction added in the tx_queue via {!inject} must be a - valid transaction. In particular the nonce of the transaction must - be valid, i.e. it must be greater or equal to [next_nonce]. This - is validated by {!Validate.is_tx_valid}. - - {b Note:} The promise will be sleeping until at least {!start} is called. *) -val inject : - ?callback:callback -> - next_nonce:Ethereum_types.quantity -> - Ethereum_types.legacy_transaction_object -> - Ethereum_types.hex -> - (unit, string) result tzresult Lwt.t - -(** [confirm hash] is to be called by an external component to advertise a - transaction has been included in a blueprint. *) -val confirm : Ethereum_types.hash -> unit tzresult Lwt.t (** Trigger a tick in the [Tx_queue]. *) val tick : evm_node_endpoint:endpoint -> unit tzresult Lwt.t @@ -83,20 +53,6 @@ val tick : evm_node_endpoint:endpoint -> unit tzresult Lwt.t val beacon : evm_node_endpoint:endpoint -> tick_interval:float -> unit tzresult Lwt.t -(** [find hash] returns the transaction associated with that hash if - it's found in the tx_queue. *) -val find : - Ethereum_types.hash -> - Ethereum_types.legacy_transaction_object option tzresult Lwt.t - -(** [nonce ~next_nonce address] returns the first gap in the tx queue - for [address], or [next_nonce] if no transaction for [address] are - found. *) -val nonce : - next_nonce:Ethereum_types.quantity -> - Ethereum_types.address -> - Ethereum_types.quantity tzresult Lwt.t - (** [lock_transactions] locks the transactions in the queue, new transactions can be added but nothing can be retrieved with {!pop_transactions}. *) @@ -109,12 +65,6 @@ val unlock_transactions : unit -> unit tzresult Lwt.t (** [is_locked] checks if the queue is locked. *) val is_locked : unit -> bool tzresult Lwt.t -(** [content ()] returns the queued and pending transactions of the - tx_queue mapped into a tx_pool to mimic - {!Tx_pool.get_tx_pool_content}. Semantics of pending and queued - are not equal to {!Tx_pool.get_tx_pool_content} *) -val content : unit -> Ethereum_types.txpool tzresult Lwt.t - (** [pop_transactions ~validate_tx ~initial_validation_state] pops as many transactions as possible from the queue, validating them with [validate_tx]. If [validate_tx] returns [`Keep validation_state] diff --git a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml index 6aa747f61213..062685198270 100644 --- a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml +++ b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml @@ -59,12 +59,19 @@ module Tx_queue = struct (* as found in etherlink/bin_floodgate/tx_queue.ml *) let transfer ctx ?to_ ?(value = Z.zero) ~nonce ~data () = + let open Lwt_result_syntax in let txn = Craft.transfer ctx ~nonce ?to_ ~value ~data () in let tx_raw = Ethereum_types.hex_to_bytes txn in let hash = Ethereum_types.hash_raw_tx tx_raw in let**? tx = Transaction.decode tx_raw in let**? tx_object = Transaction.to_transaction_object ~hash tx in - inject tx_object txn ~next_nonce:(Ethereum_types.Qty nonce) + let+ res = + Tx_container.add + tx_object + ~raw_tx:txn + ~next_nonce:(Ethereum_types.Qty nonce) + in + match res with Ok _hash -> Ok () | Error _ as res -> res end module Contract = Tezos_raw_protocol_alpha.Alpha_context.Contract @@ -716,7 +723,11 @@ let handle_confirmed_txs {db; ws_client; _} exec.blockNumber ) in let* () = Db.Deposits.set_claimed db nonce exec in - let* () = Tx_queue.confirm tx_hash in + let* () = + Tx_queue.confirm_transactions + ~clear_pending_queue_after:false + ~confirmed_txs:(Seq.cons tx_hash Seq.empty) + in return_unit | Some {status; _} -> let*! () = -- GitLab From 5419a2e67565b9a4a3108df2d8e7b2c60c58efe4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Wed, 30 Apr 2025 11:50:10 +0200 Subject: [PATCH 2/9] EVM node/Tx_pool: remove dead code --- etherlink/bin_node/lib_dev/tx_pool.mli | 30 +++----------------------- 1 file changed, 3 insertions(+), 27 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_pool.mli b/etherlink/bin_node/lib_dev/tx_pool.mli index f5c47df8c24b..88e27a3194b3 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.mli +++ b/etherlink/bin_node/lib_dev/tx_pool.mli @@ -29,24 +29,9 @@ val start : parameters -> unit tzresult Lwt.t to be processed. *) val shutdown : unit -> unit tzresult Lwt.t -(** [add transaction_object raw_tx] adds a eth transaction and its raw contents - to the tx-pool. - - The consistency between [transaction_object] and [raw_tx] is assumed by - [add]. It is the responsibility of the caller to enforce it. *) -val add : - Ethereum_types.legacy_transaction_object -> - string -> - (Ethereum_types.hash, string) result tzresult Lwt.t - -(** [nonce address] returns the nonce of the user - Returns the first gap in the tx-pool, or the nonce stored on the rollup - if no transactions are in the pool. *) -val nonce : Ethereum_types.Address.t -> Ethereum_types.quantity tzresult Lwt.t - -(** [pop_transactions chain_family maximum_cumulative_size] pops as much - valid transactions as possible from the pool, until their cumulative - size exceeds `maximum_cumulative_size`. If the pool is locked or node +(** [pop_transactions chain_family maximum_cumulative_size] pops as much + valid transactions as possible from the pool, until their cumulative + size exceeds `maximum_cumulative_size`. If the pool is locked or node in tezlink mode, returns no transactions. *) val pop_transactions : chain_family:L2_types.chain_family -> @@ -76,15 +61,6 @@ val is_locked : unit -> bool tzresult Lwt.t val size_info : unit -> Metrics.Tx_pool.size_info tzresult Lwt.t -val get_tx_pool_content : unit -> Ethereum_types.txpool tzresult Lwt.t - -(** [find tx_hash] look into the tx pool if a transaction with hash - [tx_hash] exists and returns it's corresponding - {!Ethereum_types.transaction_object}. *) -val find : - Ethereum_types.hash -> - Ethereum_types.legacy_transaction_object option tzresult Lwt.t - val clear_popped_transactions : unit -> unit tzresult Lwt.t (** [mode] retrieves the current pool mode *) -- GitLab From 90dd137acf7a96d3630c179e21e9aa4acfd8ddc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Fri, 2 May 2025 17:19:41 +0200 Subject: [PATCH 3/9] EVM node/Services_backend_sig: move Tx_container below S In a future commit, we will need to extend this module type with a function taking a bakend of type S as argument. --- .../bin_node/lib_dev/services_backend_sig.ml | 61 +++++++++---------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/etherlink/bin_node/lib_dev/services_backend_sig.ml b/etherlink/bin_node/lib_dev/services_backend_sig.ml index 76e9a56c936c..bdfb566c723c 100644 --- a/etherlink/bin_node/lib_dev/services_backend_sig.ml +++ b/etherlink/bin_node/lib_dev/services_backend_sig.ml @@ -4,37 +4,6 @@ (* Copyright (c) 2023 Nomadic Labs *) (* *) (*****************************************************************************) - -(** [Tx_container] is the signature of the module that deals with - storing and forwarding transactions. the module type is used by - {!Services.dispatch_request} to request informations about pending - transactions. *) -module type Tx_container = sig - (** [nonce ~next_nonce address] must returns the next gap nonce - available. *) - val nonce : - next_nonce:Ethereum_types.quantity -> - Ethereum_types.address -> - Ethereum_types.quantity tzresult Lwt.t - - (** [add ~next_nonce tx_object raw_tx] returns the next gap nonce - available based on the pending transaction of the tx_container. - [next_nonce] is the next expected nonce found in the backend. *) - val add : - next_nonce:Ethereum_types.quantity -> - Ethereum_types.legacy_transaction_object -> - raw_tx:Ethereum_types.hex -> - (Ethereum_types.hash, string) result tzresult Lwt.t - - (** [find hash] returns the transaction_object found in tx - container. *) - val find : Ethereum_types.hash -> Transaction_object.t option tzresult Lwt.t - - (** [content ()] returns all the transactions found in tx - container. *) - val content : unit -> Ethereum_types.txpool tzresult Lwt.t -end - module type S = sig module Reader : Durable_storage.READER @@ -221,3 +190,33 @@ module Make (Backend : Backend) (Executor : Evm_execution.S) : S = struct let l2_levels_of_l1_level = Backend.l2_levels_of_l1_level end + +(** [Tx_container] is the signature of the module that deals with + storing and forwarding transactions. the module type is used by + {!Services.dispatch_request} to request informations about pending + transactions. *) +module type Tx_container = sig + (** [nonce ~next_nonce address] must returns the next gap nonce + available. *) + val nonce : + next_nonce:Ethereum_types.quantity -> + Ethereum_types.address -> + Ethereum_types.quantity tzresult Lwt.t + + (** [add ~next_nonce tx_object raw_tx] returns the next gap nonce + available based on the pending transaction of the tx_container. + [next_nonce] is the next expected nonce found in the backend. *) + val add : + next_nonce:Ethereum_types.quantity -> + Ethereum_types.legacy_transaction_object -> + raw_tx:Ethereum_types.hex -> + (Ethereum_types.hash, string) result tzresult Lwt.t + + (** [find hash] returns the transaction_object found in tx + container. *) + val find : Ethereum_types.hash -> Transaction_object.t option tzresult Lwt.t + + (** [content ()] returns all the transactions found in tx + container. *) + val content : unit -> Ethereum_types.txpool tzresult Lwt.t +end -- GitLab From 7a12fa1d9610998a90d2b859bdcca5290d8a7b08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Mon, 12 May 2025 15:36:10 +0200 Subject: [PATCH 4/9] EVM node/Tx_queue: move inject to Tx_container --- etherlink/bin_node/lib_dev/tx_queue.ml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index db4a449b2255..9ed6e1eda6a0 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -1036,15 +1036,6 @@ let beacon ~evm_node_endpoint ~tick_interval = in loop () -let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce - (tx_object : Ethereum_types.legacy_transaction_object) txn = - let open Lwt_syntax in - let* worker = worker_promise in - Worker.Queue.push_request_and_wait - worker - (Inject {next_nonce; payload = txn; tx_object; callback}) - |> handle_request_error - let start ~config ~keep_alive () = let open Lwt_result_syntax in let* worker = Worker.launch table () {config; keep_alive} (module Handlers) in @@ -1115,6 +1106,15 @@ end module Tx_container = struct let nonce = nonce + let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce + (tx_object : Ethereum_types.legacy_transaction_object) txn = + let open Lwt_syntax in + let* worker = worker_promise in + Worker.Queue.push_request_and_wait + worker + (Inject {next_nonce; payload = txn; tx_object; callback}) + |> handle_request_error + let add ~next_nonce tx_object ~raw_tx = let open Lwt_result_syntax in let* res = inject ~next_nonce tx_object raw_tx in -- GitLab From be541a59f00a8f2c550bc42bcc1d03fa666eb97c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Mon, 12 May 2025 15:39:17 +0200 Subject: [PATCH 5/9] EVM node/Tx_queue: move find to Tx_container --- etherlink/bin_node/lib_dev/tx_queue.ml | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 9ed6e1eda6a0..fd8a705630b2 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -1043,11 +1043,6 @@ let start ~config ~keep_alive () = let*! () = Tx_queue_events.is_ready () in return_unit -let find txn_hash = - let open Lwt_result_syntax in - let*? w = Lazy.force worker in - Worker.Queue.push_request_and_wait w (Find {txn_hash}) |> handle_request_error - let clear () = let open Lwt_result_syntax in let*? w = Lazy.force worker in @@ -1122,9 +1117,13 @@ module Tx_container = struct | Ok () -> return (Ok tx_object.hash) | Error errs -> return (Error errs) - let find hash = + let find txn_hash = let open Lwt_result_syntax in - let* legacy_tx_object = find hash in + let* legacy_tx_object = + let*? w = Lazy.force worker in + Worker.Queue.push_request_and_wait w (Find {txn_hash}) + |> handle_request_error + in (* TODO: https://gitlab.com/tezos/tezos/-/issues/7747 We should instrument the TX queue to return the real transaction objects. *) -- GitLab From 37d824e8e031aa3c2ebe033806eb32dd3a0b8f9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Fri, 2 May 2025 18:29:42 +0200 Subject: [PATCH 6/9] EVM node/Tx_queue: move nonce to Tx_container --- etherlink/bin_node/lib_dev/tx_queue.ml | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index fd8a705630b2..15788921a887 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -1048,12 +1048,6 @@ let clear () = let*? w = Lazy.force worker in Worker.Queue.push_request_and_wait w Clear |> handle_request_error -let nonce ~next_nonce address = - let open Lwt_result_syntax in - let*? w = Lazy.force worker in - Worker.Queue.push_request_and_wait w (Nonce {next_nonce; address}) - |> handle_request_error - let content () = let open Lwt_result_syntax in let*? w = Lazy.force worker in @@ -1099,7 +1093,11 @@ module Internal_for_tests = struct end module Tx_container = struct - let nonce = nonce + let nonce ~next_nonce address = + let open Lwt_result_syntax in + let*? w = Lazy.force worker in + Worker.Queue.push_request_and_wait w (Nonce {next_nonce; address}) + |> handle_request_error let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce (tx_object : Ethereum_types.legacy_transaction_object) txn = -- GitLab From 510815dc901173e59edad1a1520b443834b6720f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Fri, 2 May 2025 18:29:42 +0200 Subject: [PATCH 7/9] EVM node/Tx_queue: move content to Tx_container --- etherlink/bin_node/lib_dev/tx_queue.ml | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 15788921a887..cbbf27fd2120 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -1048,11 +1048,6 @@ let clear () = let*? w = Lazy.force worker in Worker.Queue.push_request_and_wait w Clear |> handle_request_error -let content () = - let open Lwt_result_syntax in - let*? w = Lazy.force worker in - Worker.Queue.push_request_and_wait w Content |> handle_request_error - let shutdown () = let open Lwt_result_syntax in bind_worker @@ fun w -> @@ -1130,5 +1125,8 @@ module Tx_container = struct Transaction_object.from_store_transaction_object legacy_tx_object) - let content = content + let content () = + let open Lwt_result_syntax in + let*? w = Lazy.force worker in + Worker.Queue.push_request_and_wait w Content |> handle_request_error end -- GitLab From 4f8f3281fcd6900d2739877c2b7efe371c19b3fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Tue, 29 Apr 2025 20:35:45 +0200 Subject: [PATCH 8/9] EVM node/Tx_queue + pool: move shutdown to Tx_container --- etherlink/bin_node/lib_dev/observer.ml | 9 ++++++--- etherlink/bin_node/lib_dev/proxy.ml | 9 ++++++--- etherlink/bin_node/lib_dev/rpc.ml | 9 ++++++--- etherlink/bin_node/lib_dev/sequencer.ml | 6 ++++-- etherlink/bin_node/lib_dev/services_backend_sig.ml | 4 ++++ etherlink/bin_node/lib_dev/tx_pool.ml | 14 +++++++------- etherlink/bin_node/lib_dev/tx_pool.mli | 4 ---- etherlink/bin_node/lib_dev/tx_queue.ml | 14 +++++++------- etherlink/bin_node/lib_dev/tx_queue.mli | 4 ---- 9 files changed, 40 insertions(+), 33 deletions(-) diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index b3b8f3c95245..458e8807cce9 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -99,7 +99,8 @@ let on_new_blueprint config evm_node_endpoint next_blueprint_number return (`Restart_from next_blueprint_number) let install_finalizer_observer ~rollup_node_tracking finalizer_public_server - finalizer_private_server finalizer_rpc_process = + finalizer_private_server finalizer_rpc_process + (module Tx_container : Services_backend_sig.Tx_container) = let open Lwt_syntax in Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> let* () = Events.shutdown_node ~exit_status in @@ -108,8 +109,7 @@ let install_finalizer_observer ~rollup_node_tracking finalizer_public_server let* () = Option.iter_s (fun f -> f ()) finalizer_rpc_process in Misc.unwrap_error_monad @@ fun () -> let open Lwt_result_syntax in - let* () = Tx_pool.shutdown () in - let* () = Tx_queue.shutdown () in + let* () = Tx_container.shutdown () in let* () = Evm_context.shutdown () in when_ rollup_node_tracking @@ fun () -> Evm_events_follower.shutdown () @@ -128,6 +128,8 @@ let container_forward_tx ~keep_alive ~evm_node_endpoint : let content () = Lwt_result.return {pending = AddressMap.empty; queued = AddressMap.empty} + + let shutdown () = Lwt_result_syntax.return_unit end) let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync @@ -330,6 +332,7 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync finalizer_public_server finalizer_private_server finalizer_rpc_process + tx_container in let*! next_blueprint_number = Evm_context.next_blueprint_number () in diff --git a/etherlink/bin_node/lib_dev/proxy.ml b/etherlink/bin_node/lib_dev/proxy.ml index b8767934f319..d567623a5721 100644 --- a/etherlink/bin_node/lib_dev/proxy.ml +++ b/etherlink/bin_node/lib_dev/proxy.ml @@ -7,14 +7,15 @@ (* *) (*****************************************************************************) -let install_finalizer server_finalizer = +let install_finalizer server_finalizer + (module Tx_container : Services_backend_sig.Tx_container) = let open Lwt_syntax in Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> let* () = Events.shutdown_node ~exit_status in let* () = server_finalizer () in Misc.unwrap_error_monad @@ fun () -> let open Lwt_result_syntax in - let* () = Tx_pool.shutdown () in + let* () = Tx_container.shutdown () in Evm_context.shutdown () let container_forward_tx ~evm_node_endpoint ~keep_alive : @@ -39,6 +40,8 @@ let container_forward_tx ~evm_node_endpoint ~keep_alive : let content () = Lwt_result.return Ethereum_types.{pending = AddressMap.empty; queued = AddressMap.empty} + + let shutdown () = Lwt_result_syntax.return_unit end) let tx_queue_pop_and_inject (module Rollup_node_rpc : Services_backend_sig.S) @@ -222,7 +225,7 @@ let main ((module Rollup_node_rpc), smart_rollup_address) in let (_ : Lwt_exit.clean_up_callback_id) = - install_finalizer server_finalizer + install_finalizer server_finalizer tx_container in let wait, _resolve = Lwt.wait () in let* () = wait in diff --git a/etherlink/bin_node/lib_dev/rpc.ml b/etherlink/bin_node/lib_dev/rpc.ml index 19abe2a8d68a..c4d7fc3df763 100644 --- a/etherlink/bin_node/lib_dev/rpc.ml +++ b/etherlink/bin_node/lib_dev/rpc.ml @@ -34,12 +34,13 @@ let spawn_main ~exposed_port ~protected_endpoint ?private_endpoint ~data_dir () let finalizer () = Lwt.return process#terminate in finalizer -let install_finalizer_rpc server_public_finalizer = +let install_finalizer_rpc server_public_finalizer + (module Tx_container : Services_backend_sig.Tx_container) = let open Lwt_syntax in Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> let* () = Events.shutdown_node ~exit_status in let* () = server_public_finalizer () in - Misc.unwrap_error_monad @@ fun () -> Tx_pool.shutdown () + Misc.unwrap_error_monad @@ fun () -> Tx_container.shutdown () let set_metrics_level (ctxt : Evm_ro_context.t) = let open Lwt_result_syntax in @@ -132,6 +133,8 @@ let container_forward_request ~public_endpoint ~private_endpoint ~keep_alive : let content () = Lwt_result.return Ethereum_types.{pending = AddressMap.empty; queued = AddressMap.empty} + + let shutdown () = Lwt_result_syntax.return_unit end) let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint @@ -255,7 +258,7 @@ let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint in let (_ : Lwt_exit.clean_up_callback_id) = - install_finalizer_rpc server_public_finalizer + install_finalizer_rpc server_public_finalizer tx_container in let* () = diff --git a/etherlink/bin_node/lib_dev/sequencer.ml b/etherlink/bin_node/lib_dev/sequencer.ml index bc905704665b..1a2549b949a4 100644 --- a/etherlink/bin_node/lib_dev/sequencer.ml +++ b/etherlink/bin_node/lib_dev/sequencer.ml @@ -18,7 +18,8 @@ type sandbox_config = { } let install_finalizer_seq server_public_finalizer server_private_finalizer - finalizer_rpc_process = + finalizer_rpc_process + (module Tx_container : Services_backend_sig.Tx_container) = let open Lwt_syntax in Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> let* () = Events.shutdown_node ~exit_status in @@ -27,7 +28,7 @@ let install_finalizer_seq server_public_finalizer server_private_finalizer let* () = Option.iter_s (fun f -> f ()) finalizer_rpc_process in Misc.unwrap_error_monad @@ fun () -> let open Lwt_result_syntax in - let* () = Tx_pool.shutdown () in + let* () = Tx_container.shutdown () in let* () = Evm_events_follower.shutdown () in let* () = Blueprints_publisher.shutdown () in let* () = Signals_publisher.shutdown () in @@ -416,6 +417,7 @@ let main ~data_dir ?(genesis_timestamp = Misc.now ()) ~cctxt finalizer_public_server finalizer_private_server finalizer_rpc_process + tx_container in let* () = loop_sequencer diff --git a/etherlink/bin_node/lib_dev/services_backend_sig.ml b/etherlink/bin_node/lib_dev/services_backend_sig.ml index bdfb566c723c..8657e9535dae 100644 --- a/etherlink/bin_node/lib_dev/services_backend_sig.ml +++ b/etherlink/bin_node/lib_dev/services_backend_sig.ml @@ -219,4 +219,8 @@ module type Tx_container = sig (** [content ()] returns all the transactions found in tx container. *) val content : unit -> Ethereum_types.txpool tzresult Lwt.t + + (** [shutdown ()] stops the tx container, waiting for the ongoing request + to be processed. *) + val shutdown : unit -> unit tzresult Lwt.t end diff --git a/etherlink/bin_node/lib_dev/tx_pool.ml b/etherlink/bin_node/lib_dev/tx_pool.ml index 7a1c86a4a9f3..9ebeb30a2997 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.ml +++ b/etherlink/bin_node/lib_dev/tx_pool.ml @@ -810,13 +810,6 @@ let start parameters = let+ worker = Worker.launch table () parameters (module Handlers) in Lwt.wakeup worker_waker worker -let shutdown () = - let open Lwt_result_syntax in - bind_worker @@ fun w -> - let*! () = Tx_pool_events.shutdown () in - let*! () = Worker.shutdown w in - return_unit - let add transaction_object raw_tx = let open Lwt_result_syntax in let*? w = Lazy.force worker in @@ -970,4 +963,11 @@ module Tx_container = struct legacy_tx_object) let content = get_tx_pool_content + + let shutdown () = + let open Lwt_result_syntax in + bind_worker @@ fun w -> + let*! () = Tx_pool_events.shutdown () in + let*! () = Worker.shutdown w in + return_unit end diff --git a/etherlink/bin_node/lib_dev/tx_pool.mli b/etherlink/bin_node/lib_dev/tx_pool.mli index 88e27a3194b3..c5e6fd48fb30 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.mli +++ b/etherlink/bin_node/lib_dev/tx_pool.mli @@ -25,10 +25,6 @@ type parameters = { (** [start parameters] starts the tx-pool *) val start : parameters -> unit tzresult Lwt.t -(** [shutdown ()] stops the tx-pool, waiting for the ongoing request - to be processed. *) -val shutdown : unit -> unit tzresult Lwt.t - (** [pop_transactions chain_family maximum_cumulative_size] pops as much valid transactions as possible from the pool, until their cumulative size exceeds `maximum_cumulative_size`. If the pool is locked or node diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index cbbf27fd2120..70b6d1c15fa2 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -1048,13 +1048,6 @@ let clear () = let*? w = Lazy.force worker in Worker.Queue.push_request_and_wait w Clear |> handle_request_error -let shutdown () = - let open Lwt_result_syntax in - bind_worker @@ fun w -> - let*! () = Tx_queue_events.shutdown () in - let*! () = Worker.shutdown w in - return_unit - let lock_transactions () = bind_worker @@ fun w -> push_request w Lock_transactions @@ -1129,4 +1122,11 @@ module Tx_container = struct let open Lwt_result_syntax in let*? w = Lazy.force worker in Worker.Queue.push_request_and_wait w Content |> handle_request_error + + let shutdown () = + let open Lwt_result_syntax in + bind_worker @@ fun w -> + let*! () = Tx_queue_events.shutdown () in + let*! () = Worker.shutdown w in + return_unit end diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index 3e74ba4d5c51..fb1f9c0d4de6 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -36,10 +36,6 @@ val start : unit -> unit tzresult Lwt.t -(** [shutdown ()] stops the tx queue, waiting for the ongoing request - to be processed. *) -val shutdown : unit -> unit tzresult Lwt.t - (** [clear ()] removes the tx queue data but keeps the allocated space *) val clear : unit -> unit tzresult Lwt.t -- GitLab From 5d98cff611f10eb49649c193277da0615fa4d505 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Tue, 29 Apr 2025 22:37:23 +0200 Subject: [PATCH 9/9] EVM node/Tx_queue + pool: move tick and beacon to Tx_container --- etherlink/bin_node/lib_dev/observer.ml | 14 ++++++---- etherlink/bin_node/lib_dev/proxy.ml | 5 ++++ etherlink/bin_node/lib_dev/rpc.ml | 18 ++++++------ .../bin_node/lib_dev/services_backend_sig.ml | 12 ++++++++ etherlink/bin_node/lib_dev/tx_pool.ml | 5 ++++ etherlink/bin_node/lib_dev/tx_queue.ml | 28 ++++++++++--------- etherlink/bin_node/lib_dev/tx_queue.mli | 13 --------- .../fa-bridge-watchtower/etherlink_monitor.ml | 7 +++-- 8 files changed, 59 insertions(+), 43 deletions(-) diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index 458e8807cce9..dffe7debe29c 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -130,6 +130,11 @@ let container_forward_tx ~keep_alive ~evm_node_endpoint : Lwt_result.return {pending = AddressMap.empty; queued = AddressMap.empty} let shutdown () = Lwt_result_syntax.return_unit + + let tx_queue_tick ~evm_node_endpoint:_ = Lwt_result_syntax.return_unit + + let tx_queue_beacon ~evm_node_endpoint:_ ~tick_interval:_ = + Lwt_result_syntax.return_unit end) let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync @@ -352,10 +357,9 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync and* () = Drift_monitor.run ~evm_node_endpoint Evm_context.next_blueprint_number and* () = - if Configuration.is_tx_queue_enabled config then - Tx_queue.beacon - ~evm_node_endpoint:(Rpc evm_node_endpoint) - ~tick_interval:0.05 - else return_unit + let (module Tx_container) = tx_container in + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc evm_node_endpoint) + ~tick_interval:0.05 in return_unit diff --git a/etherlink/bin_node/lib_dev/proxy.ml b/etherlink/bin_node/lib_dev/proxy.ml index d567623a5721..3e320216173f 100644 --- a/etherlink/bin_node/lib_dev/proxy.ml +++ b/etherlink/bin_node/lib_dev/proxy.ml @@ -42,6 +42,11 @@ let container_forward_tx ~evm_node_endpoint ~keep_alive : Ethereum_types.{pending = AddressMap.empty; queued = AddressMap.empty} let shutdown () = Lwt_result_syntax.return_unit + + let tx_queue_tick ~evm_node_endpoint:_ = Lwt_result_syntax.return_unit + + let tx_queue_beacon ~evm_node_endpoint:_ ~tick_interval:_ = + Lwt_result_syntax.return_unit end) let tx_queue_pop_and_inject (module Rollup_node_rpc : Services_backend_sig.S) diff --git a/etherlink/bin_node/lib_dev/rpc.ml b/etherlink/bin_node/lib_dev/rpc.ml index c4d7fc3df763..2e0edffaebc9 100644 --- a/etherlink/bin_node/lib_dev/rpc.ml +++ b/etherlink/bin_node/lib_dev/rpc.ml @@ -135,6 +135,11 @@ let container_forward_request ~public_endpoint ~private_endpoint ~keep_alive : Ethereum_types.{pending = AddressMap.empty; queued = AddressMap.empty} let shutdown () = Lwt_result_syntax.return_unit + + let tx_queue_tick ~evm_node_endpoint:_ = Lwt_result_syntax.return_unit + + let tx_queue_beacon ~evm_node_endpoint:_ ~tick_interval:_ = + Lwt_result_syntax.return_unit end) let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint @@ -268,15 +273,10 @@ let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint let* next_blueprint_number = Evm_ro_context.next_blueprint_number ctxt in let* () = - if - Configuration.is_tx_queue_enabled config - && Option.is_none evm_node_private_endpoint - (* Only start the beacon when the tx_queue is started. *) - then - Tx_queue.beacon - ~evm_node_endpoint:(Rpc evm_node_endpoint) - ~tick_interval:0.05 - else return_unit + let (module Tx_container) = tx_container in + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc evm_node_endpoint) + ~tick_interval:0.05 and* () = Blueprints_follower.start ~ping_tx_pool diff --git a/etherlink/bin_node/lib_dev/services_backend_sig.ml b/etherlink/bin_node/lib_dev/services_backend_sig.ml index 8657e9535dae..f0536d9530a8 100644 --- a/etherlink/bin_node/lib_dev/services_backend_sig.ml +++ b/etherlink/bin_node/lib_dev/services_backend_sig.ml @@ -191,6 +191,9 @@ module Make (Backend : Backend) (Executor : Evm_execution.S) : S = struct let l2_levels_of_l1_level = Backend.l2_levels_of_l1_level end +(** Inject transactions with either RPCs or on a websocket connection. *) +type endpoint = Rpc of Uri.t | Websocket of Websocket_client.t + (** [Tx_container] is the signature of the module that deals with storing and forwarding transactions. the module type is used by {!Services.dispatch_request} to request informations about pending @@ -223,4 +226,13 @@ module type Tx_container = sig (** [shutdown ()] stops the tx container, waiting for the ongoing request to be processed. *) val shutdown : unit -> unit tzresult Lwt.t + + (** Trigger a tick in the [Tx_queue]. *) + val tx_queue_tick : evm_node_endpoint:endpoint -> unit tzresult Lwt.t + + (** [tx_queue_beacon ~evm_node_endpoint ~tick_interval] is a never fulfilled + promise which triggers a tick in the [Tx_queue] every + [tick_interval] seconds. *) + val tx_queue_beacon : + evm_node_endpoint:endpoint -> tick_interval:float -> unit tzresult Lwt.t end diff --git a/etherlink/bin_node/lib_dev/tx_pool.ml b/etherlink/bin_node/lib_dev/tx_pool.ml index 9ebeb30a2997..5476e880bbf1 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.ml +++ b/etherlink/bin_node/lib_dev/tx_pool.ml @@ -970,4 +970,9 @@ module Tx_container = struct let*! () = Tx_pool_events.shutdown () in let*! () = Worker.shutdown w in return_unit + + let tx_queue_tick ~evm_node_endpoint:_ = Lwt_result_syntax.return_unit + + let tx_queue_beacon ~evm_node_endpoint:_ ~tick_interval:_ = + Lwt_result_syntax.return_unit end diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 70b6d1c15fa2..5f56d2d17856 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -46,7 +46,9 @@ type request = { } (** Inject transactions with either RPCs or on a websocket connection. *) -type endpoint = Rpc of Uri.t | Websocket of Websocket_client.t +type endpoint = Services_backend_sig.endpoint = + | Rpc of Uri.t + | Websocket of Websocket_client.t (** [Nonce_bitset] registers known nonces from transactions that went through the tx_queue from a specific sender address. With this @@ -1024,18 +1026,6 @@ let push_request worker request = let*! (pushed : bool) = Worker.Queue.push_request worker request in if not pushed then tzfail Tx_queue_is_closed else return_unit -let tick ~evm_node_endpoint = - bind_worker @@ fun w -> push_request w (Tick {evm_node_endpoint}) - -let beacon ~evm_node_endpoint ~tick_interval = - let open Lwt_result_syntax in - let rec loop () = - let* () = tick ~evm_node_endpoint in - let*! () = Lwt_unix.sleep tick_interval in - loop () - in - loop () - let start ~config ~keep_alive () = let open Lwt_result_syntax in let* worker = Worker.launch table () {config; keep_alive} (module Handlers) in @@ -1129,4 +1119,16 @@ module Tx_container = struct let*! () = Tx_queue_events.shutdown () in let*! () = Worker.shutdown w in return_unit + + let tx_queue_tick ~evm_node_endpoint = + bind_worker @@ fun w -> push_request w (Tick {evm_node_endpoint}) + + let tx_queue_beacon ~evm_node_endpoint ~tick_interval = + let open Lwt_result_syntax in + let rec loop () = + let* () = tx_queue_tick ~evm_node_endpoint in + let*! () = Lwt_unix.sleep tick_interval in + loop () + in + loop () end diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index fb1f9c0d4de6..b94997c7446b 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -24,9 +24,6 @@ [callback] is called with [`Dropped].}} *) type callback = [`Accepted | `Confirmed | `Dropped | `Refused] -> unit Lwt.t -(** Inject transactions with either RPCs or on a websocket connection. *) -type endpoint = Rpc of Uri.t | Websocket of Websocket_client.t - (** [start ~config ~max_transaction_batch_length ()] starts the worker, meaning it is possible to call {!inject}, {!confirm} and {!beacon}. *) @@ -39,16 +36,6 @@ val start : (** [clear ()] removes the tx queue data but keeps the allocated space *) val clear : unit -> unit tzresult Lwt.t - -(** Trigger a tick in the [Tx_queue]. *) -val tick : evm_node_endpoint:endpoint -> unit tzresult Lwt.t - -(** [beacon ~evm_node_endpoint ~tick_interval] is a never fulfilled - promise which triggers a tick in the [Tx_queue] every - [tick_interval] seconds. *) -val beacon : - evm_node_endpoint:endpoint -> tick_interval:float -> unit tzresult Lwt.t - (** [lock_transactions] locks the transactions in the queue, new transactions can be added but nothing can be retrieved with {!pop_transactions}. *) diff --git a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml index 062685198270..d9cca8fe8812 100644 --- a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml +++ b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml @@ -862,7 +862,7 @@ end let start db ~config ~notify_ws_change ~first_block = let open Lwt_result_syntax in let evm_node_endpoint = config.Config.evm_node_endpoint in - let tx_queue_endpoint = ref (Tx_queue.Rpc evm_node_endpoint) in + let tx_queue_endpoint = ref (Services_backend_sig.Rpc evm_node_endpoint) in let run () = let*! ws_client = Websocket_client.connect @@ -870,7 +870,7 @@ let start db ~config ~notify_ws_change ~first_block = Media_type.json (Uri.with_path evm_node_endpoint (Uri.path evm_node_endpoint ^ "/ws")) in - tx_queue_endpoint := Tx_queue.Websocket ws_client ; + tx_queue_endpoint := Services_backend_sig.Websocket ws_client ; notify_ws_change ws_client ; let* () = init_db_pointers db ws_client ~first_block in let* chain_id = get_chain_id ws_client in @@ -904,7 +904,8 @@ let start db ~config ~notify_ws_change ~first_block = let rec tx_queue_beacon () = let open Lwt_syntax in let* res = - protect @@ fun () -> Tx_queue.tick ~evm_node_endpoint:!tx_queue_endpoint + protect @@ fun () -> + Tx_queue.Tx_container.tx_queue_tick ~evm_node_endpoint:!tx_queue_endpoint in let* () = match res with -- GitLab