From d47e3bac31a15ba41056e4132cd6f63ee1f17c4a Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 26 Feb 2025 11:50:11 +0100 Subject: [PATCH 1/2] evm/node: split callback type of tx_queue --- etherlink/bin_node/lib_dev/tx_queue.ml | 70 +++++++++++++++++--------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 8a8960e5c5db..d1b0ad02ca32 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -12,24 +12,48 @@ let two_seconds = Ptime.Span.of_int_s 2 type parameters = {evm_node_endpoint : Uri.t; config : Configuration.tx_queue} -type callback = - [`Accepted of Ethereum_types.hash | `Confirmed | `Dropped | `Refused] -> - unit Lwt.t +type queue_variant = [`Accepted of Ethereum_types.hash | `Refused] -type request = {payload : Ethereum_types.hex; callback : callback} +type pending_variant = [`Confirmed | `Dropped] + +type all_variant = [queue_variant | pending_variant] + +type 'a variant_callback = 'a -> unit Lwt.t + +(** tx is in the queue and wait to be injected into the upstream + node. *) +type queue_request = { + payload : Ethereum_types.hex; (** payload of the transaction *) + queue_callback : queue_variant variant_callback; + (** callback to call with the response given by the upstream + node. *) +} -type pending = {callback : callback; since : Ptime.t} +(** tx have been forwarded to the upstream node, now it's pending until confirmed. *) +type pending_request = { + since : Time.System.t; + (** time when the transaction was injected into the upstream node. *) + pending_callback : pending_variant variant_callback; + (** callback to call when the pending transaction have been confirmed or is dropped. 
*) +} + +type callback = all_variant variant_callback + +type request = {payload : Ethereum_types.hex; callback : callback} module Pending_transactions = struct open Ethereum_types module S = String.Hashtbl - type t = pending S.t + type t = pending_request S.t let empty ~start_size = S.create start_size - let add htbl (Hash (Hex hash)) callback = - S.add htbl hash ({callback; since = Time.System.now ()} : pending) + let add htbl (Hash (Hex hash)) pending_callback = + S.add + htbl + hash + ({pending_callback; since = Time.System.now ()} : pending_request) let pop htbl (Hash (Hex hash)) = match S.find htbl hash with @@ -54,7 +78,7 @@ end type state = { evm_node_endpoint : Uri.t; - mutable queue : request Queue.t; + mutable queue : queue_request Queue.t; pending : Pending_transactions.t; config : Configuration.tx_queue; } @@ -79,11 +103,7 @@ end module Request = struct type ('a, 'b) t = - | Inject : { - payload : Ethereum_types.hex; - callback : callback; - } - -> (unit, tztrace) t + | Inject : request -> (unit, tztrace) t | Confirm : {txn_hash : Ethereum_types.hash} -> (unit, tztrace) t | Tick : (unit, tztrace) t @@ -145,7 +165,7 @@ let send_transactions_batch ~evm_node_endpoint transactions = else let rev_batch, callbacks = Seq.fold_left - (fun (rev_batch, callbacks) {payload; callback} -> + (fun (rev_batch, callbacks) {payload; queue_callback} -> let req_id = Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) in let txn = Rpc_encodings.JSONRPC. 
@@ -157,7 +177,7 @@ let send_transactions_batch ~evm_node_endpoint transactions = } in - (txn :: rev_batch, M.add req_id callback callbacks)) + (txn :: rev_batch, M.add req_id queue_callback callbacks)) ([], M.empty) transactions in @@ -221,19 +241,20 @@ module Handlers = struct let state = Worker.state self in match request with | Inject {payload; callback} -> - let instrumented_callback reason = + let queue_callback reason = (match reason with | `Accepted hash -> - Pending_transactions.add state.pending hash callback - | _ -> ()) ; - callback reason + Pending_transactions.add state.pending hash (fun reason -> + callback (reason :> all_variant)) + | `Refused -> ()) ; + callback (reason :> all_variant) in - Queue.add {payload; callback = instrumented_callback} state.queue ; + Queue.add {payload; queue_callback} state.queue ; return_unit | Confirm {txn_hash} -> ( match Pending_transactions.pop state.pending txn_hash with - | Some {callback; _} -> - Lwt.async (fun () -> callback `Confirmed) ; + | Some {pending_callback; _} -> + Lwt.async (fun () -> pending_callback `Confirmed) ; return_unit | None -> return_unit) | Tick -> @@ -270,7 +291,8 @@ module Handlers = struct let txns = Pending_transactions.drop state.pending in List.iter - (fun {callback; _} -> Lwt.async (fun () -> callback `Dropped)) + (fun {pending_callback; _} -> + Lwt.async (fun () -> pending_callback `Dropped)) txns type launch_error = tztrace -- GitLab From 4a961d07b5393c3da225413c573ff3c99403dfc5 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Tue, 18 Feb 2025 17:38:08 +0100 Subject: [PATCH 2/2] evm/node: tx_queue also store the transaction object --- etherlink/CHANGES_NODE.md | 5 ++ etherlink/bin_node/lib_dev/services.ml | 6 +- etherlink/bin_node/lib_dev/tx_queue.ml | 84 +++++++++++++++---- etherlink/bin_node/lib_dev/tx_queue.mli | 19 ++--- etherlink/tezt/tests/evm_sequencer.ml | 105 +++++++++++++++++++----- 5 files changed, 172 insertions(+), 47 deletions(-) diff --git 
a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index e53ff3c5707a..cdbb8f0a84f6 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -22,6 +22,11 @@ ### RPCs changes +- **experimental feature** With the `tx_queue` feature enabled in an + observer node, the RPC `eth_getTransactionByHash` returns the + transaction found in the `tx_queue`; it also works for transactions + that have already been forwarded to the upstream node. (!16829) + ### Metrics changes ### Execution changes diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 2d864380d6a9..6997f59dec23 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -570,7 +570,11 @@ let dispatch_request (rpc : Configuration.rpc) | Get_transaction_by_hash.Method -> let f tx_hash = let* transaction_object = - let* transaction_object = Tx_pool.find tx_hash in + if Configuration.is_tx_queue_enabled config then + Tx_queue.find tx_hash + else Tx_pool.find tx_hash + in + let* transaction_object = match transaction_object with | Some transaction_object -> (* TODO: https://gitlab.com/tezos/tezos/-/issues/7747 diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index d1b0ad02ca32..e6a7be62f3d8 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -12,7 +12,7 @@ let two_seconds = Ptime.Span.of_int_s 2 type parameters = {evm_node_endpoint : Uri.t; config : Configuration.tx_queue} -type queue_variant = [`Accepted of Ethereum_types.hash | `Refused] +type queue_variant = [`Accepted | `Refused] type pending_variant = [`Confirmed | `Dropped] @@ -39,7 +39,29 @@ type pending_request = { type callback = all_variant variant_callback -type request = {payload : Ethereum_types.hex; callback : callback} +type request = { + payload : Ethereum_types.hex; + tx_object : Ethereum_types.legacy_transaction_object; + callback : callback; +} + +module
Tx_object = struct + open Ethereum_types + module S = String.Hashtbl + + type t = Ethereum_types.legacy_transaction_object S.t + + let empty ~start_size = S.create start_size + + let add htbl + (({hash = Hash (Hex hash); _} : Ethereum_types.legacy_transaction_object) + as tx_object) = + S.add htbl hash tx_object + + let find htbl (Hash (Hex hash)) = S.find htbl hash + + let remove htbl (Hash (Hex hash)) = S.remove htbl hash +end module Pending_transactions = struct open Ethereum_types @@ -80,6 +102,7 @@ type state = { evm_node_endpoint : Uri.t; mutable queue : queue_request Queue.t; pending : Pending_transactions.t; + tx_object : Tx_object.t; config : Configuration.tx_queue; } @@ -105,6 +128,10 @@ module Request = struct type ('a, 'b) t = | Inject : request -> (unit, tztrace) t | Confirm : {txn_hash : Ethereum_types.hash} -> (unit, tztrace) t + | Find : { + txn_hash : Ethereum_types.hash; + } + -> (Ethereum_types.legacy_transaction_object option, tztrace) t | Tick : (unit, tztrace) t type view = View : _ t -> view @@ -140,6 +167,14 @@ module Request = struct (obj1 (req "request" (constant "tick"))) (function View Tick -> Some () | _ -> None) (fun _ -> assert false); + case + Json_only + ~title:"Find" + (obj2 + (req "request" (constant "find")) + (req "transaction_hash" Ethereum_types.hash_encoding)) + (function View (Find {txn_hash}) -> Some ((), txn_hash) | _ -> None) + (fun _ -> assert false); ] let pp fmt (View r) = @@ -148,6 +183,7 @@ module Request = struct | Inject {payload = Hex txn; _} -> fprintf fmt "Inject %s" txn | Confirm {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Confirm %s" txn_hash + | Find {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Find %s" txn_hash | Tick -> fprintf fmt "Tick" end @@ -208,11 +244,7 @@ let send_transactions_batch ~evm_node_endpoint transactions = | value, Some callback -> let* () = match value with - | Ok res -> - let hash = - Data_encoding.Json.destruct Srt.output_encoding res - in - Lwt_result.ok (callback (`Accepted 
hash)) + | Ok _hash_encoded -> Lwt_result.ok (callback `Accepted) | Error error -> let*! () = Tx_queue_events.rpc_error error in Lwt_result.ok (callback `Refused) @@ -240,15 +272,22 @@ module Handlers = struct let open Lwt_result_syntax in let state = Worker.state self in match request with - | Inject {payload; callback} -> + | Inject {payload; tx_object; callback} -> + let pending_callback (reason : pending_variant) = + Tx_object.remove state.tx_object tx_object.hash ; + callback (reason :> all_variant) + in let queue_callback reason = (match reason with - | `Accepted hash -> - Pending_transactions.add state.pending hash (fun reason -> - callback (reason :> all_variant)) - | `Refused -> ()) ; + | `Accepted -> + Pending_transactions.add + state.pending + tx_object.hash + pending_callback + | `Refused -> Tx_object.remove state.tx_object tx_object.hash) ; callback (reason :> all_variant) in + Tx_object.add state.tx_object tx_object ; Queue.add {payload; queue_callback} state.queue ; return_unit | Confirm {txn_hash} -> ( @@ -257,6 +296,7 @@ module Handlers = struct Lwt.async (fun () -> pending_callback `Confirmed) ; return_unit | None -> return_unit) + | Find {txn_hash} -> return @@ Tx_object.find state.tx_object txn_hash | Tick -> let all_transactions = Queue.to_seq state.queue in let* transactions_to_inject, remaining_transactions = @@ -306,6 +346,7 @@ module Handlers = struct pending = Pending_transactions.empty ~start_size:(config.max_size / 4); (* start with /4 and let it grow if necessary to not allocate too much at start. 
*) + tx_object = Tx_object.empty ~start_size:(config.max_size / 4); config; } @@ -345,6 +386,16 @@ let worker = | Lwt.Fail e -> Result_syntax.tzfail (error_of_exn e) | Lwt.Sleep -> Result_syntax.tzfail No_worker) +let handle_request_error rq = + let open Lwt_syntax in + let* rq in + match rq with + | Ok res -> return_ok res + | Error (Worker.Request_error errs) -> Lwt.return_error errs + | Error (Closed None) -> Lwt.return_error [Tx_queue_is_closed] + | Error (Closed (Some errs)) -> Lwt.return_error errs + | Error (Any exn) -> Lwt.return_error [Exn exn] + let bind_worker f = let open Lwt_result_syntax in let res = Lazy.force worker in @@ -370,12 +421,10 @@ let rec beacon ~tick_interval = let inject ?(callback = fun _ -> Lwt_syntax.return_unit) (tx_object : Ethereum_types.legacy_transaction_object) txn = - (* tx_object uses only for it's hash for now. This will be revisited - in a follow-up *) let open Lwt_syntax in let* () = Tx_queue_events.add_transaction tx_object.hash in let* worker = worker_promise in - push_request worker (Inject {payload = txn; callback}) + push_request worker (Inject {payload = txn; tx_object; callback}) let confirm txn_hash = bind_worker @@ fun w -> push_request w (Confirm {txn_hash}) @@ -389,6 +438,11 @@ let start ~config ~evm_node_endpoint () = let*! () = Tx_queue_events.is_ready () in return_unit +let find txn_hash = + let open Lwt_result_syntax in + let*? 
w = Lazy.force worker in + Worker.Queue.push_request_and_wait w (Find {txn_hash}) |> handle_request_error + let shutdown () = let open Lwt_result_syntax in bind_worker @@ fun w -> diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index 5a5df2d26e37..e70ed1587310 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -18,20 +18,11 @@ {ul {li Depending on the result of the RPC, its [callback] is called with - either [`Accepted hash] (where [hash] is the hash of the raw - transaction) or [`Refused]).} + either [`Accepted] or [`Refused]).} {li As soon as the transaction appears in a blueprint, its callback is called with [`Confirmed]. If this does not happen before 2s, the [callback] is called with [`Dropped].}} *) -type callback = - [`Accepted of Ethereum_types.hash | `Confirmed | `Dropped | `Refused] -> - unit Lwt.t - -(** A [request] submitted to the [Tx_queue] consists in a payload - (that is, a raw transaction) and a {!type:callback} that will be - used to advertise the transaction's life cycle. *) - -type request = {payload : Ethereum_types.hex; callback : callback} +type callback = [`Accepted | `Confirmed | `Dropped | `Refused] -> unit Lwt.t (** [start ~evm_node_endpoint ~max_transaction_batch_length ()] starts the worker, meaning it is possible to call {!inject}, {!confirm} @@ -63,3 +54,9 @@ val confirm : Ethereum_types.hash -> unit tzresult Lwt.t (** [beacon ~tick_interval] is a never fulfilled promise which triggers a tick in the [Tx_queue] every [tick_interval] seconds. *) val beacon : tick_interval:float -> unit tzresult Lwt.t + +(** [find hash] returns the transaction associated with that hash if + it's found in the tx_queue. 
*) +val find : + Ethereum_types.hash -> + Ethereum_types.legacy_transaction_object option tzresult Lwt.t diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 2d66b8f26305..d34c63b47dbf 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -10248,38 +10248,103 @@ let test_tx_queue = let*@ _ = produce_block sequencer in unit and* () = Evm_node.wait_for_blueprint_applied observer 1 in - - let observer_wait_tx_added = - Evm_node.wait_for_tx_queue_add_transaction observer - in - let sequencer_wait_tx_added = - Evm_node.wait_for_tx_pool_add_transaction sequencer - in - let observer_wait_tx_injected = - Evm_node.wait_for_tx_queue_injecting_transaction observer + let check_tx_is_found ~__LOC__ ~hash ~node = + let*@ tx = Rpc.get_transaction_by_hash ~transaction_hash:hash node in + Check.( + is_true + (Option.is_some tx) + ~__LOC__ + ~error_msg:"Expected to find the transaction but it was not found.") ; + unit in - let* raw_tx = + (* helper to craft a tx with given nonce. *) + let raw_tx ~nonce = Cast.craft_tx ~source_private_key:Eth_account.bootstrap_accounts.(0).private_key ~chain_id:1337 - ~nonce:0 + ~nonce ~gas_price:1_000_000_000 ~gas:23_300 ~value:Wei.one ~address:Eth_account.bootstrap_accounts.(1).address () in - let* () = - let*@ _hash = Rpc.send_raw_transaction ~raw_tx observer in - unit + + (* number of transactions that we are going to process (submit, + inject, ...) *) + let nb_txs = 10 in + + let wait_for_all_tx_process ~name ~waiter = + let rec aux total = + if total = nb_txs then ( + Log.info "All (%d) txs processed: \"%s\"." total name ; + unit) + else if total > nb_txs then + Test.fail + "more transaction where processed (%s) than expected, impossible" + name + else + let* nb = waiter () in + let total = total + nb in + Log.debug "Processed %d of txs. 
(%s)" total name ; + aux total + in + aux 0 + in + + (* Promises that checks that [nb_txs] txs have been seen with different + events. *) + + (* Checks that all txs were added to the observer tx_queue *) + let observer_wait_tx_added = + let waiter () = + let* _ = Evm_node.wait_for_tx_queue_add_transaction observer in + return 1 + in + wait_for_all_tx_process ~name:"tx added in observer queue" ~waiter + in + + (* Checks that all txs were added to the sequencer tx_pool *) + let sequencer_wait_tx_added = + let waiter () = + let* _ = Evm_node.wait_for_tx_pool_add_transaction sequencer in + return 1 + in + wait_for_all_tx_process ~name:"tx added in sequencer tx pool" ~waiter + in + + (* Checks that all txs were injected to the sequencer by the + observer *) + let observer_wait_tx_injected = + let waiter () = Evm_node.wait_for_tx_queue_injecting_transaction observer in + wait_for_all_tx_process ~name:"tx injected by observer queue" ~waiter + in + + (* Test start here *) + Log.info + "Sending %d transactions to the observer and check after each submition \ + that the tx can be retrieved" + nb_txs ; + let* hashes = + fold nb_txs [] @@ fun i hashes -> + let* raw_tx = raw_tx ~nonce:i in + let*@ hash = Rpc.send_raw_transaction ~raw_tx observer in + let* () = check_tx_is_found ~__LOC__ ~hash ~node:observer in + return (hash :: hashes) and* _ = observer_wait_tx_added - and* observer_wait_tx_injected + and* _ = observer_wait_tx_injected and* _ = sequencer_wait_tx_added in - Check.( - (observer_wait_tx_injected = 1) - ~__LOC__ - int - ~error_msg:"Expected %r transaction injected found %l.") ; + Log.info + "Verifying that all transactions can be retrieved both in the observer and \ + in the sequencer" ; + let* () = + Lwt_list.iter_p + (fun hash -> + let* () = check_tx_is_found ~__LOC__ ~hash ~node:observer + and* () = check_tx_is_found ~__LOC__ ~hash ~node:sequencer in + unit) + hashes + in unit let test_spawn_rpc = -- GitLab