diff --git a/etherlink/bin_courier/main.ml b/etherlink/bin_courier/main.ml index 339556abb1765d292906ec68bd6efbe65d2b7720..68b676afabab198fa19f03184788eaa01aba4f6b 100644 --- a/etherlink/bin_courier/main.ml +++ b/etherlink/bin_courier/main.ml @@ -135,9 +135,8 @@ module Constant = struct Efunc_core.Types.a "ff00000000000000000000000000000000000001" end -let start_blueprint_follower ~relay_endpoint = +let start_blueprint_follower ~container ~relay_endpoint ~rpc_endpoint = let open Lwt_result_syntax in - let open Floodgate_lib in let* next_blueprint_number = Batch.call (module Rpc_encodings.Block_number) @@ -149,10 +148,14 @@ let start_blueprint_follower ~relay_endpoint = let* time_between_blocks = Evm_services.get_time_between_blocks ~fallback:(Time_between_blocks 10.) - ~evm_node_endpoint:relay_endpoint - ~timeout:10. + ~evm_node_endpoint:rpc_endpoint + ~timeout:Network_info.timeout () in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in Blueprints_follower.start ~multichain:false ~time_between_blocks @@ -164,7 +167,10 @@ let start_blueprint_follower ~relay_endpoint = let*! () = Floodgate_events.received_blueprint number in let* () = match Blueprint_decoder.transaction_hashes blueprint with - | Ok hashes -> List.iter_es Tx_queue.confirm hashes + | Ok hashes -> + Tx_container.confirm_transactions + ~clear_pending_queue_after:false + ~confirmed_txs:(List.to_seq hashes) | Error _ -> return_unit in return (`Continue Blueprints_follower.{sbl_callbacks_activated = false})) @@ -179,29 +185,18 @@ let withdraw_data receiver = let receiver = Tezos_crypto.Signature.Public_key_hash.to_b58check receiver in Efunc_core.Evm.encode ~name:"withdraw_base58" [`string] [`string receiver] -let withdraw ~endpoint ~infos value from receiver = +let withdraw ~container ~endpoint ~infos value from receiver = let open Lwt_result_syntax in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in let result, rwaker = Lwt.task () in - let hash, hwaker = Lwt.task () in - let*! () = - Tx_queue.transfer - ~callback: - (let open Lwt_syntax in - function - | `Confirmed -> - let* () = Events.confirmed_withdrawal () in - Lwt.wakeup rwaker (Ok ()) ; - return_unit - | `Refused -> - Lwt.wakeup rwaker (error_with "Withdrawal refused") ; - return_unit - | `Dropped -> - Lwt.wakeup rwaker (error_with "Withdrawal dropped") ; - return_unit - | `Accepted hash -> - Lwt.wakeup hwaker hash ; - Events.accepted_transaction hash) - ~gas_limit:(Z.of_int 16_150_912) + let gas_limit = Z.of_int 16_150_912 in + let fees = Z.(gas_limit * infos.Network_info.base_fee_per_gas) in + let* raw_tx, transaction_object = + Craft.transfer_with_obj_exn + ~gas_limit ~infos ~from ~to_:Constant.withdrawal_contract @@ -209,15 +204,43 @@ let withdraw ~endpoint ~infos value from receiver = ~data:(withdraw_data receiver) () in + let txn_hash = Transaction_object.hash transaction_object in + let callback reason = + match reason with + | `Confirmed -> + let*! () = Events.confirmed_withdrawal () in + Lwt.wakeup rwaker (Ok ()) ; + Account.debit from Z.(value + fees) ; + Account.increment_nonce from ; + Lwt.return_unit + | `Refused -> + Lwt.wakeup rwaker (error_with "Withdrawal refused") ; + Lwt.return_unit + | `Dropped -> + Lwt.wakeup rwaker (error_with "Withdrawal dropped") ; + Lwt.return_unit + | `Accepted -> + let*! () = Events.accepted_transaction txn_hash in + Lwt.return_unit + in + let next_nonce = Ethereum_types.quantity_of_z from.nonce in + let* add_res = + Tx_container.add ~callback ~next_nonce transaction_object ~raw_tx + in + let* () = + match add_res with + | Ok (_ : Ethereum_types.hash) -> return_unit + | Error e -> + Lwt.fail_with @@ Format.asprintf "Error adding to the tx_queue: %s" e + in let* () = result in - let*! hash in let* receipt_rpc_result = Batch.call (module Rpc_encodings.Get_transaction_receipt) ~keep_alive:true ~timeout:10. ~evm_node_endpoint:endpoint - hash + txn_hash in match receipt_rpc_result with | None -> failwith "Could not find the receipt" @@ -252,18 +275,37 @@ let command = @@ stop) (fun network amount signer receiver _ -> let open Lwt_result_syntax in - let* () = - Tx_queue.start + let start_container, container = + Evm_node_lib_dev.Tx_queue.tx_container ~chain_family:EVM + in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + let max_size = 999_999 in + let tx_per_addr_limit = Int64.of_int 999_999 in + let max_transaction_batch_length = Some 300 in + let max_lifespan_s = 2 in + let config : Evm_node_config.Configuration.tx_queue = + { + max_size; + max_transaction_batch_length; + max_lifespan_s; + tx_per_addr_limit; + } + in + let* () = start_container ~config ~keep_alive:true ~timeout:10. () in + let _ = + start_blueprint_follower + ~container ~relay_endpoint:(Network.relay network) - ~max_transaction_batch_length:None - ~inclusion_timeout:5. - () + ~rpc_endpoint:(Network.endpoint network) in - let _ = - start_blueprint_follower ~relay_endpoint:(Network.relay network) + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc (Network.endpoint network)) + ~tick_interval:0.1 in - let _ = Tx_queue.beacon ~tick_interval:0.1 in let* infos = Floodgate_lib.Network_info.fetch ~rpc_endpoint:(Network.endpoint network) @@ -272,7 +314,13 @@ let command = let* from = Account.from_signer ~evm_node_endpoint:(Network.endpoint network) signer in - withdraw ~endpoint:(Network.endpoint network) ~infos amount from receiver) + withdraw + ~container + ~endpoint:(Network.endpoint network) + ~infos + amount + from + receiver) let global_options = Tezos_clic.no_options diff --git a/etherlink/bin_floodgate/lib_floodgate/craft.ml b/etherlink/bin_floodgate/lib_floodgate/craft.ml index 6ca618857d5848a0434e83d12bae97eebf12f5d6..5e2591a90d50799bd3657251af27b3c45e329517 100644 --- a/etherlink/bin_floodgate/lib_floodgate/craft.ml +++ b/etherlink/bin_floodgate/lib_floodgate/craft.ml @@ -60,3 +60,13 @@ let transfer_exn ?nonce ?to_ ?data ~value ~gas_limit ~infos ~from () = | Error err -> Stdlib.failwith (Format.asprintf "Could not craft the transfer: %a" pp_print_trace err) + +let transfer_with_obj_exn ?nonce ?to_ ?data ~value ~gas_limit ~infos ~from () = + let open Lwt_result_syntax in + let*! raw_tx = + transfer_exn ?nonce ~infos ~from ?to_ ~gas_limit ~value ?data () + in + let*? transaction_object = + Transaction_object.decode @@ Ethereum_types.hex_to_bytes raw_tx + in + return (raw_tx, transaction_object) diff --git a/etherlink/bin_floodgate/lib_floodgate/craft.mli b/etherlink/bin_floodgate/lib_floodgate/craft.mli index 1b5d7ad11c41b16b17a325e1e51446aa7491e3ba..846d78d7f651fb07e68724c0575e698e1963629a 100644 --- a/etherlink/bin_floodgate/lib_floodgate/craft.mli +++ b/etherlink/bin_floodgate/lib_floodgate/craft.mli @@ -20,6 +20,8 @@ val transfer : unit -> Ethereum_types.hex tzresult Lwt.t +(** Same as {!transfer}, but raises [Failure] instead of returning an + [Error] when the transaction cannot be crafted. *) val transfer_exn : ?nonce:Z.t -> ?to_:Efunc_core.Eth.address -> @@ -30,3 +32,18 @@ val transfer_exn : from:Account.t -> unit -> Ethereum_types.hex Lwt.t + +(** Same as {!transfer_exn}, but also returns the decoded + {!Transaction_object.t} corresponding to the signed transaction. + Returns [Error _] if decoding the raw transaction fails, and may + raise [Failure] if the transaction cannot be crafted. *) +val transfer_with_obj_exn : + ?nonce:Z.t -> + ?to_:Efunc_core.Eth.address -> + ?data:Efunc_core.Private.b -> + value:Z.t -> + gas_limit:Z.t -> + infos:Network_info.t -> + from:Account.t -> + unit -> + (Ethereum_types.hex * Transaction_object.t) tzresult Lwt.t diff --git a/etherlink/bin_floodgate/lib_floodgate/floodgate.ml b/etherlink/bin_floodgate/lib_floodgate/floodgate.ml index 03ab31f2518f2bf6b4616e20fbe320b74e63dc01..5c1465cd4e3dc1b19239f92ff659bb7bec31736a 100644 --- a/etherlink/bin_floodgate/lib_floodgate/floodgate.ml +++ b/etherlink/bin_floodgate/lib_floodgate/floodgate.ml @@ -6,6 +6,82 @@ (* *) (*****************************************************************************) +let start_container, container = + Evm_node_lib_dev.Tx_queue.tx_container ~chain_family:EVM + +let transfer ?(callback = fun _ -> Lwt.return_unit) ?to_ ?(value = Z.zero) + ?nonce ?data ~gas_limit ~infos ~from () = + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + let open Lwt_result_syntax in + let fees = Z.(gas_limit * infos.Network_info.base_fee_per_gas) in + let callback reason = + (match reason with + | `Confirmed -> + Account.debit from Z.(value + fees) ; + Account.increment_nonce from + | _ -> ()) ; + callback reason + in + let* raw_tx, transaction_object = + Craft.transfer_with_obj_exn + ?nonce + ~infos + ~from + ?to_ + ~gas_limit + ~value + ?data + () + in + let next_nonce = Ethereum_types.quantity_of_z from.nonce in + let* add_res = + Tx_container.add ~callback ~next_nonce transaction_object ~raw_tx + in + match add_res with + | Ok (_ : Ethereum_types.hash) -> return_unit + | Error e -> + Lwt.fail_with @@ Format.asprintf "Error adding to the tx_queue: %s" e + +let send_raw_transaction ~relay_endpoint txn = + let open Lwt_result_syntax in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + let module Srt = Rpc_encodings.Send_raw_transaction in + let uuid_seed = Random.get_state () in + let req_id = Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) in + let txn = + Rpc_encodings.JSONRPC. + { + method_ = Srt.method_; + parameters = Some (Data_encoding.Json.construct Srt.input_encoding txn); + id = Some (Id_string req_id); + } + in + + let* response = + Rollup_services.call_service + ~keep_alive:true + ~base:relay_endpoint + ~timeout:Network_info.timeout + (Batch.dispatch_service ~path:Resto.Path.root) + () + () + txn + in + + match response.value with + | Ok res -> + let hash = Data_encoding.Json.destruct Srt.output_encoding res in + return hash + | Error error -> + let*! () = Floodgate_events.rpc_error error in + failwith "Could not send transaction" + type attempt = Always | Never | Number of int let one_xtz = Z.(of_int 1_000_000_000 * of_int 1_000_000_000) @@ -36,12 +112,12 @@ let controller_from_signer ~rpc_endpoint ~min_balance controller = wait for it to be confirmed. It will return an error if the transactions is not confirmed, either because it was dropped or refused. *) let send_transaction_and_wait ~infos ~gas_limit ~from ~to_ ~value = - let open Lwt_syntax in + let open Lwt_result_syntax in let result, waker = Lwt.wait () in let* () = - Tx_queue.transfer + transfer ~callback:(function - | `Accepted _ -> return_unit + | `Accepted -> Lwt.return_unit | `Confirmed -> Lwt.wakeup waker (Ok ()) ; Lwt.return_unit @@ -119,7 +195,7 @@ let spam_with_account ~txs_per_salvo ~token ~infos ~gas_limit account in let callback reason = match reason with - | `Accepted _ -> return_unit + | `Accepted -> return_unit | `Refused -> let* () = Floodgate_events.transaction_refused account in maybe_retry () @@ -143,15 +219,8 @@ let spam_with_account ~txs_per_salvo ~token ~infos ~gas_limit account let* () = Floodgate_events.transaction_dropped account in maybe_retry () in - Tx_queue.transfer - ~nonce - ~infos - ~callback - ~gas_limit - ~from:account - ~to_ - ?data - () + Misc.unwrap_error_monad @@ fun () -> + transfer ~nonce ~infos ~callback ~gas_limit ~from:account ~to_ ?data () in let* () = retry_transfer 0 in if not is_last_nonce then salvo ~start ~nonce_limit ~nonce:(Z.succ nonce) @@ -179,12 +248,26 @@ let rec get_transaction_receipt rpc_endpoint txn_hash = let*! () = Lwt_unix.sleep 0.1 in get_transaction_receipt rpc_endpoint txn_hash -let wait_for_receipt ~rpc_endpoint ?to_ ?value ?data ?nonce ~gas_limit ~infos - ~from () = - let open Lwt_syntax in +let wait_for_receipt ~rpc_endpoint ?to_ ?(value = Z.zero) ?data ?nonce + ~gas_limit ~infos ~from () = + let open Lwt_result_syntax in let res, waiter = Lwt.task () in - let txn_hash = ref Ethereum_types.(Hash (Hex "")) in - let callback = function + let* raw_tx, transaction_object = + Craft.transfer_with_obj_exn + ?nonce + ~infos + ~from + ?to_ + ~gas_limit + ~value + ?data + () + in + let txn_hash = Transaction_object.hash transaction_object in + let fees = Z.(gas_limit * infos.Network_info.base_fee_per_gas) in + let callback status = + let open Lwt_syntax in + match status with | `Refused -> Lwt.wakeup waiter @@ Result_syntax.tzfail @@ -199,25 +282,27 @@ let wait_for_receipt ~rpc_endpoint ?to_ ?value ?data ?nonce ~gas_limit ~infos "Could not get the transaction receipt: transaction was \ dropped]") ; return_unit - | `Accepted hash -> - txn_hash := hash ; - return_unit + | `Accepted -> return_unit | `Confirmed -> - let* result = get_transaction_receipt rpc_endpoint !txn_hash in + Account.debit from Z.(value + fees) ; + Account.increment_nonce from ; + let* result = get_transaction_receipt rpc_endpoint txn_hash in Lwt.wakeup waiter result ; return_unit in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + let next_nonce = Ethereum_types.quantity_of_z from.nonce in + let* add_res = + Tx_container.add ~callback ~next_nonce transaction_object ~raw_tx + in let* () = - Tx_queue.transfer - ~callback - ?to_ - ?value - ?data - ?nonce - ~gas_limit - ~infos - ~from - () + match add_res with + | Ok (_ : Ethereum_types.hash) -> return_unit + | Error e -> + Lwt.fail_with @@ Format.asprintf "Error adding to the tx_queue: %s" e in res @@ -266,8 +351,8 @@ let fund_fresh_account ~infos ~relay_endpoint ~initial_balance ~gas_limit ~value:Z.(node.balance - fees - fees) () in - let* _ = Tx_queue.Misc.send_raw_transaction ~relay_endpoint txn in - let* _ = Tx_queue.Misc.send_raw_transaction ~relay_endpoint txn' in + let* _ = send_raw_transaction ~relay_endpoint txn in + let* _ = send_raw_transaction ~relay_endpoint txn' in let*! () = Floodgate_events.reimbursed_controller node in return_unit) in @@ -384,6 +469,10 @@ let start_new_head_monitor ~ws_uri = in let* () = Websocket_client.connect ws_client in let* heads_subscription = Websocket_client.subscribe_newHeads ws_client in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in lwt_stream_iter_es (fun head -> let*? block = head in @@ -393,7 +482,9 @@ let start_new_head_monitor ~ws_uri = match block.Ethereum_types.transactions with | TxHash hashes -> State.incr_transactions_count (List.length hashes) ; - List.iter_es Tx_queue.confirm hashes + Tx_container.confirm_transactions + ~clear_pending_queue_after:false + ~confirmed_txs:(List.to_seq hashes) | TxFull _ -> return_unit) heads_subscription.stream @@ -414,6 +505,10 @@ let start_blueprint_follower ~relay_endpoint ~rpc_endpoint = ~timeout:Network_info.timeout () in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in Blueprints_follower.start ~multichain:false ~time_between_blocks @@ -427,7 +522,9 @@ let start_blueprint_follower ~relay_endpoint ~rpc_endpoint = match Blueprint_decoder.transaction_hashes blueprint with | Ok hashes -> State.incr_transactions_count (List.length hashes) ; - List.iter_es Tx_queue.confirm hashes + Tx_container.confirm_transactions + ~clear_pending_queue_after:false + ~confirmed_txs:(List.to_seq hashes) | Error _ -> return_unit in return (`Continue Blueprints_follower.{sbl_callbacks_activated = false})) @@ -440,10 +537,12 @@ let start_blueprint_follower ~relay_endpoint ~rpc_endpoint = let run ~(scenario : [< `ERC20 | `XTZ]) ~relay_endpoint ~rpc_endpoint ~ws_endpoint ~controller ~max_active_eoa ~max_transaction_batch_length - ~spawn_interval ~tick_interval ~base_fee_factor ~initial_balance - ~txs_per_salvo ~elapsed_time_between_report ~dummy_data_size ~retry_attempt - = + ~max_lifespan_s ~spawn_interval ~tick_interval ~base_fee_factor + ~initial_balance ~txs_per_salvo ~elapsed_time_between_report + ~dummy_data_size ~retry_attempt = State.dummy_data_size := dummy_data_size ; + let tx_per_addr_limit = Int64.of_int 999_999 in + let max_size = 999_999 in let open Lwt_result_syntax in let* controller = controller_from_signer @@ -452,13 +551,23 @@ let run ~(scenario : [< `ERC20 | `XTZ]) ~relay_endpoint ~rpc_endpoint controller in let* infos = Network_info.fetch ~rpc_endpoint ~base_fee_factor in - let* () = Tx_queue.start ~relay_endpoint ~max_transaction_batch_length () in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + let config : Configuration.tx_queue = + {max_size; max_transaction_batch_length; max_lifespan_s; tx_per_addr_limit} + in + let* () = start_container ~config ~keep_alive:true ~timeout:10. () in let*! () = Floodgate_events.is_ready infos.chain_id infos.base_fee_per_gas in let* () = match ws_endpoint with | Some ws_uri -> start_new_head_monitor ~ws_uri | None -> start_blueprint_follower ~relay_endpoint ~rpc_endpoint - and* () = Tx_queue.beacon ~tick_interval + and* () = + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc rpc_endpoint) + ~tick_interval and* () = let* token, gas_limit = prepare_scenario ~rpc_endpoint ~scenario ~dummy_data_size infos controller diff --git a/etherlink/bin_floodgate/lib_floodgate/floodgate.mli b/etherlink/bin_floodgate/lib_floodgate/floodgate.mli index ef83cca1f386333eac93a0e4ab467b9304a588e0..5db081a377146a616cef3fe2afdaf99f36b26d7f 100644 --- a/etherlink/bin_floodgate/lib_floodgate/floodgate.mli +++ b/etherlink/bin_floodgate/lib_floodgate/floodgate.mli @@ -16,6 +16,7 @@ val run : controller:Signer.t -> max_active_eoa:int -> max_transaction_batch_length:int option -> + max_lifespan_s:int -> spawn_interval:float -> tick_interval:float -> base_fee_factor:float -> diff --git a/etherlink/bin_floodgate/lib_floodgate/tx_queue.ml b/etherlink/bin_floodgate/lib_floodgate/tx_queue.ml deleted file mode 100644 index 89c242023f4e801d357a94f97f9fdf2d62ad64f9..0000000000000000000000000000000000000000 --- a/etherlink/bin_floodgate/lib_floodgate/tx_queue.ml +++ /dev/null @@ -1,429 +0,0 @@ -(*****************************************************************************) -(* *) -(* SPDX-License-Identifier: MIT *) -(* Copyright (c) 2025 Functori *) -(* Copyright (c) 2025 Nomadic Labs *) -(* *) -(*****************************************************************************) - -open Tezos_workers - -type parameters = { - relay_endpoint : Uri.t; - max_transaction_batch_length : int option; - inclusion_timeout : float; -} - -type callback = - [`Accepted of Ethereum_types.hash | `Confirmed | `Dropped | `Refused] -> - unit Lwt.t - -type request = {payload : Ethereum_types.hex; callback : callback} - -type pending = {callback : callback; since : Ptime.t} - -module Pending_transactions = struct - open Ethereum_types - module S = String.Hashtbl - - type t = pending S.t - - let empty () = S.create 1000 - - let add htbl (Hash (Hex hash)) callback = - S.add htbl hash ({callback; since = Time.System.now ()} : pending) - - let pop htbl (Hash (Hex hash)) = - match S.find htbl hash with - | Some pending -> - S.remove htbl hash ; - Some pending - | None -> None - - let drop timeout htbl = - let now = Time.System.now () in - let dropped = ref [] in - S.filter_map_inplace - (fun _hash pending -> - let lifespan = Ptime.diff now pending.since in - if Ptime.Span.compare lifespan timeout > 0 then ( - dropped := pending :: !dropped ; - None) - else Some pending) - htbl ; - !dropped -end - -type state = { - relay_endpoint : Uri.t; - mutable queue : request Queue.t; - pending : Pending_transactions.t; - max_transaction_batch_length : int option; - inclusion_timeout : Ptime.Span.t; -} - -module Types = struct - type nonrec state = state - - type nonrec parameters = parameters -end - -module Name = struct - type t = unit - - let encoding = Data_encoding.unit - - let base = ["floodgate"; "injector"] - - let pp _fmt () = () - - let equal () () = true -end - -module Request = struct - type ('a, 'b) t = - | Inject : { - payload : Ethereum_types.hex; - callback : callback; - } - -> (unit, tztrace) t - | Confirm : {txn_hash : Ethereum_types.hash} -> (unit, tztrace) t - | Tick : (unit, tztrace) t - - type view = View : _ t -> view - - let view req = View req - - let encoding = - let open Data_encoding in - (* This encoding is used to encode only *) - union - [ - case - Json_only - ~title:"Inject" - (obj2 - (req "request" (constant "inject")) - (req "payload" Ethereum_types.hex_encoding)) - (function - | View (Inject {payload; _}) -> Some ((), payload) | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Confirm" - (obj2 - (req "request" (constant "confirm")) - (req "transaction_hash" Ethereum_types.hash_encoding)) - (function - | View (Confirm {txn_hash}) -> Some ((), txn_hash) | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Tick" - (obj1 (req "request" (constant "flush"))) - (function View Tick -> Some () | _ -> None) - (fun _ -> assert false); - ] - - let pp fmt (View r) = - let open Format in - match r with - | Inject {payload = Hex txn; _} -> fprintf fmt "Inject %s" txn - | Confirm {txn_hash = Hash (Hex txn_hash)} -> - fprintf fmt "Confirm %s" txn_hash - | Tick -> fprintf fmt "Tick" -end - -module Worker = Worker.MakeSingle (Name) (Request) (Types) - -type worker = Worker.infinite Worker.queue Worker.t - -let uuid_seed = Random.get_state () - -let send_transactions_batch ~relay_endpoint transactions = - let open Lwt_result_syntax in - let module M = Map.Make (String) in - let module Srt = Rpc_encodings.Send_raw_transaction in - if Seq.is_empty transactions then return_unit - else - let rev_batch, callbacks = - Seq.fold_left - (fun (rev_batch, callbacks) {payload; callback} -> - let req_id = Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) in - let txn = - Rpc_encodings.JSONRPC. - { - method_ = Srt.method_; - parameters = - Some (Data_encoding.Json.construct Srt.input_encoding payload); - id = Some (Id_string req_id); - } - in - - (txn :: rev_batch, M.add req_id callback callbacks)) - ([], M.empty) - transactions - in - let batch = List.rev rev_batch in - - let*! () = Floodgate_events.injecting_transactions (List.length batch) in - - let* responses = - Rollup_services.call_service - ~keep_alive:true - ~base:relay_endpoint - ~timeout:Network_info.timeout - (Batch.dispatch_batch_service ~path:Resto.Path.root) - () - () - (Batch batch) - in - - let responses = - match responses with Singleton r -> [r] | Batch rs -> rs - in - - let* missed_callbacks = - List.fold_left_es - (fun callbacks (response : Rpc_encodings.JSONRPC.response) -> - match response with - | {id = Some (Id_string req); value} -> ( - match (value, M.find_opt req callbacks) with - | value, Some callback -> - let* () = - match value with - | Ok res -> - let hash = - Data_encoding.Json.destruct Srt.output_encoding res - in - Lwt_result.ok (callback (`Accepted hash)) - | Error error -> - let*! () = Floodgate_events.rpc_error error in - Lwt_result.ok (callback `Refused) - in - return (M.remove req callbacks) - | _ -> return callbacks) - | _ -> failwith "Inconsistent response from the server") - callbacks - responses - in - - assert (M.is_empty missed_callbacks) ; - return_unit - -module Handlers = struct - open Request - - type self = worker - - let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun self request -> - let open Lwt_result_syntax in - let state = Worker.state self in - match request with - | Inject {payload; callback} -> - let instrumented_callback reason = - (match reason with - | `Accepted hash -> - Pending_transactions.add state.pending hash callback - | _ -> ()) ; - callback reason - in - Queue.add {payload; callback = instrumented_callback} state.queue ; - return_unit - | Confirm {txn_hash} -> ( - match Pending_transactions.pop state.pending txn_hash with - | Some {callback; _} -> - Lwt.async (fun () -> callback `Confirmed) ; - return_unit - | None -> return_unit) - | Tick -> - let all_transactions = Queue.to_seq state.queue in - let* transactions_to_inject, remaining_transactions = - match state.max_transaction_batch_length with - | None -> return (all_transactions, Seq.empty) - | Some max_transaction_batch_length -> - let when_negative_length = - TzTrace.make - (Exn (Failure "Negative max_transaction_batch_length")) - in - let*? transactions_to_inject = - Seq.take - ~when_negative_length - max_transaction_batch_length - all_transactions - in - let*? remaining_transactions = - Seq.drop - ~when_negative_length - max_transaction_batch_length - all_transactions - in - return (transactions_to_inject, remaining_transactions) - in - state.queue <- Queue.of_seq remaining_transactions ; - - let+ () = - send_transactions_batch - ~relay_endpoint:state.relay_endpoint - transactions_to_inject - in - - let txns = - Pending_transactions.drop state.inclusion_timeout state.pending - in - List.iter - (fun {callback; _} -> Lwt.async (fun () -> callback `Dropped)) - txns - - type launch_error = tztrace - - let on_launch _self () - ({relay_endpoint; max_transaction_batch_length; inclusion_timeout} : - parameters) = - let open Lwt_result_syntax in - return - { - relay_endpoint; - queue = Queue.create (); - pending = Pending_transactions.empty (); - max_transaction_batch_length; - inclusion_timeout = - Ptime.Span.of_float_s inclusion_timeout - |> WithExceptions.Option.get ~loc:__LOC__; - } - - let on_error (type a b) _self _status_request (_r : (a, b) Request.t) - (_errs : b) : [`Continue | `Shutdown] tzresult Lwt.t = - Lwt_result_syntax.return `Continue - - let on_completion _ _ _ _ = Lwt.return_unit - - let on_no_request _ = Lwt.return_unit - - let on_close _ = Lwt.return_unit -end - -let table = Worker.create_table Queue - -type worker_promise = { - mutable promise : worker Lwt.t; - mutable resolver : worker Lwt.u; -} - -let worker_promise = - let promise, resolver = Lwt.task () in - {promise; resolver} - -type error += No_worker - -let worker () = - match Lwt.state worker_promise.promise with - | Lwt.Return worker -> Ok worker - | Lwt.Fail e -> Result_syntax.tzfail (error_of_exn e) - | Lwt.Sleep -> Result_syntax.tzfail No_worker - -let tick () = - let open Lwt_result_syntax in - let*? worker = worker () in - let*! (_was_pushed : bool) = Worker.Queue.push_request worker Tick in - return_unit - -let rec beacon ~tick_interval = - let open Lwt_result_syntax in - let* () = tick () in - let*! () = Lwt_unix.sleep tick_interval in - beacon ~tick_interval - -let inject ?(callback = fun _ -> Lwt_syntax.return_unit) txn = - let open Lwt_syntax in - let* worker = worker_promise.promise in - let* (_was_pushed : bool) = - Worker.Queue.push_request worker (Inject {payload = txn; callback}) - in - return_unit - -let confirm txn_hash = - let open Lwt_result_syntax in - let*? worker = worker () in - let*! (was_pushed : bool) = - Worker.Queue.push_request worker (Confirm {txn_hash}) - in - assert was_pushed ; - return_unit - -let start ~relay_endpoint ~max_transaction_batch_length - ?(inclusion_timeout = 2.) () = - let open Lwt_result_syntax in - let* worker = - Worker.launch - table - () - {relay_endpoint; max_transaction_batch_length; inclusion_timeout} - (module Handlers) - in - Lwt.wakeup worker_promise.resolver worker ; - let*! () = Floodgate_events.tx_queue_is_ready () in - return_unit - -let transfer ?(callback = fun _ -> Lwt.return_unit) ?to_ ?(value = Z.zero) - ?nonce ?data ~gas_limit ~infos ~from () = - let open Lwt_syntax in - let fees = Z.(gas_limit * infos.Network_info.base_fee_per_gas) in - let callback reason = - (match reason with - | `Confirmed -> - Account.debit from Z.(value + fees) ; - Account.increment_nonce from - | _ -> ()) ; - callback reason - in - let* txn = - Craft.transfer_exn ?nonce ~infos ~from ?to_ ~gas_limit ~value ?data () - in - inject ~callback txn - -let shutdown () = - let open Lwt_syntax in - let worker = worker () |> Result.to_option in - (* Prepare promise for next restart *) - let promise, resolver = Lwt.task () in - worker_promise.promise <- promise ; - worker_promise.resolver <- resolver ; - let* () = Option.iter_s Worker.shutdown worker in - return_unit - -module Misc = struct - let send_raw_transaction ~relay_endpoint txn = - let open Lwt_result_syntax in - let receipt, receipt_waker = Lwt.task () in - - let* () = - send_transactions_batch - ~relay_endpoint - List.( - to_seq - [ - { - payload = txn; - callback = - (function - | `Accepted hash -> - Lwt.return @@ Lwt.wakeup receipt_waker (Ok hash) - | `Confirmed -> - (* [send_transactions_batch] does not call [callback] with - [`Confirmed]. *) - assert false - | `Refused | `Dropped -> - Lwt.return - @@ Lwt.wakeup - receipt_waker - (Error [error_of_fmt "Could not send transaction"])); - }; - ]) - in - - receipt -end diff --git a/etherlink/bin_floodgate/lib_floodgate/tx_queue.mli b/etherlink/bin_floodgate/lib_floodgate/tx_queue.mli deleted file mode 100644 index 8b5e9358cd8f32e75e3f598d1f0c8dd0be18af48..0000000000000000000000000000000000000000 --- a/etherlink/bin_floodgate/lib_floodgate/tx_queue.mli +++ /dev/null @@ -1,87 +0,0 @@ -(*****************************************************************************) -(* *) -(* SPDX-License-Identifier: MIT *) -(* Copyright (c) 2025 Functori *) -(* Copyright (c) 2025 Nomadic Labs *) -(* *) -(*****************************************************************************) - -(** The Tx_queue is a worker allowing to batch raw transactions in a single - [eth_sendRawTransaction] at a regular interval. It provides a non-blocking - interface based on the use of callbacks. *) - -(** A [callback] is called by the [Tx_queue] at various stage of a submitted - transaction life. - - The next tick after its insertion in the queue, a transaction is submitted - to the relay node within a batch of [eth_sendRawTransaction] requests. - - {ul - {li Depending on the result of the RPC, its [callback] is called with - either [`Accepted hash] (where [hash] is the hash of the raw - transaction) or [`Refused]).} - {li As soon as the transaction appears in a blueprint, its callback is - called with [`Confirmed]. If this does not happen before 2s, the - [callback] is called with [`Dropped].}} *) -type callback = - [`Accepted of Ethereum_types.hash | `Confirmed | `Dropped | `Refused] -> - unit Lwt.t - -(** A [request] submitted to the [Tx_queue] consists in a payload (that is, a - raw transaction) and a {!callback} that will be used to advertise the - transaction life cycle. *) -type request = {payload : Ethereum_types.hex; callback : callback} - -(** [start ~relay_endpoint ~max_transaction_batch_length ()] starts - the worker, meaning it is possible to call {!inject}, {!confirm} - and {!beacon}. *) -val start : - relay_endpoint:Uri.t -> - max_transaction_batch_length:int option -> - ?inclusion_timeout:float -> - unit -> - unit tzresult Lwt.t - -(** [inject ?callback raw_txn] pushes the raw transaction [raw_txn] to the - worker queue. - - {b Note:} The promise will be sleeping until at least {!start} is called. *) -val inject : ?callback:callback -> Ethereum_types.hex -> unit Lwt.t - -(** [confirm hash] is to be called by an external component to advertise a - transaction has been included in a blueprint. *) -val confirm : Ethereum_types.hash -> unit tzresult Lwt.t - -(** [beacon ~tick_interval] is a never fulfilled promise which triggers a tick - in the [Tx_queue] every [tick_interval] seconds. *) -val beacon : tick_interval:float -> unit tzresult Lwt.t - -(** [transfer ?callback ~infos sender receiver value] inject a transaction - moving [value] native tokens from [sender] to [receiver]. The nonce and - balance of [receiver] (see {!Account.t}). Additionally, a [callback] can be - provided, and will be called according to the transaction’s lifecycle (see - {!callback}). *) -val transfer : - ?callback:callback -> - ?to_:Efunc_core.Eth.address -> - ?value:Ethereum_types.NonceMap.key -> - ?nonce:Ethereum_types.NonceMap.key -> - ?data:Efunc_core.Private.b -> - gas_limit:Ethereum_types.NonceMap.key -> - infos:Network_info.t -> - from:Account.t -> - unit -> - unit Lwt.t - -(** Stops the worker for the tx queue. *) -val shutdown : unit -> unit Lwt.t - -module Misc : sig - (** [send_raw_transaction ~relay_endpoint hex_raw_txn] sends [hex_raw_txn] to - [relay_endpoint] using the [eth_sendRawTransaction] method, without going - through the [Tx_queue]. *) - val send_raw_transaction : - relay_endpoint:Uri.t -> - Ethereum_types.hex -> - Ethereum_types.hash tzresult Lwt.t -end diff --git a/etherlink/bin_floodgate/main.ml b/etherlink/bin_floodgate/main.ml index 1b6bd8f64b5f9553e8f3a2c8a499ccd66a6732c4..015579ecd034b1a576b5626daf9296a3ffa438a8 100644 --- a/etherlink/bin_floodgate/main.ml +++ b/etherlink/bin_floodgate/main.ml @@ -137,6 +137,16 @@ module Arg = struct is not given there is no limit" Parameter.int + let max_lifespan_s = + let default = "2" in + Tezos_clic.default_arg + ~default + ~long:"max-lifespan" + ~placeholder:"N" + ~doc: + "Maximum lifespan of transactions before considered dropped, in second" + Parameter.int + let spawn_interval = let default = "0.5" in Tezos_clic.default_arg @@ -284,7 +294,7 @@ let run_command = command ~desc:"Start Floodgate to spam an EVM-compatible network" Arg.( - args16 + args17 verbose relay_endpoint rpc_endpoint @@ -292,6 +302,7 @@ let run_command = controller max_active_eoa max_transaction_batch_length + max_lifespan_s spawn_interval tick_interval base_fee_factor @@ -309,6 +320,7 @@ let run_command = controller, max_active_eoa, max_transaction_batch_length, + max_lifespan_s, spawn_interval, tick_interval, base_fee_factor, @@ -339,6 +351,7 @@ let run_command = ~controller ~max_active_eoa ~max_transaction_batch_length + ~max_lifespan_s ~spawn_interval ~tick_interval ~base_fee_factor diff --git a/etherlink/tezt/benchmarks/evm_node_capacity.ml b/etherlink/tezt/benchmarks/evm_node_capacity.ml index b6baff63b5ab79899d2259023f3d88966a73b3e0..1e5fdc60f537bbe3c01714ef71870657ebd0924f 100644 --- a/etherlink/tezt/benchmarks/evm_node_capacity.ml +++ b/etherlink/tezt/benchmarks/evm_node_capacity.ml @@ -13,6 +13,8 @@ open Floodgate_lib open Evm_node_lib_dev_encoding type env = { + container : + L2_types.evm_chain_family Evm_node_lib_dev.Services_backend_sig.tx_container; sequencer : Evm_node.t; rpc_node : Evm_node.t; infos : Network_info.t; @@ -21,9 +23,11 @@ type env = { nb_contracts : int; } -let deposit_one {infos; gas_limit; rpc_node; _} ?nonce value account contract = +let deposit_one {container; infos; gas_limit; rpc_node; _} ?nonce value account + contract = let* _ = call + ~container infos rpc_node contract @@ -132,14 +136,15 @@ let transfer_gas_limit {accounts; infos; rpc_node; _} contract = in return gas_limit -let account_step infos rpc_node ?gas_limit contract ~nonce (sender : Account.t) - value (dest : Account.t) = +let account_step {container; infos; rpc_node; gas_limit; _} contract ~nonce + (sender : Account.t) value (dest : Account.t) = let* _ = call + ~container infos rpc_node contract - ?gas_limit + ~gas_limit sender ~nonce ~name:"transfer" @@ -148,8 +153,8 @@ let account_step infos rpc_node ?gas_limit contract ~nonce (sender : Account.t) in unit -let sender_step {infos; rpc_node; gas_limit; accounts; _} erc20s iteration - sender_index = +let sender_step env erc20s iteration sender_index = + let accounts = env.accounts in let sender = accounts.(sender_index mod Array.length accounts) in let dest_index = (sender_index + (7 * (iteration + 1))) mod Array.length accounts @@ -160,9 +165,7 @@ let sender_step {infos; rpc_node; gas_limit; accounts; _} erc20s iteration (fun i contract -> let nonce = Z.add nonce (Z.of_int i) in account_step - infos - rpc_node - ~gas_limit + env contract ~nonce sender @@ -204,8 +207,12 @@ let test_erc20_capacity () = let*? infos = Network_info.fetch ~rpc_endpoint:endpoint ~base_fee_factor:100. in + let start_container, container = + Evm_node_lib_dev.Tx_queue.tx_container ~chain_family:EVM + in let env = { + container; infos; sequencer; rpc_node = sequencer; @@ -214,19 +221,33 @@ let test_erc20_capacity () = nb_contracts; } in + + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + + let max_size = 999_999 in + let tx_per_addr_limit = Int64.of_int 999_999 in + let max_transaction_batch_length = Some 300 in + let max_lifespan_s = 2 in + let config : Evm_node_config.Configuration.tx_queue = + {max_size; max_transaction_batch_length; max_lifespan_s; tx_per_addr_limit} + in let*? () = - Tx_queue.start - ~relay_endpoint:endpoint - ~max_transaction_batch_length:(Some 300) - ~inclusion_timeout:parameters.timeout - () + start_container ~config ~keep_alive:true ~timeout:parameters.timeout () in + let* () = Floodgate_events.is_ready infos.chain_id infos.base_fee_per_gas in let follower = Floodgate.start_blueprint_follower ~relay_endpoint:endpoint ~rpc_endpoint:endpoint in - let tx_queue = Tx_queue.beacon ~tick_interval:0.1 in + let tx_queue = + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc endpoint) + ~tick_interval:0.1 + in Log.report "Deploying %d ERC20 contracts" nb_contracts ; let* erc20s = deploy_contracts @@ -253,7 +274,7 @@ let test_erc20_capacity () = in Lwt.cancel follower ; Lwt.cancel tx_queue ; - let* () = Tx_queue.shutdown () in + let*? () = Tx_container.shutdown () in let* () = Evm_node.terminate sequencer in stop_profile () diff --git a/etherlink/tezt/benchmarks/lib/benchmark_utils.ml b/etherlink/tezt/benchmarks/lib/benchmark_utils.ml index 3bedcfb751c441c782ad82cf9f9e53f3846b87ae..76b922bddd2c4044f3f6d314d391faede2bedf68 100644 --- a/etherlink/tezt/benchmarks/lib/benchmark_utils.ml +++ b/etherlink/tezt/benchmarks/lib/benchmark_utils.ml @@ -317,9 +317,11 @@ let rec pp_evm_value fmt (v : Efunc_core.Types.evm_value) = pp_evm_value) l -let call infos rpc_node contract sender ?gas_limit ?nonce ?(value = Z.zero) - ?name ?(check_success = false) abi params = +let call ~container infos rpc_node contract sender ?gas_limit ?nonce + ?(value = Z.zero) ?name ?(check_success = false) abi params = let open Evm_node_lib_dev_encoding.Ethereum_types in + let open Tezos_error_monad.Error_monad in + let open Lwt_result_syntax in let pp_tx fmt () = Format.fprintf fmt @@ -346,7 +348,7 @@ let call infos rpc_node contract sender ?gas_limit ?nonce ?(value = Z.zero) | Some g -> return g | None -> Log.debug " - Estimate gas limit" ; - let*? g = + let* g = estimate_gas infos rpc_node @@ -358,49 +360,72 @@ let call infos rpc_node contract sender ?gas_limit ?nonce ?(value = Z.zero) Log.debug " - Gas limit: %a" Z.pp_print g ; return g in - let tx_hash = ref (Hash (Hex "")) in - let* () = - Tx_queue.transfer - ~gas_limit + + let* raw_tx, transaction_object = + Craft.transfer_with_obj_exn + ?nonce ~infos + ~from:sender ~to_:(Efunc_core.Private.a contract) - ?data + ~gas_limit ~value - ~from:sender - ?nonce + ?data () - ~callback:(function - | `Accepted h -> - tx_hash := h ; - unit - | (`Refused | `Dropped | `Confirmed) as status -> - let c = - match status with - | `Refused -> nb_refused - | `Dropped -> nb_dropped - | `Confirmed -> nb_confirmed - in - incr c ; - Lwt.wakeup waker status ; - if check_success then - Lwt.async (fun () -> - let*? receipt = - Floodgate.get_transaction_receipt - (Evm_node.endpoint rpc_node |> Uri.of_string) - !tx_hash - in - let tx_status = Qty.to_z receipt.status in - if Z.(equal tx_status one) then unit - else - let (Hash (Hex h)) = !tx_hash in - Test.fail - "Transaction %s was included as failed:\n%a" - h - pp_tx - ()) ; - unit) in - confirmed + let tx_hash = Evm_node_lib_dev.Transaction_object.hash transaction_object in + let fees = Z.(gas_limit * infos.Network_info.base_fee_per_gas) in + let callback status = + match status with + | `Accepted -> unit + | (`Refused | `Dropped | `Confirmed) as status -> + let c = + match status with + | `Refused -> nb_refused + | `Dropped -> nb_dropped + | `Confirmed -> + Account.debit sender Z.(value + fees) ; + Account.increment_nonce sender ; + nb_confirmed + in + incr c ; + Lwt.wakeup waker status ; + if check_success then + Lwt.async (fun () -> + let open Lwt.Syntax in + let* res = + Floodgate.get_transaction_receipt + (Evm_node.endpoint rpc_node |> Uri.of_string) + tx_hash + in + match res with + | Ok receipt -> + let tx_status = Qty.to_z receipt.status in + if Z.(equal tx_status Z.one) then Lwt.return_unit + else + let (Hash (Hex h)) = tx_hash in + Test.fail + "Transaction %s was included as failed:\n%a" + h + pp_tx + () + | Error trace -> + Test.fail "Error fetching receipt: %a" pp_print_trace trace) ; + unit + in + let next_nonce = quantity_of_z sender.nonce in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + let* add_res = + Tx_container.add ~callback ~next_nonce transaction_object ~raw_tx + in + let* () = + match add_res with + | Ok (_ : hash) -> return_unit + | Error e -> Test.fail "Error adding to the tx_queue: %s" e confirmed + in + return confirmed type gasometer = {mutable gas : Z.t; mutable time : Ptime.Span.t} diff --git a/etherlink/tezt/benchmarks/lib/uniswap.ml b/etherlink/tezt/benchmarks/lib/uniswap.ml index e891725636e58db26b6fcd1f1116f0b9c46b9f62..f05a480ea64013ecd70fb58607a6bc4799d73c2a 100644 --- a/etherlink/tezt/benchmarks/lib/uniswap.ml +++ b/etherlink/tezt/benchmarks/lib/uniswap.ml @@ -11,6 +11,8 @@ open Benchmark_utils open Floodgate_lib type env = { + container : + L2_types.evm_chain_family Evm_node_lib_dev.Services_backend_sig.tx_container; sequencer : Evm_node.t; rpc_node : Evm_node.t; infos : Network_info.t; @@ -60,10 +62,11 @@ let deposit_wxtz_gas_limit {accounts; infos; rpc_node; wxtz_addr; _} = Log.debug "Deposit gas limit: %a@." Z.pp_print gas_limit ; return gas_limit -let deposit_wxtz ~gas_limit {infos; rpc_node; wxtz_addr; _} ?nonce value account - = +let deposit_wxtz ~gas_limit {container; infos; rpc_node; wxtz_addr; _} ?nonce + value account = let* _ = call + ~container infos rpc_node wxtz_addr @@ -155,8 +158,8 @@ let check_wxtz_deposits env = Log.report "Deposited tokens to WXTZ contract" ; unit -let add_xtz_liquidity {sequencer; rpc_node; infos; router_addr; _} ~sender - ~token_addr ~xtz ~token = +let add_xtz_liquidity {container; sequencer; rpc_node; infos; router_addr; _} + ~sender ~token_addr ~xtz ~token = let name = "addLiquidityETH" in let params_ty = [`address; `uint 256; `uint 256; `uint 256; `address; `uint 256] @@ -176,6 +179,7 @@ let add_xtz_liquidity {sequencer; rpc_node; infos; router_addr; _} ~sender wait_for_application sequencer @@ fun () -> let* _ = call + ~container infos rpc_node router_addr @@ -196,8 +200,8 @@ let add_xtz_liquidities env ~sender = add_xtz_liquidity env ~sender ~token_addr ~xtz:1000 ~token) env.gld_tokens -let add_liquidity {sequencer; rpc_node; infos; router_addr; _} ~sender ~gld - ~gld2 = +let add_liquidity {container; sequencer; rpc_node; infos; router_addr; _} + ~sender ~gld ~gld2 = let name = "addLiquidity" in let params_ty = [ @@ -228,6 +232,7 @@ let add_liquidity {sequencer; rpc_node; infos; router_addr; _} ~sender ~gld wait_for_application sequencer @@ fun () -> let* _ = call + ~container infos rpc_node router_addr @@ -258,14 +263,15 @@ let add_token_liquidities env ~sender = add_liquidity env ~sender ~gld:(p1, 100_000) ~gld2:(p2, 200_000)) (pairs env) -let approve_router {sequencer; rpc_node; infos; router_addr; _} ~sender - ~token_addr = +let approve_router {container; sequencer; rpc_node; infos; router_addr; _} + ~sender ~token_addr = let name = "approve" in let params_ty = [`address; `uint 256] in let params = [`address (address router_addr); `int max_uint256] in wait_for_application sequencer @@ fun () -> let* _ = call + ~container infos rpc_node token_addr @@ -308,6 +314,7 @@ let swap_xtz ~nb_hops env iteration sender_index = in let* _ = call + ~container:env.container env.infos env.rpc_node env.router_addr @@ -331,15 +338,24 @@ let step ({sequencer; accounts; nb_hops; _} as env) iteration = in wait_for_application sequencer step_f -let create_pair ?nonce {sequencer; rpc_node; infos; factory_addr; _} ~sender - ((n, gld_addr), (n2, gld2_addr)) = +let create_pair ?nonce {container; sequencer; rpc_node; infos; factory_addr; _} + ~sender ((n, gld_addr), (n2, gld2_addr)) = Log.info " - Create pair %s/%s" n n2 ; let name = "createPair" in let params_ty = [`address; `address] in let params = [`address (address gld_addr); `address (address gld2_addr)] in wait_for_application sequencer @@ fun () -> let* _ = - call infos rpc_node factory_addr sender ?nonce ~name params_ty params + call + ~container + infos + rpc_node + factory_addr + sender + ?nonce + ~name + params_ty + params in unit @@ -376,19 +392,34 @@ let setup ~accounts ~nb_tokens ~nb_hops ~sequencer ~rpc_node = let*? infos = Network_info.fetch ~rpc_endpoint:endpoint ~base_fee_factor:1000. in + let start_container, container = + Evm_node_lib_dev.Tx_queue.tx_container ~chain_family:EVM + in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + let max_size = 999_999 in + let tx_per_addr_limit = Int64.of_int 999_999 in + let max_transaction_batch_length = Some 300 in + let max_lifespan_s = 2 in + let config : Evm_node_config.Configuration.tx_queue = + {max_size; max_transaction_batch_length; max_lifespan_s; tx_per_addr_limit} + in let*? () = - Tx_queue.start - ~relay_endpoint:endpoint - ~max_transaction_batch_length:(Some 300) - ~inclusion_timeout:parameters.timeout - () + start_container ~config ~keep_alive:true ~timeout:parameters.timeout () in + let* () = Floodgate_events.is_ready infos.chain_id infos.base_fee_per_gas in let follower = Floodgate.start_blueprint_follower ~relay_endpoint:endpoint ~rpc_endpoint:endpoint in - let tx_queue = Tx_queue.beacon ~tick_interval:0.25 in + let tx_queue = + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc endpoint) + ~tick_interval:0.25 + in Log.report "Deploying WXTZ" ; let* wxtz_addr = deploy_contract ~rpc_node infos ~sequencer sender `ERC20 in Log.info " WXTZ: %s" wxtz_addr ; @@ -422,6 +453,7 @@ let setup ~accounts ~nb_tokens ~nb_hops ~sequencer ~rpc_node = let env = { + container; sequencer; rpc_node; infos; @@ -453,7 +485,7 @@ let setup ~accounts ~nb_tokens ~nb_hops ~sequencer ~rpc_node = let shutdown () = Lwt.cancel follower ; Lwt.cancel tx_queue ; - let* () = Tx_queue.shutdown () in + let*? () = Tx_container.shutdown () in let* () = Evm_node.terminate sequencer in unit in diff --git a/etherlink/tezt/benchmarks/snailtracer.ml b/etherlink/tezt/benchmarks/snailtracer.ml index 92b2f27fc8e8d9e8fa84efd286799b44021cf39e..c2f3522e4abf4c2f365094b7622c4543e25504c5 100644 --- a/etherlink/tezt/benchmarks/snailtracer.ml +++ b/etherlink/tezt/benchmarks/snailtracer.ml @@ -14,6 +14,8 @@ open Benchmark_utils open Floodgate_lib type env = { + container : + L2_types.evm_chain_family Evm_node_lib_dev.Services_backend_sig.tx_container; sequencer : Evm_node.t; rpc_node : Evm_node.t; infos : Network_info.t; @@ -181,9 +183,10 @@ let ray_trace_scanlines ({width; height; _} as env) sender = speedup ; unit -let call_one {infos; rpc_node; gas_limit; contract; spp; _} sender = +let call_one {container; infos; rpc_node; gas_limit; contract; spp; _} sender = let* _ = call + ~container infos rpc_node contract @@ -240,19 +243,34 @@ let test_snailtracer () = let*? infos = Network_info.fetch ~rpc_endpoint:endpoint ~base_fee_factor:1000. in + let start_container, container = + Evm_node_lib_dev.Tx_queue.tx_container ~chain_family:EVM + in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + let max_size = 999_999 in + let tx_per_addr_limit = Int64.of_int 999_999 in + let max_transaction_batch_length = Some 300 in + let max_lifespan_s = 2 in + let config : Evm_node_config.Configuration.tx_queue = + {max_size; max_transaction_batch_length; max_lifespan_s; tx_per_addr_limit} + in let*? () = - Tx_queue.start - ~relay_endpoint:endpoint - ~max_transaction_batch_length:(Some 300) - ~inclusion_timeout:parameters.timeout - () + start_container ~config ~keep_alive:true ~timeout:parameters.timeout () in + let* () = Floodgate_events.is_ready infos.chain_id infos.base_fee_per_gas in let follower = Floodgate.start_blueprint_follower ~relay_endpoint:endpoint ~rpc_endpoint:endpoint in - let tx_queue = Tx_queue.beacon ~tick_interval:0.5 in + let tx_queue = + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc endpoint) + ~tick_interval:0.5 + in Log.report "Deploying SnailTracer contract" ; let bin = Contracts.Snailtracer.bin () in let bin = bin ^ encode_parameters width height in @@ -264,6 +282,7 @@ let test_snailtracer () = Log.info "Will use gas limit %a" Z.pp_print gas_limit ; let env = { + container; sequencer; rpc_node; infos; @@ -282,7 +301,7 @@ let test_snailtracer () = let* () = Lwt_list.iter_s (step env) (List.init parameters.iterations succ) in Lwt.cancel follower ; Lwt.cancel tx_queue ; - let* () = Tx_queue.shutdown () in + let*? () = Tx_container.shutdown () in let* () = Evm_node.terminate sequencer in stop_profile () @@ -325,19 +344,34 @@ let test_full_image_raytracing () = let*? infos = Network_info.fetch ~rpc_endpoint:endpoint ~base_fee_factor:1000. in + let start_container, container = + Evm_node_lib_dev.Tx_queue.tx_container ~chain_family:EVM + in + let (Evm_node_lib_dev.Services_backend_sig.Evm_tx_container + (module Tx_container)) = + container + in + let max_size = 999_999 in + let tx_per_addr_limit = Int64.of_int 999_999 in + let max_transaction_batch_length = Some 300 in + let max_lifespan_s = 2 in + let config : Evm_node_config.Configuration.tx_queue = + {max_size; max_transaction_batch_length; max_lifespan_s; tx_per_addr_limit} + in let*? () = - Tx_queue.start - ~relay_endpoint:endpoint - ~max_transaction_batch_length:(Some 300) - ~inclusion_timeout:parameters.timeout - () + start_container ~config ~keep_alive:true ~timeout:parameters.timeout () in + let* () = Floodgate_events.is_ready infos.chain_id infos.base_fee_per_gas in let follower = Floodgate.start_blueprint_follower ~relay_endpoint:endpoint ~rpc_endpoint:endpoint in - let tx_queue = Tx_queue.beacon ~tick_interval:0.5 in + let tx_queue = + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc endpoint) + ~tick_interval:0.5 + in Log.report "Deploying SnailTracer contract" ; let bin = Contracts.Snailtracer.bin () in let bin = bin ^ encode_parameters width height in @@ -346,9 +380,10 @@ let test_full_image_raytracing () = in Lwt.cancel follower ; Lwt.cancel tx_queue ; - let* () = Tx_queue.shutdown () in + let*? () = Tx_container.shutdown () in let env = { + container; sequencer; rpc_node; infos;