From 1845fbe4bd3e479d633777122a2144b1498234d2 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Thu, 27 Feb 2025 13:29:49 +0100 Subject: [PATCH 1/3] node: apply_blueprint returns the tx hashes --- etherlink/bin_node/lib_dev/block_producer.ml | 2 +- etherlink/bin_node/lib_dev/evm_context.ml | 101 +++++++++--------- etherlink/bin_node/lib_dev/evm_context.mli | 7 +- .../bin_node/lib_dev/evm_context_types.ml | 2 +- etherlink/bin_node/lib_dev/observer.ml | 3 +- etherlink/bin_node/lib_dev/sequencer.ml | 2 +- 6 files changed, 62 insertions(+), 55 deletions(-) diff --git a/etherlink/bin_node/lib_dev/block_producer.ml b/etherlink/bin_node/lib_dev/block_producer.ml index 05a04ffc5b39..0c3463e59d11 100644 --- a/etherlink/bin_node/lib_dev/block_producer.ml +++ b/etherlink/bin_node/lib_dev/block_producer.ml @@ -168,7 +168,7 @@ let produce_block_with_transactions ~sequencer_key ~cctxt ~timestamp Evm_state.get_delayed_inbox_item head_info.evm_state delayed_hash) delayed_hashes in - let* () = + let* _hashes = Evm_context.apply_blueprint timestamp blueprint_payload delayed_transactions in let (Qty number) = head_info.next_blueprint_number in diff --git a/etherlink/bin_node/lib_dev/evm_context.ml b/etherlink/bin_node/lib_dev/evm_context.ml index 98b11ed33118..d2ad38b71c50 100644 --- a/etherlink/bin_node/lib_dev/evm_context.ml +++ b/etherlink/bin_node/lib_dev/evm_context.ml @@ -830,48 +830,44 @@ module State = struct let rec apply_blueprint ?(events = []) ctxt conn timestamp payload delayed_transactions = let open Lwt_result_syntax in - let* _current_block = - Misc.with_timing_f_e Blueprint_events.blueprint_applied @@ fun () -> - let* evm_state, context, current_block, applied_kernel_upgrade, split_info - = - let* () = apply_evm_events conn ctxt events in - apply_blueprint_store_unsafe - ctxt - conn - timestamp - payload - delayed_transactions - in - let kernel_upgrade = - match ctxt.session.pending_upgrade with - | Some {injected_before; kernel_upgrade} - when injected_before = ctxt.session.next_blueprint_number -> - Some kernel_upgrade - | _ -> None - in + Misc.with_timing_f_e Blueprint_events.blueprint_applied @@ fun () -> + let* evm_state, context, current_block, applied_kernel_upgrade, split_info = + let* () = apply_evm_events conn ctxt events in + apply_blueprint_store_unsafe + ctxt + conn + timestamp + payload + delayed_transactions + in + let kernel_upgrade = + match ctxt.session.pending_upgrade with + | Some {injected_before; kernel_upgrade} + when injected_before = ctxt.session.next_blueprint_number -> + Some kernel_upgrade + | _ -> None + in - let*? current_block = - Transaction_object.reconstruct_block payload current_block - in + let*? current_block = + Transaction_object.reconstruct_block payload current_block + in - let*! () = - on_new_head - ?split_info - ctxt - ~applied_upgrade:applied_kernel_upgrade - evm_state - context - current_block - { - delayed_transactions; - kernel_upgrade; - blueprint = - {number = ctxt.session.next_blueprint_number; timestamp; payload}; - } - in - return current_block + let*! () = + on_new_head + ?split_info + ctxt + ~applied_upgrade:applied_kernel_upgrade + evm_state + context + current_block + { + delayed_transactions; + kernel_upgrade; + blueprint = + {number = ctxt.session.next_blueprint_number; timestamp; payload}; + } in - return_unit + return current_block and apply_evm_event_unsafe ctxt conn evm_state event latest_finalized_level = let open Lwt_result_syntax in @@ -1061,7 +1057,7 @@ module State = struct prepare_local_flushed_blueprint ctxt parent_hash flushed_blueprint in (* Apply the blueprint. *) - let* () = + let* _block = apply_blueprint ~events ctxt conn timestamp payload delayed_transactions in return ctxt.session.evm_state @@ -1479,7 +1475,7 @@ module State = struct let events = Blueprint_types.events_of_blueprint_with_events blueprint_with_events in - let* () = + let* _block = apply_blueprint ~events ctxt @@ -1660,13 +1656,22 @@ module Handlers = struct protect @@ fun () -> let ctxt = Worker.state self in State.Transaction.run ctxt @@ fun ctxt conn -> - State.apply_blueprint - ?events - ctxt - conn - timestamp - payload - delayed_transactions + let* block = + State.apply_blueprint + ?events + ctxt + conn + timestamp + payload + delayed_transactions + in + let tx_hashes = + match block.transactions with + | TxHash tx_hashes -> List.to_seq tx_hashes + | TxFull tx_objects -> + List.to_seq tx_objects |> Seq.map Transaction_object.hash + in + return tx_hashes | Last_known_L1_level -> ( protect @@ fun () -> let ctxt = Worker.state self in diff --git a/etherlink/bin_node/lib_dev/evm_context.mli b/etherlink/bin_node/lib_dev/evm_context.mli index d4493a44832d..eed3ec66b7b5 100644 --- a/etherlink/bin_node/lib_dev/evm_context.mli +++ b/etherlink/bin_node/lib_dev/evm_context.mli @@ -82,14 +82,15 @@ val apply_evm_events : (** [apply_blueprint ?events timestamp payload delayed_transactions] applies [payload] in the freshest EVM state stored under [ctxt] at timestamp [timestamp], forwards the - {!Blueprint_types.with_events}. It commits the result if the - blueprint produces the expected block. *) + {!Blueprint_types.with_events}, and returns the transaction hashes + of the created block. It commits the result if the blueprint + produces the expected block. *) val apply_blueprint : ?events:Evm_events.t list -> Time.Protocol.t -> Blueprint_types.payload -> Evm_events.Delayed_transaction.t list -> - unit tzresult Lwt.t + Ethereum_types.hash Seq.t tzresult Lwt.t val head_info : unit -> head Lwt.t diff --git a/etherlink/bin_node/lib_dev/evm_context_types.ml b/etherlink/bin_node/lib_dev/evm_context_types.ml index 03390367988b..092f98700ced 100644 --- a/etherlink/bin_node/lib_dev/evm_context_types.ml +++ b/etherlink/bin_node/lib_dev/evm_context_types.ml @@ -20,7 +20,7 @@ module Request = struct payload : Blueprint_types.payload; delayed_transactions : Evm_events.Delayed_transaction.t list; } - -> (unit, tztrace) t + -> (Ethereum_types.hash Seq.t, tztrace) t | Last_known_L1_level : (int32 option, tztrace) t | Delayed_inbox_hashes : (Ethereum_types.hash list, tztrace) t | Patch_state : { diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index 34dda6e80786..85960dc1f9be 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -53,7 +53,8 @@ let on_new_blueprint evm_node_endpoint next_blueprint_number match reorg with | Some level -> return (`Check_for_reorg level) | None -> return `Continue) - | Ok () | Error (Node_error.Diverged {must_exit = false; _} :: _) -> + | Ok _ (* tx_hashes *) + | Error (Node_error.Diverged {must_exit = false; _} :: _) -> return `Continue | Error err -> fail err else if Z.(lt level number) then diff --git a/etherlink/bin_node/lib_dev/sequencer.ml b/etherlink/bin_node/lib_dev/sequencer.ml index 62da7d22ea27..56028150ba10 100644 --- a/etherlink/bin_node/lib_dev/sequencer.ml +++ b/etherlink/bin_node/lib_dev/sequencer.ml @@ -131,7 +131,7 @@ let main ~data_dir ?(genesis_timestamp = Misc.now ()) ~cctxt ~smart_rollup_address:smart_rollup_address_b58 ~chunks:genesis_chunks in - let* () = + let* _tx_hashes = Evm_context.apply_blueprint genesis_timestamp genesis_payload [] in Blueprints_publisher.publish -- GitLab From 67ee0461ec0192f50940bf1a533cc90c1db53a8e Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 26 Feb 2025 15:06:48 +0100 Subject: [PATCH 2/3] evm/node: allows configuration of drop span in tx_queue --- etherlink/bin_node/config/configuration.ml | 24 ++++++++++++------- etherlink/bin_node/config/configuration.mli | 6 ++++- etherlink/bin_node/lib_dev/tx_queue.ml | 12 ++++++---- .../EVM Node- describe config.out | 3 ++- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/etherlink/bin_node/config/configuration.ml b/etherlink/bin_node/config/configuration.ml index 4903183c81fd..0188a2535aac 100644 --- a/etherlink/bin_node/config/configuration.ml +++ b/etherlink/bin_node/config/configuration.ml @@ -64,18 +64,26 @@ let chain_id_encoding : Ethereum_types.chain_id Data_encoding.t = type l2_chain = {chain_id : Ethereum_types.chain_id} -type tx_queue = {max_size : int; max_transaction_batch_length : int option} +type tx_queue = { + max_size : int; + max_transaction_batch_length : int option; + max_lifespan_s : int; +} + +let default_tx_queue = + {max_size = 1000; max_transaction_batch_length = None; max_lifespan_s = 2} let tx_queue_encoding = let open Data_encoding in conv - (fun {max_size; max_transaction_batch_length} -> - (max_size, max_transaction_batch_length)) - (fun (max_size, max_transaction_batch_length) -> - {max_size; max_transaction_batch_length}) - (obj2 (req "max_size" int31) (opt "max_transaction_batch_length" int31)) - -let default_tx_queue = {max_size = 1000; max_transaction_batch_length = None} + (fun {max_size; max_transaction_batch_length; max_lifespan_s} -> + (max_size, max_transaction_batch_length, max_lifespan_s)) + (fun (max_size, max_transaction_batch_length, max_lifespan_s) -> + {max_size; max_transaction_batch_length; max_lifespan_s}) + (obj3 + (req "max_size" int31) + (opt "max_transaction_batch_length" int31) + (dft "max_lifespan" int31 default_tx_queue.max_lifespan_s)) let tx_queue_opt_encoding = let open Data_encoding in diff --git a/etherlink/bin_node/config/configuration.mli b/etherlink/bin_node/config/configuration.mli index bcf09762d7b7..f08ab7054f9f 100644 --- a/etherlink/bin_node/config/configuration.mli +++ b/etherlink/bin_node/config/configuration.mli @@ -87,7 +87,11 @@ val chain_id : supported_network -> Ethereum_types.chain_id type l2_chain = {chain_id : Ethereum_types.chain_id} -type tx_queue = {max_size : int; max_transaction_batch_length : int option} +type tx_queue = { + max_size : int; + max_transaction_batch_length : int option; + max_lifespan_s : int; +} (** Configuration settings for experimental features, with no backward compatibility guarantees. *) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index e6a7be62f3d8..e88dbf44abea 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -8,8 +8,6 @@ open Tezos_workers -let two_seconds = Ptime.Span.of_int_s 2 - type parameters = {evm_node_endpoint : Uri.t; config : Configuration.tx_queue} type queue_variant = [`Accepted | `Refused] @@ -84,13 +82,13 @@ module Pending_transactions = struct Some pending | None -> None - let drop htbl = + let drop ~max_lifespan htbl = let now = Time.System.now () in let dropped = ref [] in S.filter_map_inplace (fun _hash pending -> let lifespan = Ptime.diff now pending.since in - if Ptime.Span.compare lifespan two_seconds > 0 then ( + if Ptime.Span.compare lifespan max_lifespan > 0 then ( dropped := pending :: !dropped ; None) else Some pending) @@ -329,7 +327,11 @@ module Handlers = struct transactions_to_inject in - let txns = Pending_transactions.drop state.pending in + let txns = + Pending_transactions.drop + ~max_lifespan:(Ptime.Span.of_int_s state.config.max_lifespan_s) + state.pending + in List.iter (fun {pending_callback; _} -> Lwt.async (fun () -> pending_callback `Dropped)) diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out index e3c10c5d782e..89ea48088d04 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out @@ -181,7 +181,8 @@ /* tx queue configuration */ { /* Some */ "max_size": integer ∈ [-2^30, 2^30], - "max_transaction_batch_length"?: integer ∈ [-2^30, 2^30] } + "max_transaction_batch_length"?: integer ∈ [-2^30, 2^30], + "max_lifespan"?: integer ∈ [-2^30, 2^30] } || null /* None */ || boolean -- GitLab From 4afcccfba695c6e1e5c2528bc0c425ff5c9dce52 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 26 Feb 2025 14:56:48 +0100 Subject: [PATCH 3/3] evm/node: confirm tx to tx_queue on new blueprint --- etherlink/bin_node/lib_dev/observer.ml | 14 +++-- etherlink/bin_node/lib_dev/tx_queue.ml | 5 ++ etherlink/bin_node/lib_dev/tx_queue_events.ml | 20 +++++++ .../bin_node/lib_dev/tx_queue_events.mli | 8 +++ etherlink/tezt/lib/evm_node.ml | 17 +++++- etherlink/tezt/lib/evm_node.mli | 22 ++++++-- etherlink/tezt/tests/evm_sequencer.ml | 52 ++++++++++++++++++- 7 files changed, 127 insertions(+), 11 deletions(-) diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index 85960dc1f9be..00b8f4fa84b3 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -8,6 +8,12 @@ open Ethereum_types +let confirm_txs config tx_hashes = + let open Lwt_result_syntax in + if Configuration.is_tx_queue_enabled config then + Seq.iter_ep (fun hash -> Tx_queue.confirm hash) tx_hashes + else return_unit + (** [on_new_blueprint evm_node_endpoint next_blueprint_number blueprint] applies evm events found in the blueprint, then applies the blueprint itself. @@ -21,7 +27,7 @@ open Ethereum_types into a forced blueprint. The sequencer has performed a reorganization and starts submitting blocks from the new branch. *) -let on_new_blueprint evm_node_endpoint next_blueprint_number +let on_new_blueprint config evm_node_endpoint next_blueprint_number (({delayed_transactions; blueprint; _} : Blueprint_types.with_events) as blueprint_with_events) = let open Lwt_result_syntax in @@ -53,7 +59,9 @@ let on_new_blueprint evm_node_endpoint next_blueprint_number match reorg with | Some level -> return (`Check_for_reorg level) | None -> return `Continue) - | Ok _ (* tx_hashes *) + | Ok tx_hashes -> + let* () = confirm_txs config tx_hashes in + return `Continue | Error (Node_error.Diverged {must_exit = false; _} :: _) -> return `Continue | Error err -> fail err @@ -317,7 +325,7 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync ~time_between_blocks ~evm_node_endpoint ~next_blueprint_number - (on_new_blueprint evm_node_endpoint) + (on_new_blueprint config evm_node_endpoint) and* () = if Configuration.is_tx_queue_enabled config then Tx_queue.beacon ~tick_interval:0.05 diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index e88dbf44abea..0687adc9a5ed 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -272,6 +272,11 @@ module Handlers = struct match request with | Inject {payload; tx_object; callback} -> let pending_callback (reason : pending_variant) = + let*! () = + match reason with + | `Dropped -> Tx_queue_events.transaction_dropped tx_object.hash + | `Confirmed -> Tx_queue_events.transaction_confirmed tx_object.hash + in Tx_object.remove state.tx_object tx_object.hash ; callback (reason :> all_variant) in diff --git a/etherlink/bin_node/lib_dev/tx_queue_events.ml b/etherlink/bin_node/lib_dev/tx_queue_events.ml index 3e17dcda574a..f87890652692 100644 --- a/etherlink/bin_node/lib_dev/tx_queue_events.ml +++ b/etherlink/bin_node/lib_dev/tx_queue_events.ml @@ -49,6 +49,22 @@ let add_transaction = ~pp1:(fun fmt Ethereum_types.(Hash (Hex h)) -> Format.fprintf fmt "%10s" h) ("tx_hash", Ethereum_types.hash_encoding) +let transaction_dropped = + declare_1 + ~name:"tx_queue_transaction_dropped" + ~msg:"transaction {tx_hash} dropped" + ~level:Debug + ~pp1:(fun fmt Ethereum_types.(Hash (Hex h)) -> Format.fprintf fmt "%10s" h) + ("tx_hash", Ethereum_types.hash_encoding) + +let transaction_confirmed = + declare_1 + ~name:"tx_queue_transaction_confirmed" + ~msg:"transaction {tx_hash} confirmed" + ~level:Debug + ~pp1:(fun fmt Ethereum_types.(Hash (Hex h)) -> Format.fprintf fmt "%10s" h) + ("tx_hash", Ethereum_types.hash_encoding) + let is_ready () = emit is_ready () let shutdown () = emit shutdown () @@ -57,5 +73,9 @@ let injecting_transactions n = emit injecting_transactions n let add_transaction tx = emit add_transaction tx +let transaction_dropped tx = emit transaction_dropped tx + +let transaction_confirmed tx = emit transaction_confirmed tx + let rpc_error (error : Rpc_encodings.JSONRPC.error) = emit rpc_error (Int32.of_int error.code, error.message) diff --git a/etherlink/bin_node/lib_dev/tx_queue_events.mli b/etherlink/bin_node/lib_dev/tx_queue_events.mli index 305ce9aa3276..87b58ccea2a5 100644 --- a/etherlink/bin_node/lib_dev/tx_queue_events.mli +++ b/etherlink/bin_node/lib_dev/tx_queue_events.mli @@ -21,5 +21,13 @@ val injecting_transactions : int -> unit Lwt.t queue. *) val add_transaction : Ethereum_types.hash -> unit Lwt.t +(** [transaction_dropped tx_hash] Advertises [tx_hash] was dropped to the tx + queue. *) +val transaction_dropped : Ethereum_types.hash -> unit Lwt.t + +(** [transaction_confirmed tx_hash] Advertises [tx_hash] was confirmed + to the tx queue. *) +val transaction_confirmed : Ethereum_types.hash -> unit Lwt.t + (** [rpc_error error] advertises an RPC produced the error [error]. *) val rpc_error : Rpc_encodings.JSONRPC.error -> unit Lwt.t diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 7840137ef7b8..4559e1b6a90c 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -570,6 +570,10 @@ let wait_for_tx_queue_add_transaction ?timeout evm_node = wait_for_event ?timeout evm_node ~event:"tx_queue_add_transaction.v0" @@ fun json -> JSON.(json |> as_string |> Option.some) +let wait_for_tx_queue_transaction_confirmed ?timeout evm_node = + wait_for_event ?timeout evm_node ~event:"tx_queue_transaction_confirmed.v0" + @@ fun json -> JSON.(json |> as_string |> Option.some) + let wait_for_tx_queue_injecting_transaction ?timeout evm_node = wait_for_event ?timeout evm_node ~event:"tx_queue_injecting_transaction.v0" @@ fun json -> JSON.(json |> as_int |> Option.some) @@ -1265,11 +1269,13 @@ let optional_json_put ~name v f json = (name, JSON.annotate ~origin:"evm_node.config_patch" @@ value_json) json +type tx_queue_config = {max_size : int; max_lifespan : int} + let patch_config_with_experimental_feature ?(drop_duplicate_when_injection = false) ?(blueprints_publisher_order_enabled = false) ?(next_wasm_runtime = true) ?rpc_server ?(enable_websocket = false) ?max_websocket_message_length - ?(enable_tx_queue = false) ?spawn_rpc () = + ?(enable_tx_queue = false) ?tx_queue_config ?spawn_rpc () = JSON.update "experimental_features" @@ fun json -> conditional_json_put drop_duplicate_when_injection @@ -1291,7 +1297,14 @@ let patch_config_with_experimental_feature |> conditional_json_put enable_tx_queue ~name:"enable_tx_queue" - (`O [("max_size", `Float 1000.)]) + (match tx_queue_config with + | Some {max_size; max_lifespan} -> + `O + [ + ("max_size", `Float (Float.of_int max_size)); + ("max_lifespan", `Float (Float.of_int max_lifespan)); + ] + | None -> `Bool true) |> optional_json_put max_websocket_message_length ~name:"max_websocket_message_length" diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index 3087942eebca..f6c6742bd4ac 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -299,11 +299,16 @@ val spawn_init_config_minimal : type rpc_server = Resto | Dream -(** [patch_config_with_experimental_feature ?drop_duplicate_when_injection - ?next_wasm_runtime ?rpc_server ?enable_websocket - ?max_websocket_message_length json_config] patches a config to add - experimental feature. Each optional argument add the correspondent - experimental feature. *) +type tx_queue_config = {max_size : int; max_lifespan : int} + +(** [patch_config_with_experimental_feature + ?drop_duplicate_when_injection ?next_wasm_runtime ?rpc_server + ?enable_websocket ?max_websocket_message_length json_config] + patches a config to add experimental feature. Each optional + argument add the correspondent experimental feature. + + if [enable_tx_queue] is true then set the config to [true] or to + [tx_queue_config] if it's defined. *) val patch_config_with_experimental_feature : ?drop_duplicate_when_injection:bool -> ?blueprints_publisher_order_enabled:bool -> @@ -312,6 +317,7 @@ val patch_config_with_experimental_feature : ?enable_websocket:bool -> ?max_websocket_message_length:int -> ?enable_tx_queue:bool -> + ?tx_queue_config:tx_queue_config -> ?spawn_rpc:int -> unit -> JSON.t -> @@ -403,6 +409,12 @@ val wait_for_tx_pool_add_transaction : ?timeout:float -> t -> string Lwt.t hash. *) val wait_for_tx_queue_add_transaction : ?timeout:float -> t -> string Lwt.t +(** [wait_for_tx_queue_transaction_confirmed ?timeout evm_node] waits + for the event [tx_queue_transaction_confirmed.v0] using {!wait_for} + and returns the transaction hash. *) +val wait_for_tx_queue_transaction_confirmed : + ?timeout:float -> t -> string Lwt.t + (** [wait_for_tx_queue_injecting_transaction ?timeout evm_node] waits for the event [tx_queue_injecting_transaction.v0] using {!wait_for} and returns the number of transactions injected. *) diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index d34c63b47dbf..85d21d7c008c 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -10240,10 +10240,24 @@ let test_tx_queue = ~kernels:[Latest] (* node only test *) ~use_threshold_encryption:Register_without_feature ~use_dal:Register_without_feature - ~enable_tx_queue:true ~websockets:false ~title:"Submits a transaction to an observer with a tx queue." @@ fun {sequencer; observer; _} _protocol -> + let* () = Evm_node.terminate observer in + + let* () = + Evm_node.Config_file.update observer + @@ Evm_node.patch_config_with_experimental_feature + ~enable_tx_queue:true + ~tx_queue_config: + { + max_size = 1000; + max_lifespan = 100000 (* absurd value so no TX are dropped *); + } + () + in + let* () = Evm_node.run observer in + let* () = let*@ _ = produce_block sequencer in unit @@ -10334,6 +10348,42 @@ let test_tx_queue = and* _ = observer_wait_tx_added and* _ = observer_wait_tx_injected and* _ = sequencer_wait_tx_added in + + Log.info + "Produce enough block to include all txs and make sure they are confirmed \ + by the observer" ; + (* Checks that all txs were confirmed in the observer *) + let observer_wait_tx_confirmed = + let waiter () = + let* _ = Evm_node.wait_for_tx_queue_transaction_confirmed observer in + return 1 + in + wait_for_all_tx_process ~name:"tx confirmed in observer" ~waiter + in + (* Checks that all txs were included in a block by the sequencer *) + let* () = + let res = ref None in + let _p = + let* () = + let waiter () = + let* _hash = Evm_node.wait_for_block_producer_tx_injected sequencer in + return 1 + in + wait_for_all_tx_process ~name:"tx included by sequencer" ~waiter + in + res := Some () ; + unit + in + let result_f () = return !res in + bake_until + ~__LOC__ + ~bake:(fun () -> + let*@ _ = produce_block sequencer in + unit) + ~result_f + () + and* _ = observer_wait_tx_confirmed in + Log.info "Verifying that all transactions can be retrieved both in the observer and \ in the sequencer" ; -- GitLab