From 150d540f778197edacfaa60dab5d960c3e59a0a7 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 6 Mar 2025 10:01:35 +0100 Subject: [PATCH 1/5] evm/node: add hash to tx_queue queue_request --- etherlink/bin_node/lib_dev/tx_queue.ml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 0184dca89abf..658ed7a2c442 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -21,6 +21,7 @@ type 'a variant_callback = 'a -> unit Lwt.t (** tx is in the queue and wait to be injected into the upstream node. *) type queue_request = { + hash : Ethereum_types.hash; payload : Ethereum_types.hex; (** payload of the transaction *) queue_callback : queue_variant variant_callback; (** callback to call with the response given by the upstream @@ -502,7 +503,7 @@ let send_transactions_batch ~evm_node_endpoint ~keep_alive transactions = else let rev_batch, callbacks = Seq.fold_left - (fun (rev_batch, callbacks) {payload; queue_callback} -> + (fun (rev_batch, callbacks) {hash = _; payload; queue_callback} -> let req_id = Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) in let txn = Rpc_encodings.JSONRPC. @@ -691,7 +692,9 @@ module Handlers = struct ~next_nonce ~nonce:tx_nonce in - Queue.add {payload; queue_callback} state.queue ; + Queue.add + {hash = tx_object.hash; payload; queue_callback} + state.queue ; return (Ok ())) else return -- GitLab From cd72221e48f54e1bd7995f4ab22f377114d9cb3a Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 6 Mar 2025 10:11:44 +0100 Subject: [PATCH 2/5] evm/node: add `pop_transactions` request to the tx_queue --- etherlink/bin_node/lib_dev/tx_queue.ml | 77 +++++++++++++++++++ etherlink/bin_node/lib_dev/tx_queue.mli | 19 ++++- etherlink/bin_node/lib_dev/tx_queue_events.ml | 10 +++ .../bin_node/lib_dev/tx_queue_events.mli | 4 + 4 files changed, 108 insertions(+), 2 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 658ed7a2c442..76fc92694d40 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -381,6 +381,10 @@ module Request = struct | Unlock_transactions : (unit, tztrace) t | Is_locked : (bool, tztrace) t | Content : (Ethereum_types.txpool, tztrace) t + | Pop_transactions : { + maximum_cumulative_size : int; + } + -> ((string * Ethereum_types.legacy_transaction_object) list, tztrace) t type view = View : _ t -> view @@ -470,6 +474,17 @@ module Request = struct (obj1 (req "request" (constant "content"))) (function View Content -> Some () | _ -> None) (fun _ -> assert false); + case + Json_only + ~title:"Pop_transactions" + (obj2 + (req "request" (constant "pop_transactions")) + (req "maximum_cumulatize_size" int31)) + (function + | View (Pop_transactions {maximum_cumulative_size}) -> + Some ((), maximum_cumulative_size) + | _ -> None) + (fun _ -> assert false); ] let pp fmt (View r) = @@ -487,6 +502,11 @@ module Request = struct | Unlock_transactions -> Format.fprintf fmt "Unlocking the transactions" | Is_locked -> Format.fprintf fmt "Checking if the tx queue is locked" | Content -> fprintf fmt "Content" + | Pop_transactions {maximum_cumulative_size} -> + fprintf + fmt + "Popping transactions of maximum cumulative size %d bytes" + maximum_cumulative_size end module Worker = Worker.MakeSingle (Name) (Request) (Types) @@ -589,6 +609,35 @@ let unlock_transactions state = state.locked <- false let is_locked state = state.locked +let pop_queue_until state ~maximum_cumulative_size = + let open Lwt_result_syntax in + let rec aux (current_size, rev_selected) = + match Queue.peek_opt state.queue with + | None -> return rev_selected + | Some {hash; payload; queue_callback} -> + let raw_tx = Ethereum_types.hex_to_bytes payload in + let new_size = current_size + String.length raw_tx in + if new_size <= maximum_cumulative_size then + (* Drop the tx because it's selected. *) + let _ = Queue.take state.queue in + let tx_object = Tx_object.find state.tx_object hash in + match tx_object with + | None -> + (* Drop that tx because no tx_object associated. this is + an inpossible case, we log it to investigate. *) + let*! () = Tx_queue_events.missing_tx_object hash in + let*! () = queue_callback `Refused in + aux (current_size, rev_selected) + | Some tx_object -> + let rev_selected = + ((raw_tx, tx_object), queue_callback) :: rev_selected + in + aux (new_size, rev_selected) + else return rev_selected + in + let* rev_selected = aux (0, []) in + return @@ List.rev rev_selected + module Handlers = struct open Request @@ -799,6 +848,26 @@ module Handlers = struct in return {pending; queued} + | Pop_transactions {maximum_cumulative_size} -> + let open Lwt_result_syntax in + if is_locked state then return [] + else + let* selected = pop_queue_until state ~maximum_cumulative_size in + let*! selected = + List.map_s + (fun (tx, callback) -> + let open Lwt_syntax in + let* () = callback `Accepted in + return tx) + selected + in + (* All transactions popped are considered `Accepted, and are + added to the pending state. The only consumer of that + request is the block producer, a local worker that will + process all popped transaction, and confirm only + transactions that were included in a block with + [Confirm_transactions] *) + return selected type launch_error = tztrace @@ -954,6 +1023,14 @@ let is_locked () = let*? worker = Lazy.force worker in Worker.Queue.push_request_and_wait worker Is_locked |> handle_request_error +let pop_transactions ~maximum_cumulative_size = + let open Lwt_result_syntax in + let*? w = Lazy.force worker in + Worker.Queue.push_request_and_wait + w + (Pop_transactions {maximum_cumulative_size}) + |> handle_request_error + module Internal_for_tests = struct module Nonce_bitset = Nonce_bitset module Address_nonce = Address_nonce diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index bb5b4729f5d4..3df3d22b7f8f 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -103,10 +103,25 @@ val unlock_transactions : unit -> unit tzresult Lwt.t (** [is_locked] checks if the queue is locked. *) val is_locked : unit -> bool tzresult Lwt.t -(** [content ()] returns the queued and pending transactions of the tx_queue - mapped into a tx_pool to mimic {!Tx_pool.get_tx_pool_content}. Semantics of pending and queued are not equal to {!Tx_pool.get_tx_pool_content} *) +(** [content ()] returns the queued and pending transactions of the + tx_queue mapped into a tx_pool to mimic + {!Tx_pool.get_tx_pool_content}. Semantics of pending and queued + are not equal to {!Tx_pool.get_tx_pool_content} *) val content : unit -> Ethereum_types.txpool tzresult Lwt.t +(** [pop_transactions ~maximum_cumulative_size] pops as much valid + transactions as possible from the pool, until their cumulative + size exceeds [maximum_cumulative_size]. + + If the tx_queue is locked (c.f. {!lock_transactions} then returns + the empty list. + + All returned transaction are considered as accepted and all + associated callbacks are called with [`Accepted]. *) +val pop_transactions : + maximum_cumulative_size:int -> + (string * Ethereum_types.legacy_transaction_object) list tzresult Lwt.t + (**/*) module Internal_for_tests : sig diff --git a/etherlink/bin_node/lib_dev/tx_queue_events.ml b/etherlink/bin_node/lib_dev/tx_queue_events.ml index bb36ccd0ebc6..dc0f5fdab581 100644 --- a/etherlink/bin_node/lib_dev/tx_queue_events.ml +++ b/etherlink/bin_node/lib_dev/tx_queue_events.ml @@ -82,6 +82,14 @@ let transaction_confirmed = ~pp1:(fun fmt Ethereum_types.(Hash (Hex h)) -> Format.fprintf fmt "%10s" h) ("tx_hash", Ethereum_types.hash_encoding) +let missing_tx_object = + declare_1 + ~name:"tx_queue_missing_tx_object" + ~msg:"transaction {tx_hash} has no associated object" + ~level:Error + ~pp1:(fun fmt Ethereum_types.(Hash (Hex h)) -> Format.fprintf fmt "%10s" h) + ("tx_hash", Ethereum_types.hash_encoding) + let is_ready () = emit is_ready () let shutdown () = emit shutdown () @@ -100,3 +108,5 @@ let rpc_error (error : Rpc_encodings.JSONRPC.error) = emit rpc_error (Int32.of_int error.code, error.message) let callback_error (error : tztrace) = emit callback_error error + +let missing_tx_object tx = emit missing_tx_object tx diff --git a/etherlink/bin_node/lib_dev/tx_queue_events.mli b/etherlink/bin_node/lib_dev/tx_queue_events.mli index 0cf612c01c48..c01b0903e3a6 100644 --- a/etherlink/bin_node/lib_dev/tx_queue_events.mli +++ b/etherlink/bin_node/lib_dev/tx_queue_events.mli @@ -35,5 +35,9 @@ val transaction_confirmed : Ethereum_types.hash -> unit Lwt.t (** [rpc_error error] advertises an RPC produced the error [error]. *) val rpc_error : Rpc_encodings.JSONRPC.error -> unit Lwt.t +(** [missing_tx_object hash] Advertises that it fails to find the + tx_object associated to [hash]. *) +val missing_tx_object : Ethereum_types.hash -> unit Lwt.t + (** [callback_error error] advertises an RPC produced the error [error]. *) val callback_error : tztrace -> unit Lwt.t -- GitLab From 05baaabc565cb73a071388b641919c157c78df2b Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 6 Mar 2025 10:16:02 +0100 Subject: [PATCH 3/5] evm/node: block_producer uses tx_queue if enabled --- etherlink/bin_node/lib_dev/block_producer.ml | 30 ++++++++++++--- etherlink/bin_node/lib_dev/block_producer.mli | 1 + .../bin_node/lib_dev/blueprints_publisher.ml | 16 ++++++-- .../bin_node/lib_dev/blueprints_publisher.mli | 1 + etherlink/bin_node/lib_dev/sequencer.ml | 37 ++++++++++++------- 5 files changed, 63 insertions(+), 22 deletions(-) diff --git a/etherlink/bin_node/lib_dev/block_producer.ml b/etherlink/bin_node/lib_dev/block_producer.ml index 0c3463e59d11..fb726ee6aab7 100644 --- a/etherlink/bin_node/lib_dev/block_producer.ml +++ b/etherlink/bin_node/lib_dev/block_producer.ml @@ -10,6 +10,7 @@ type parameters = { smart_rollup_address : string; sequencer_key : Client_keys.sk_uri; maximum_number_of_chunks : int; + uses_tx_queue : bool; } (* The size of a delayed transaction is overapproximated to the maximum size @@ -188,14 +189,23 @@ let produce_block_with_transactions ~sequencer_key ~cctxt ~timestamp (** Produces a block if we find at least one valid transaction in the transaction pool or if [force] is true. *) let produce_block_if_needed ~cctxt ~smart_rollup_address ~sequencer_key ~force - ~timestamp ~delayed_hashes ~remaining_cumulative_size head_info = + ~timestamp ~delayed_hashes ~remaining_cumulative_size ~uses_tx_queue + head_info = let open Lwt_result_syntax in let* transactions_and_objects = (* Low key optimization to avoid even checking the txpool if there is not enough space for the smallest transaction. *) if remaining_cumulative_size <= minimum_ethereum_transaction_size then return [] + else if uses_tx_queue then + (* TODO: https://gitlab.com/tezos/tezos/-/merge_requests/17211 + Validates transactions with regards to balance for + example (with_state validation) *) + Tx_queue.pop_transactions + ~maximum_cumulative_size:remaining_cumulative_size else + (* When the tx_pool is removed, we could keep the sequence instead + of creating a list in the popped transaction of the tx_queue. *) Tx_pool.pop_transactions ~maximum_cumulative_size:remaining_cumulative_size in @@ -211,7 +221,12 @@ let produce_block_if_needed ~cctxt ~smart_rollup_address ~sequencer_key ~force ~delayed_hashes head_info in - let* () = Tx_pool.clear_popped_transactions () in + let* () = + if uses_tx_queue then + (*TODO: in next commit here we confirm all TXs that have been + included *) return_unit + else Tx_pool.clear_popped_transactions () + in return n else return 0 @@ -233,10 +248,12 @@ let head_info_and_delayed_transactions ~with_delayed_transactions let*! head_info = Evm_context.head_info () in return (head_info, delayed_hashes, remaining_cumulative_size) -let produce_block ~cctxt ~smart_rollup_address ~sequencer_key ~force ~timestamp - ~maximum_number_of_chunks ~with_delayed_transactions = +let produce_block ~uses_tx_queue ~cctxt ~smart_rollup_address ~sequencer_key + ~force ~timestamp ~maximum_number_of_chunks ~with_delayed_transactions = let open Lwt_result_syntax in - let* is_locked = Tx_pool.is_locked () in + let* is_locked = + if uses_tx_queue then Tx_queue.is_locked () else Tx_pool.is_locked () + in if is_locked then let*! () = Block_producer_events.production_locked () in return 0 @@ -273,6 +290,7 @@ let produce_block ~cctxt ~smart_rollup_address ~sequencer_key ~force ~timestamp ~force ~delayed_hashes ~remaining_cumulative_size + ~uses_tx_queue head_info module Handlers = struct @@ -292,10 +310,12 @@ module Handlers = struct smart_rollup_address; sequencer_key; maximum_number_of_chunks; + uses_tx_queue; } = state in produce_block + ~uses_tx_queue ~cctxt ~smart_rollup_address ~sequencer_key diff --git a/etherlink/bin_node/lib_dev/block_producer.mli b/etherlink/bin_node/lib_dev/block_producer.mli index e7684b285f98..bca2b791d170 100644 --- a/etherlink/bin_node/lib_dev/block_producer.mli +++ b/etherlink/bin_node/lib_dev/block_producer.mli @@ -10,6 +10,7 @@ type parameters = { smart_rollup_address : string; sequencer_key : Client_keys.sk_uri; maximum_number_of_chunks : int; + uses_tx_queue : bool; } (** [start parameters] starts the events follower. *) diff --git a/etherlink/bin_node/lib_dev/blueprints_publisher.ml b/etherlink/bin_node/lib_dev/blueprints_publisher.ml index cc02d18d2a4a..6fed772ff304 100644 --- a/etherlink/bin_node/lib_dev/blueprints_publisher.ml +++ b/etherlink/bin_node/lib_dev/blueprints_publisher.ml @@ -18,6 +18,7 @@ type parameters = { keep_alive : bool; drop_duplicate : bool; order_enabled : bool; + tx_queue_enabled : bool; } type state = { @@ -38,6 +39,7 @@ type state = { mutable cooldown : int; (** Do not try to catch-up if [cooldown] is not equal to 0 *) enable_dal : bool; + tx_queue_enabled : bool; } module Types = struct @@ -104,6 +106,8 @@ module Worker = struct let on_cooldown worker = 0 < current_cooldown worker + let tx_queue_enabled worker = (state worker).tx_queue_enabled + let decrement_cooldown worker = let current = current_cooldown worker in if on_cooldown worker then set_cooldown worker (current - 1) else () @@ -160,7 +164,9 @@ module Worker = struct in match rollup_is_lagging_behind self with | No_lag | Needs_republish -> return_unit - | Needs_lock -> Tx_pool.lock_transactions () + | Needs_lock -> + if tx_queue_enabled self then Tx_queue.lock_transactions () + else Tx_pool.lock_transactions () let catch_up worker = let open Lwt_result_syntax in @@ -254,6 +260,7 @@ module Handlers = struct keep_alive; drop_duplicate; order_enabled; + tx_queue_enabled; } : Types.parameters) = let open Lwt_result_syntax in @@ -283,6 +290,7 @@ module Handlers = struct keep_alive; enable_dal = Option.is_some dal_slots; order_enabled; + tx_queue_enabled; } let on_request : @@ -312,7 +320,8 @@ module Handlers = struct Worker.decrement_cooldown self ; (* If there is no lag or the worker just needs to republish we unlock the transaction pool in case it was locked. *) - Tx_pool.unlock_transactions ()) + if Worker.tx_queue_enabled self then Tx_queue.unlock_transactions () + else Tx_pool.unlock_transactions ()) let on_completion (type a err) _self (_r : (a, err) Request.t) (_res : a) _st = @@ -340,7 +349,7 @@ let table = Worker.create_table Queue let worker_promise, worker_waker = Lwt.task () let start ~blueprints_range ~rollup_node_endpoint ~config ~latest_level_seen - ~keep_alive ~drop_duplicate ~order_enabled () = + ~keep_alive ~drop_duplicate ~order_enabled ~tx_queue_enabled () = let open Lwt_result_syntax in let* worker = Worker.launch @@ -354,6 +363,7 @@ let start ~blueprints_range ~rollup_node_endpoint ~config ~latest_level_seen keep_alive; drop_duplicate; order_enabled; + tx_queue_enabled; } (module Handlers) in diff --git a/etherlink/bin_node/lib_dev/blueprints_publisher.mli b/etherlink/bin_node/lib_dev/blueprints_publisher.mli index a62552ca9011..1b801e4cf2d7 100644 --- a/etherlink/bin_node/lib_dev/blueprints_publisher.mli +++ b/etherlink/bin_node/lib_dev/blueprints_publisher.mli @@ -18,6 +18,7 @@ val start : keep_alive:bool -> drop_duplicate:bool -> order_enabled:bool -> + tx_queue_enabled:bool -> unit -> unit tzresult Lwt.t diff --git a/etherlink/bin_node/lib_dev/sequencer.ml b/etherlink/bin_node/lib_dev/sequencer.ml index 256403050063..363ea5beeec7 100644 --- a/etherlink/bin_node/lib_dev/sequencer.ml +++ b/etherlink/bin_node/lib_dev/sequencer.ml @@ -164,6 +164,7 @@ let main ~data_dir ?(genesis_timestamp = Misc.now ()) ~cctxt configuration.experimental_features.drop_duplicate_on_injection ~order_enabled: configuration.experimental_features.blueprints_publisher_order_enabled + ~tx_queue_enabled:(Configuration.is_tx_queue_enabled configuration) () in let* () = @@ -196,20 +197,27 @@ let main ~data_dir ?(genesis_timestamp = Misc.now ()) ~cctxt let backend = Evm_ro_context.ro_backend ro_ctxt configuration in let* () = - Tx_pool.start - { - backend; - smart_rollup_address = smart_rollup_address_b58; - mode = Sequencer; - tx_timeout_limit = configuration.tx_pool_timeout_limit; - tx_pool_addr_limit = Int64.to_int configuration.tx_pool_addr_limit; - tx_pool_tx_per_addr_limit = - Int64.to_int configuration.tx_pool_tx_per_addr_limit; - max_number_of_chunks = - (match configuration.sequencer with - | Some {max_number_of_chunks; _} -> Some max_number_of_chunks - | None -> None); - } + match configuration.experimental_features.enable_tx_queue with + | Some tx_queue_config -> + Tx_queue.start + ~config:tx_queue_config + ~keep_alive:configuration.keep_alive + () + | None -> + Tx_pool.start + { + backend; + smart_rollup_address = smart_rollup_address_b58; + mode = Sequencer; + tx_timeout_limit = configuration.tx_pool_timeout_limit; + tx_pool_addr_limit = Int64.to_int configuration.tx_pool_addr_limit; + tx_pool_tx_per_addr_limit = + Int64.to_int configuration.tx_pool_tx_per_addr_limit; + max_number_of_chunks = + (match configuration.sequencer with + | Some {max_number_of_chunks; _} -> Some max_number_of_chunks + | None -> None); + } in Metrics.init ~mode:"sequencer" @@ -222,6 +230,7 @@ let main ~data_dir ?(genesis_timestamp = Misc.now ()) ~cctxt smart_rollup_address = smart_rollup_address_b58; sequencer_key = sequencer_config.sequencer; maximum_number_of_chunks = sequencer_config.max_number_of_chunks; + uses_tx_queue = Configuration.is_tx_queue_enabled configuration; } in let* () = -- GitLab From 2ce3929e69a562ecd4db74d88a5a8f8ec1f3e7f4 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Fri, 7 Mar 2025 11:06:43 +0100 Subject: [PATCH 4/5] evm/node: included transaction in a block are confirmed in the tx_queue --- etherlink/CHANGES_NODE.md | 2 +- etherlink/bin_node/lib_dev/block_producer.ml | 13 ++--- etherlink/bin_node/lib_dev/observer.ml | 6 ++- etherlink/bin_node/lib_dev/tx_queue.ml | 56 ++++++++++++++++++++ etherlink/bin_node/lib_dev/tx_queue.mli | 8 +++ 5 files changed, 76 insertions(+), 9 deletions(-) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index b504d66a40bd..5f352c76cc2a 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -40,7 +40,7 @@ you start using them, you probably want to use `octez-evm-node check config This limits is for pending transactions the node has seen. configurable with `tx_per_addr_limit`. (!16903) - An sequencer EVM node can uses the tx_queue to speed the inclusion - of transaction. (!17134 !17100) + of transaction. (!17134 !17100 !17109) - `tx_queue` now has a maximum number of transactions. (!17083) - Observer nodes can now be run with `periodic_snapshot_path` defined in the configuration. It exports a snapshot to the given path every time they diff --git a/etherlink/bin_node/lib_dev/block_producer.ml b/etherlink/bin_node/lib_dev/block_producer.ml index fb726ee6aab7..201d8b79a273 100644 --- a/etherlink/bin_node/lib_dev/block_producer.ml +++ b/etherlink/bin_node/lib_dev/block_producer.ml @@ -169,7 +169,7 @@ let produce_block_with_transactions ~sequencer_key ~cctxt ~timestamp Evm_state.get_delayed_inbox_item head_info.evm_state delayed_hash) delayed_hashes in - let* _hashes = + let* confirmed_txs = Evm_context.apply_blueprint timestamp blueprint_payload delayed_transactions in let (Qty number) = head_info.next_blueprint_number in @@ -184,7 +184,7 @@ let produce_block_with_transactions ~sequencer_key ~cctxt ~timestamp (fun hash -> Block_producer_events.transaction_selected ~hash) (tx_hashes @ delayed_hashes) in - return_unit + return confirmed_txs (** Produces a block if we find at least one valid transaction in the transaction pool or if [force] is true. *) @@ -211,7 +211,7 @@ let produce_block_if_needed ~cctxt ~smart_rollup_address ~sequencer_key ~force in let n = List.length transactions_and_objects + List.length delayed_hashes in if force || n > 0 then - let* () = + let* confirmed_txs = produce_block_with_transactions ~sequencer_key ~cctxt @@ -223,8 +223,9 @@ let produce_block_if_needed ~cctxt ~smart_rollup_address ~sequencer_key ~force in let* () = if uses_tx_queue then - (*TODO: in next commit here we confirm all TXs that have been - included *) return_unit + Tx_queue.confirm_transactions + ~clear_pending_queue_after:true + ~confirmed_txs else Tx_pool.clear_popped_transactions () in return n @@ -270,7 +271,7 @@ let produce_block ~uses_tx_queue ~cctxt ~smart_rollup_address ~sequencer_key | None -> false in if is_going_to_upgrade then - let* () = + let* _hashes (* empty because no txs given *) = produce_block_with_transactions ~sequencer_key ~cctxt diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index 1d4b0718beca..ba279f97d23d 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -8,10 +8,12 @@ open Ethereum_types -let confirm_txs config tx_hashes = +let confirm_txs config confirmed_txs = let open Lwt_result_syntax in if Configuration.is_tx_queue_enabled config then - Seq.iter_ep (fun hash -> Tx_queue.confirm hash) tx_hashes + Tx_queue.confirm_transactions + ~clear_pending_queue_after:false + ~confirmed_txs else return_unit (** [on_new_blueprint evm_node_endpoint next_blueprint_number diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 76fc92694d40..3fccc8630ac0 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -304,6 +304,10 @@ module Pending_transactions = struct else Some pending) htbl ; !dropped + + let to_seq = S.to_seq_values + + let clear = S.clear end module Transactions_per_addr = struct @@ -385,6 +389,11 @@ module Request = struct maximum_cumulative_size : int; } -> ((string * Ethereum_types.legacy_transaction_object) list, tztrace) t + | Confirm_transactions : { + confirmed_txs : Ethereum_types.hash Seq.t; + clear_pending_queue_after : bool; + } + -> (unit, tztrace) t type view = View : _ t -> view @@ -485,6 +494,20 @@ module Request = struct Some ((), maximum_cumulative_size) | _ -> None) (fun _ -> assert false); + case + Json_only + ~title:"Confirm_transactions" + (obj3 + (req "request" (constant "confirm_transactions")) + (req "confirmed_txs" (list Ethereum_types.hash_encoding)) + (req "clear_pending_queue_after" bool)) + (function + | View + (Confirm_transactions + {confirmed_txs; clear_pending_queue_after}) -> + Some ((), List.of_seq confirmed_txs, clear_pending_queue_after) + | _ -> None) + (fun _ -> assert false); ] let pp fmt (View r) = @@ -507,6 +530,7 @@ module Request = struct fmt "Popping transactions of maximum cumulative size %d bytes" maximum_cumulative_size + | Confirm_transactions _ -> fprintf fmt "Confirming transactions" end module Worker = Worker.MakeSingle (Name) (Request) (Types) @@ -868,6 +892,30 @@ module Handlers = struct transactions that were included in a block with [Confirm_transactions] *) return selected + | Confirm_transactions {confirmed_txs; clear_pending_queue_after} -> + let*! () = + Seq.S.iter + (fun hash -> + let callback = Pending_transactions.pop state.pending hash in + match callback with + | Some {pending_callback; _} -> pending_callback `Confirmed + | None -> + (* delayed transactions hashes are part of confirmed + txs *) + Lwt.return_unit) + confirmed_txs + in + if clear_pending_queue_after then ( + let dropped = Pending_transactions.to_seq state.pending in + let*! () = + Seq.S.iter + (fun {pending_callback; _} -> pending_callback `Dropped) + dropped + in + (* Emptying the pending the dropped transactions *) + Pending_transactions.clear state.pending ; + return_unit) + else return_unit type launch_error = tztrace @@ -1031,6 +1079,14 @@ let pop_transactions ~maximum_cumulative_size = (Pop_transactions {maximum_cumulative_size}) |> handle_request_error +let confirm_transactions ~clear_pending_queue_after ~confirmed_txs = + let open Lwt_result_syntax in + let*? w = Lazy.force worker in + Worker.Queue.push_request_and_wait + w + (Confirm_transactions {confirmed_txs; clear_pending_queue_after}) + |> handle_request_error + module Internal_for_tests = struct module Nonce_bitset = Nonce_bitset module Address_nonce = Address_nonce diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index 3df3d22b7f8f..533f61afca79 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -122,6 +122,14 @@ val pop_transactions : maximum_cumulative_size:int -> (string * Ethereum_types.legacy_transaction_object) list tzresult Lwt.t +(** [confirm_transactions ~clear_pending_queue_after ~confirmed_txs] + confirms [confirmed_txs] hash. If [drop_unconfirmed] then any + other pending transactions in the tx_queue are dropped. *) +val confirm_transactions : + clear_pending_queue_after:bool -> + confirmed_txs:Ethereum_types.hash Seq.t -> + unit tzresult Lwt.t + (**/*) module Internal_for_tests : sig -- GitLab From c0820c6ca42bcfdbe949a8a15c1d4ba0950e88de Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Tue, 11 Mar 2025 16:27:30 +0100 Subject: [PATCH 5/5] evm/tezt: adapt test tx_queue with sequencer using tx_queue --- etherlink/tezt/lib/evm_node.ml | 8 +- etherlink/tezt/lib/evm_node.mli | 9 +- etherlink/tezt/lib/setup.ml | 1 + etherlink/tezt/tests/evm_sequencer.ml | 390 ++++++++++---------------- 4 files changed, 162 insertions(+), 246 deletions(-) diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 2e7b09707873..c7b67a5c9442 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -582,9 +582,13 @@ let wait_for_tx_queue_add_transaction ?timeout evm_node = wait_for_event ?timeout evm_node ~event:"tx_queue_add_transaction.v0" @@ fun json -> JSON.(json |> as_string |> Option.some) -let wait_for_tx_queue_transaction_confirmed ?timeout evm_node = +let wait_for_tx_queue_transaction_confirmed ?timeout ?hash evm_node = wait_for_event ?timeout evm_node ~event:"tx_queue_transaction_confirmed.v0" - @@ fun json -> JSON.(json |> as_string |> Option.some) + @@ fun json -> + let found_hash = JSON.(json |> as_string) in + match hash with + | Some hash -> if found_hash = hash then Some found_hash else None + | None -> Some found_hash let wait_for_tx_queue_injecting_transaction ?timeout evm_node = wait_for_event ?timeout evm_node ~event:"tx_queue_injecting_transaction.v0" diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index b3712b291dcb..aa078eb3aa77 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -430,11 +430,12 @@ val wait_for_tx_pool_add_transaction : ?timeout:float -> t -> string Lwt.t hash. *) val wait_for_tx_queue_add_transaction : ?timeout:float -> t -> string Lwt.t -(** [wait_for_tx_queue_transaction_confirmed ?timeout evm_node] waits - for the event [tx_queue_transaction_confirmed.v0] using {!wait_for} - and returns the transaction hash. *) +(** [wait_for_tx_queue_transaction_confirmed ?timeout ?hash evm_node] + waits for the event [tx_queue_transaction_confirmed.v0] using + {!wait_for} and returns the transaction hash. If [hash] is + provided, wait for that hash to be confirmed. *) val wait_for_tx_queue_transaction_confirmed : - ?timeout:float -> t -> string Lwt.t + ?timeout:float -> ?hash:string -> t -> string Lwt.t (** [wait_for_tx_queue_injecting_transaction ?timeout evm_node] waits for the event [tx_queue_injecting_transaction.v0] using diff --git a/etherlink/tezt/lib/setup.ml b/etherlink/tezt/lib/setup.ml index ef8e8bf551c0..e266aea99ec5 100644 --- a/etherlink/tezt/lib/setup.ml +++ b/etherlink/tezt/lib/setup.ml @@ -487,6 +487,7 @@ let setup_sequencer_internal ?max_delayed_inbox_blueprint_length ?rpc_server ?enable_websocket:websockets ?spawn_rpc + ?enable_tx_queue (* When adding new experimental feature please make sure it's a good idea to activate it for all test or not. *) () diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index da98bfd8ed42..92cfe2aad3e0 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -10882,6 +10882,7 @@ let test_tx_queue = ~use_threshold_encryption:Register_without_feature ~use_dal:Register_without_feature ~websockets:false + ~enable_tx_queue:true ~title:"Submits a transaction to an observer with a tx queue." @@ fun {sequencer; observer; _} _protocol -> let* () = Evm_node.terminate observer in @@ -10930,52 +10931,15 @@ let test_tx_queue = inject, ...) *) let nb_txs = 10 in - let wait_for_all_tx_process ~name ~waiter = - let rec aux total = - if total = nb_txs then ( - Log.info "All (%d) txs processed: \"%s\"." total name ; - unit) - else if total > nb_txs then - Test.fail - "more transaction where processed (%s) than expected, impossible" - name - else - let* nb = waiter () in - let total = total + nb in - Log.debug "Processed %d of txs. (%s)" total name ; - aux total - in - aux 0 + let wait_for_all_tx_process_p ~hashes ~name ~waiter = + let* () = Lwt_list.iter_p (fun hash -> waiter ~hash) hashes in + Log.info "All (%d) txs processed: \"%s\"." (List.length hashes) name ; + unit in (* Promises that checks that [nb_txs] txs have been seen with different events. *) - (* Checks that all txs were added to the observer tx_queue *) - let observer_wait_tx_added = - let waiter () = - let* _ = Evm_node.wait_for_tx_queue_add_transaction observer in - return 1 - in - wait_for_all_tx_process ~name:"tx added in observer queue" ~waiter - in - - (* Checks that all txs were added to the sequencer tx_pool *) - let sequencer_wait_tx_added = - let waiter () = - let* _ = Evm_node.wait_for_tx_pool_add_transaction sequencer in - return 1 - in - wait_for_all_tx_process ~name:"tx added in sequencer tx pool" ~waiter - in - - (* Checks that all txs were injected to the sequencer by the - observer *) - let observer_wait_tx_injected = - let waiter () = Evm_node.wait_for_tx_queue_injecting_transaction observer in - wait_for_all_tx_process ~name:"tx injected by observer queue" ~waiter - in - (* Test start here *) Log.info "Sending %d transactions to the observer and check after each submition \ @@ -10984,46 +10948,40 @@ let test_tx_queue = let* hashes = fold nb_txs [] @@ fun i hashes -> let* raw_tx = raw_tx ~nonce:i in - let*@ hash = Rpc.send_raw_transaction ~raw_tx observer in - let* () = check_tx_is_found ~__LOC__ ~hash ~node:observer in + let* hash = + let*@ hash = Rpc.send_raw_transaction ~raw_tx observer in + return hash + and* _ = Evm_node.wait_for_tx_queue_add_transaction observer + and* _ = Evm_node.wait_for_tx_queue_add_transaction sequencer + and* _ = Evm_node.wait_for_tx_queue_injecting_transaction observer in + let* () = check_tx_is_found ~__LOC__ ~hash ~node:observer + and* () = check_tx_is_found ~__LOC__ ~hash ~node:sequencer in return (hash :: hashes) - and* _ = observer_wait_tx_added - and* _ = observer_wait_tx_injected - and* _ = sequencer_wait_tx_added in + in Log.info "Produce enough block to include all txs and make sure they are confirmed \ by the observer" ; (* Checks that all txs were confirmed in the observer *) let observer_wait_tx_confirmed = - let waiter () = - let* _ = Evm_node.wait_for_tx_queue_transaction_confirmed observer in - return 1 - in - wait_for_all_tx_process ~name:"tx confirmed in observer" ~waiter - in - (* Checks that all txs were included in a block by the sequencer *) - let* () = - let res = ref None in - let _p = - let* () = - let waiter () = - let* _hash = Evm_node.wait_for_block_producer_tx_injected sequencer in - return 1 - in - wait_for_all_tx_process ~name:"tx included by sequencer" ~waiter + let waiter ~hash = + let* _ = + Evm_node.wait_for_tx_queue_transaction_confirmed observer ~hash in - res := Some () ; unit in - let result_f () = return !res in - bake_until - ~__LOC__ - ~bake:(fun () -> - let*@ _ = produce_block sequencer in - unit) - ~result_f - () + wait_for_all_tx_process_p ~hashes ~name:"tx confirmed in observer" ~waiter + in + + (* Checks that all txs were included in a unique block by the sequencer *) + let* () = + let*@ included_nb_txs = produce_block sequencer in + Check.( + (included_nb_txs = nb_txs) + int + ~__LOC__ + ~error_msg:"Produce block included %L transaction expected %R") ; + unit and* _ = observer_wait_tx_confirmed in Log.info @@ -11050,6 +11008,7 @@ let test_tx_queue_clear = ~kernels:[Latest] ~use_dal:Register_without_feature ~use_threshold_encryption:Register_without_feature + ~websockets:false @@ fun { client; l1_contracts; @@ -11148,6 +11107,7 @@ let test_tx_queue_nonce = ~use_threshold_encryption:Register_without_feature ~use_dal:Register_without_feature ~websockets:false + ~enable_tx_queue:true ~title: "Submits transactions to an observer with a tx queue and make sure it \ can respond to getTransactionCount." @@ -11172,6 +11132,7 @@ let test_tx_queue_nonce = let*@ _ = produce_block sequencer in unit and* () = Evm_node.wait_for_blueprint_applied observer 1 in + let check_nonce ~__LOC__ ~evm_node ~block ~expected = let*@ nonce = Rpc.get_transaction_count @@ -11217,31 +11178,19 @@ let test_tx_queue_nonce = let send_and_wait_sequencer_receive ~nonce = let wait_sequencer_see_tx = - Evm_node.wait_for_tx_pool_add_transaction sequencer + Evm_node.wait_for_tx_queue_add_transaction sequencer in - let* _ = - let*@ _hash = send_raw_tx ~nonce in - unit + let* hash = + let*@ hash = send_raw_tx ~nonce in + return hash and* _ = wait_sequencer_see_tx in - unit + return hash in - let wait_for_all_tx_process ~nb_txs ~name ~waiter = - let rec aux total = - if total = nb_txs then ( - Log.info "All (%d) txs processed: \"%s\"." total name ; - unit) - else if total > nb_txs then - Test.fail - "more transaction where processed (%s) than expected, impossible" - name - else - let* nb = waiter () in - let total = total + nb in - Log.debug "Processed %d of txs. (%s)" total name ; - aux total - in - aux 0 + let wait_for_all_tx_process_p ~hashes ~name ~waiter = + let* () = Lwt_list.iter_p (fun hash -> waiter ~hash) hashes in + Log.info "All (%d) txs processed: \"%s\"." (List.length hashes) name ; + unit in (* Test start here *) @@ -11256,32 +11205,25 @@ let test_tx_queue_nonce = "Sending %d transactions to the observer and check after each that the \ nonce in pending is correct" nb_txs ; - let* _hashes = - fold nb_txs () @@ fun i () -> - let* () = send_and_wait_sequencer_receive ~nonce:i in - check_nonce - ~__LOC__ - ~check_observer:true - ~check_sequencer:true - ~block:"pending" - ~expected:(i + 1) - () - in - - let* () = - check_nonce - ~__LOC__ - ~check_observer:true - ~check_sequencer:true - ~block:"pending" - ~expected:nb_txs - () + let* hashes = + fold nb_txs [] @@ fun i hashes -> + let* hash = send_and_wait_sequencer_receive ~nonce:i in + let* () = + check_nonce + ~__LOC__ + ~check_observer:true + ~check_sequencer:true + ~block:"pending" + ~expected:(i + 1) + () + in + return (hash :: hashes) in Log.info "Send another txs to create a gap and check that the nonce in pending is \ still the same" ; - let* () = send_and_wait_sequencer_receive ~nonce:(nb_txs + 1) in + let* _lost_hash = send_and_wait_sequencer_receive ~nonce:(nb_txs + 1) in let* () = check_nonce @@ -11296,84 +11238,78 @@ let test_tx_queue_nonce = Log.info "Send missing nonce to fill the gap and check that the nonce in pending is \ now correct" ; - let* () = send_and_wait_sequencer_receive ~nonce:nb_txs in + let* hash = send_and_wait_sequencer_receive ~nonce:nb_txs in Log.info "produce enough block to include all txs and make sure the nonce of latest \ and pending is equal." ; (* Checks that all txs were confirmed in the observer *) let observer_wait_tx_confirmed () = - let waiter () = - let* _ = Evm_node.wait_for_tx_queue_transaction_confirmed observer in - return 1 + let waiter ~hash = + let* _ = + Evm_node.wait_for_tx_queue_transaction_confirmed ~hash observer + in + unit in - wait_for_all_tx_process - ~nb_txs:(nb_txs + 2) - ~name:"tx confirmed in observer" - ~waiter + let hashes = hash :: hashes in + wait_for_all_tx_process_p ~hashes ~name:"tx confirmed in observer" ~waiter in - (* Checks that all txs were included in a block by the sequencer *) - let sequencer_wait_tx_included () = - let waiter () = - let* _hash = Evm_node.wait_for_block_producer_tx_injected sequencer in - return 1 - in - wait_for_all_tx_process - ~nb_txs:(nb_txs + 2) - ~name:"tx included by sequencer" - ~waiter - in + (* Checks that all txs were included in a unique block by the sequencer *) + let* () = + let*@ included_nb_txs = produce_block sequencer in + Check.( + (included_nb_txs = nb_txs + 2) + int + ~__LOC__ + ~error_msg:"Produce block included %L transaction expected %R") ; + unit + and* _ = observer_wait_tx_confirmed () in - (* produce enough blocks to include all txs submited *) - let produce_block_until_all_included () = - let res = ref None in - let _p = - let* () = sequencer_wait_tx_included () in - res := Some () ; - unit - in - let result_f () = return !res in - bake_until + let* () = + check_nonce ~__LOC__ - ~bake:(fun () -> - let*@ _ = produce_block sequencer in - unit) - ~result_f + ~check_observer:true + ~check_sequencer:true + ~block:"latest" + ~expected:(nb_txs + 1) + (* latest two transaction were submitted in the wrong order, + only the second one is correctly applied. The other still + lives in the pending state of the observer tx_queue for + now, it's reason for "pending" still it. *) () in - - let* () = produce_block_until_all_included () - and* () = observer_wait_tx_confirmed () in - let* () = + (* In the observer the wrong ordered tx is still in pending *) check_nonce ~__LOC__ ~check_observer:true - ~check_sequencer:true + ~check_sequencer:false ~block:"pending" ~expected:(nb_txs + 2) () in let* () = + (* In the sequencer the wrong ordered tx is dropped when creating + the block *) check_nonce ~__LOC__ - ~check_observer:true + ~check_observer:false ~check_sequencer:true - ~block:"latest" - ~expected:(nb_txs + 2) + ~block:"pending" + ~expected:(nb_txs + 1) () in Log.info "Try to send a transaction with a nonce in the past." ; - let*@? _hash = send_raw_tx ~nonce:(nb_txs + 1) in + let*@? _hash = send_raw_tx ~nonce:nb_txs in (* still true with a valid tx in pending. *) - let* () = send_and_wait_sequencer_receive ~nonce:(nb_txs + 3) in - let*@? _hash = send_raw_tx ~nonce:(nb_txs + 1) in + let* _hash = send_and_wait_sequencer_receive ~nonce:(nb_txs + 1) in + let*@? _hash = send_raw_tx ~nonce:nb_txs in - Log.info "Try to send a transaction with an nonce already pending." ; - let* () = send_and_wait_sequencer_receive ~nonce:(nb_txs + 3) in + Log.info "Try to send a transaction with an nonce already pending, is valid." ; + let* _hash = send_and_wait_sequencer_receive ~nonce:(nb_txs + 1) in unit let test_spawn_rpc = @@ -11465,6 +11401,7 @@ let test_tx_queue_limit = ~use_threshold_encryption:Register_without_feature ~use_dal:Register_without_feature ~websockets:false + ~enable_tx_queue:true (* enables it in the sequencer *) ~title: "Submits transactions to an observer with a tx queue and make sure its \ limit are respected." @@ -11477,6 +11414,7 @@ let test_tx_queue_limit = let max_number_of_txs = 10 in let* () = + (* modify the config of the observer. *) Evm_node.Config_file.update observer @@ Evm_node.patch_config_with_experimental_feature ~enable_tx_queue:true @@ -11491,106 +11429,78 @@ let test_tx_queue_limit = let* () = Evm_node.run observer in (* helper to craft a tx with given nonce. *) - let raw_tx ~nonce = - Cast.craft_tx - ~source_private_key:Eth_account.bootstrap_accounts.(0).private_key - ~chain_id:1337 - ~nonce - ~gas_price:1_000_000_000 - ~gas:23_300 - ~value:Wei.one - ~address:Eth_account.bootstrap_accounts.(1).address - () + let send_raw_tx ~nonce = + let* raw_tx = + Cast.craft_tx + ~source_private_key:Eth_account.bootstrap_accounts.(0).private_key + ~chain_id:1337 + ~nonce + ~gas_price:1_000_000_000 + ~gas:23_300 + ~value:Wei.one + ~address:Eth_account.bootstrap_accounts.(1).address + () + in + Rpc.send_raw_transaction ~raw_tx observer + in + + let send_and_wait_sequencer_receive ~nonce = + let wait_sequencer_see_tx = + Evm_node.wait_for_tx_queue_add_transaction sequencer + in + let* hash = + let*@ hash = send_raw_tx ~nonce in + return hash + and* _ = wait_sequencer_see_tx in + return hash in Log.info "send %d txs, all are successfully added to the queue" max_number_of_txs ; - let* () = - Lwt_list.iter_p - (fun i -> - let* raw_tx = raw_tx ~nonce:i in - let*@ _hash = Rpc.send_raw_transaction ~raw_tx observer in - unit) - (range 0 (max_number_of_txs - 1)) + let* hashes = + fold max_number_of_txs [] (fun i hashes -> + let* hash = send_and_wait_sequencer_receive ~nonce:i in + return (hash :: hashes)) in Log.info "Then send an additional txs, that fails" ; - let*@? _error = - let* raw_tx = raw_tx ~nonce:max_number_of_txs in - Rpc.send_raw_transaction ~raw_tx observer - in + let*@? _error = send_raw_tx ~nonce:max_number_of_txs in - Log.info "Wait for all txs to be included by the sequencer" ; - let wait_for_all_tx_process ~nb_txs ~name ~waiter = - let rec aux total = - if total = nb_txs then ( - Log.info "All (%d) txs processed: \"%s\"." total name ; - unit) - else if total > nb_txs then - Test.fail - "more transaction where processed (%s) than expected, impossible" - name - else - let* nb = waiter () in - let total = total + nb in - Log.debug "Processed %d of txs. (%s)" total name ; - aux total - in - aux 0 - in - - (* Checks that all txs were included in a block by the sequencer *) - let sequencer_wait_tx_included () = - let waiter () = - let* _hash = Evm_node.wait_for_block_producer_tx_injected sequencer in - return 1 - in - wait_for_all_tx_process - ~nb_txs:max_number_of_txs - ~name:"tx included by sequencer" - ~waiter - in - - (* produce enough blocks to include all txs submited *) - let produce_block_until_all_included () = - let res = ref None in - let _p = - let* () = sequencer_wait_tx_included () in - res := Some () ; - unit - in - let result_f () = return !res in - bake_until - ~__LOC__ - ~bake:(fun () -> - let*@ _ = produce_block sequencer in - unit) - ~result_f - () - in + Log.info "Produce a block, all transaction are confirmed" ; let observer_wait_tx_confirmed () = - let waiter () = - let* _ = Evm_node.wait_for_tx_queue_transaction_confirmed observer in - return 1 + let* () = + Lwt_list.iter_p + (fun hash -> + let* _ = + Evm_node.wait_for_tx_queue_transaction_confirmed ~hash observer + in + Log.debug "tx %s confirmed" hash ; + unit) + hashes in - wait_for_all_tx_process - ~nb_txs:max_number_of_txs - ~name:"tx confirmed in observer" - ~waiter + Log.info "All (%d) txs confirmed in observer" max_number_of_txs ; + unit in - let* () = produce_block_until_all_included () - and* () = observer_wait_tx_confirmed () in + (* Checks that all txs were included in a unique block by the sequencer *) + let* () = + let*@ included_nb_txs = produce_block sequencer in + Check.( + (included_nb_txs = max_number_of_txs) + int + ~__LOC__ + ~error_msg:"Produce block included %L transaction expected %R") ; + unit + and* _ = observer_wait_tx_confirmed () in Log.info "Resend %d txs, all are successfully added to the queue" max_number_of_txs ; let* () = - Lwt_list.iter_p + Lwt_list.iter_s (fun i -> - let* raw_tx = raw_tx ~nonce:i in - let*@ _hash = Rpc.send_raw_transaction ~raw_tx observer in + let* _ = send_and_wait_sequencer_receive ~nonce:i in unit) (range max_number_of_txs ((2 * max_number_of_txs) - 1)) in -- GitLab