From baae83b41c3fa9b37b71c606bc9262f76ec65639 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Sun, 18 May 2025 01:53:44 +0200 Subject: [PATCH 1/6] Etherlink/Node: introduce L2_transaction This commit introduces a `L2_transaction` module interface which abstracts the three types manipulated by the Tx queue (transaction_object, legacy_transaction_object, and address) and instantiates this module with the Etherlink types. --- etherlink/bin_node/lib_dev/tx_queue_types.ml | 67 +++++++++++++++++++ etherlink/bin_node/lib_dev/tx_queue_types.mli | 40 +++++++++++ 2 files changed, 107 insertions(+) create mode 100644 etherlink/bin_node/lib_dev/tx_queue_types.ml create mode 100644 etherlink/bin_node/lib_dev/tx_queue_types.mli diff --git a/etherlink/bin_node/lib_dev/tx_queue_types.ml b/etherlink/bin_node/lib_dev/tx_queue_types.ml new file mode 100644 index 000000000000..bc3d418a8e30 --- /dev/null +++ b/etherlink/bin_node/lib_dev/tx_queue_types.ml @@ -0,0 +1,67 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +module type L2_transaction = sig + type t + + type legacy + + type address + + val address_encoding : address Data_encoding.t + + val hash_of_tx_object : legacy -> Ethereum_types.hash + + val address_to_string : address -> string + + val from_address_of_tx_object : legacy -> address + + val nonce_of_tx_object : legacy -> Ethereum_types.quantity + + val transaction_object_from_legacy : legacy -> t + + module AddressMap : Map.S with type key = address + + val make_txpool : + pending:legacy Ethereum_types.NonceMap.t AddressMap.t -> + queued:legacy Ethereum_types.NonceMap.t AddressMap.t -> + Ethereum_types.txpool +end + +module Eth_transaction_object : + L2_transaction + with type t = Transaction_object.t + and type legacy = Ethereum_types.legacy_transaction_object + and type address = Ethereum_types.address + and module AddressMap = Ethereum_types.AddressMap = struct + open Ethereum_types + + type t = Transaction_object.t + + type legacy = legacy_transaction_object + + type nonrec address = address + + let address_encoding = address_encoding + + let hash_of_tx_object (tx_object : legacy_transaction_object) = tx_object.hash + + let address_to_string (Address (Hex s)) = s + + let from_address_of_tx_object (tx_object : legacy_transaction_object) = + tx_object.from + + let nonce_of_tx_object (tx_object : legacy_transaction_object) = + tx_object.nonce + + let transaction_object_from_legacy = + Transaction_object.from_store_transaction_object + + module AddressMap = AddressMap + + let make_txpool ~pending ~queued = {pending; queued} +end diff --git a/etherlink/bin_node/lib_dev/tx_queue_types.mli b/etherlink/bin_node/lib_dev/tx_queue_types.mli new file mode 100644 index 000000000000..d00734a2e4d2 --- /dev/null +++ b/etherlink/bin_node/lib_dev/tx_queue_types.mli @@ -0,0 +1,40 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +module type L2_transaction = sig + type t + + type legacy + + type address + + val address_encoding : address Data_encoding.t + + val hash_of_tx_object : legacy -> Ethereum_types.hash + + val address_to_string : address -> string + + val from_address_of_tx_object : legacy -> address + + val nonce_of_tx_object : legacy -> Ethereum_types.quantity + + val transaction_object_from_legacy : legacy -> t + + module AddressMap : Map.S with type key = address + + val make_txpool : + pending:legacy Ethereum_types.NonceMap.t AddressMap.t -> + queued:legacy Ethereum_types.NonceMap.t AddressMap.t -> + Ethereum_types.txpool +end + +module Eth_transaction_object : + L2_transaction + with type t = Transaction_object.t + and type legacy = Ethereum_types.legacy_transaction_object + and type address = Ethereum_types.address + and module AddressMap = Ethereum_types.AddressMap -- GitLab From 2cc1fee29a6012ee2e5d206de706e69d4dfb3fed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Tue, 17 Jun 2025 00:56:11 +0200 Subject: [PATCH 2/6] Etherlink/Node/Tx_queue: move Internal_for_tests down Co-authored-by: Thomas Letan --- etherlink/bin_node/lib_dev/tx_queue.ml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index c7f567c644e8..884b531f6fda 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -998,11 +998,6 @@ module Handlers = struct let on_close _ = Lwt.return_unit end -module Internal_for_tests = struct - module Nonce_bitset = Nonce_bitset - module Address_nonce = Address_nonce -end - module Tx_container = struct let table = Worker.create_table Queue @@ -1162,3 +1157,8 @@ end let start = Tx_container.start let tx_container = Services_backend_sig.Evm_tx_container (module Tx_container) + +module Internal_for_tests = struct + module Nonce_bitset = Nonce_bitset + module Address_nonce = Address_nonce +end -- GitLab From 62512466b1dfb9af1f2d4ad4930a44a58276c2c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Tue, 17 Jun 2025 00:59:35 +0200 Subject: [PATCH 3/6] Etherlink/Node/Tx_queue: move opening of Tx_container module up Co-authored-by: Thomas Letan --- etherlink/bin_node/lib_dev/tx_queue.ml | 1450 ++++++++++++------------ 1 file changed, 731 insertions(+), 719 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 884b531f6fda..dcb145f429db 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -256,749 +256,761 @@ module Address_nonce = struct | None -> return next_nonce end -module Transaction_objects = struct - open Ethereum_types - module S = String.Hashtbl - - type t = Ethereum_types.legacy_transaction_object S.t - - let empty ~start_size = S.create start_size - - let add htbl - (({hash = Hash (Hex hash); _} : Ethereum_types.legacy_transaction_object) - as tx_object) = - (* Here we uses `add` and not `replace`. If a transaction is - submitted multiple times then we register it each time in the - hashtable. Meaning that for `find` to returns None, we must - call `remove` as many times as the transaction was added. *) - S.add htbl hash tx_object - - let find htbl (Hash (Hex hash)) = S.find htbl hash - - let remove htbl (Hash (Hex hash)) = S.remove htbl hash -end - -module Pending_transactions = struct - open Ethereum_types - module S = String.Hashtbl - - type t = pending_request S.t - - let empty ~start_size = S.create start_size - - let add htbl (Hash (Hex hash)) pending_callback = - S.replace - htbl - hash - ({pending_callback; since = Time.System.now ()} : pending_request) - - let pop htbl (Hash (Hex hash)) = - match S.find htbl hash with - | Some pending -> - S.remove htbl hash ; - Some pending - | None -> None - - let drop ~max_lifespan htbl = - let now = Time.System.now () in - let dropped = ref [] in - S.filter_map_inplace - (fun _hash pending -> - let lifespan = Ptime.diff now pending.since in - if Ptime.Span.compare lifespan max_lifespan > 0 then ( - dropped := pending :: !dropped ; - None) - else Some pending) - htbl ; - !dropped - - let to_seq = S.to_seq_values - - let clear = S.clear -end - -module Transactions_per_addr = struct - module S = String.Hashtbl - - type t = int64 S.t - - let empty ~start_size = S.create start_size - - let remove s (Ethereum_types.Address (Hex h)) = S.remove s h - - let find s (Ethereum_types.Address (Hex h)) = S.find s h - - let add s (Ethereum_types.Address (Hex h)) i = S.replace s h i - - let decrement s address = - let current = find s address in - match current with - | Some i when i <= 1L -> remove s address - | Some i -> add s address (Int64.pred i) - | None -> () - - let increment s address = - let current = find s address in - match current with - | Some i -> add s address (Int64.succ i) - | None -> add s address 1L -end - -type state = { - mutable queue : queue_request Queue.t; - pending : Pending_transactions.t; - tx_object : Transaction_objects.t; - tx_per_address : Transactions_per_addr.t; - address_nonce : Address_nonce.t; - config : Configuration.tx_queue; - keep_alive : bool; - mutable locked : bool; -} - -module Types = struct - type nonrec state = state - - type nonrec parameters = parameters -end - -module Name = struct - type t = unit - - let encoding = Data_encoding.unit - - let base = ["evm_node_worker"; "tx_queue"] - - let pp _fmt () = () - - let equal () () = true -end - -module Request = struct - type ('a, 'b) t = - | Inject : request -> ((unit, string) result, tztrace) t - | Find : { - txn_hash : Ethereum_types.hash; - } - -> (Ethereum_types.legacy_transaction_object option, tztrace) t - | Nonce : { - next_nonce : Ethereum_types.quantity; - address : Ethereum_types.address; - } - -> (Ethereum_types.quantity, tztrace) t - | Tick : {evm_node_endpoint : endpoint} -> (unit, tztrace) t - | Clear : (unit, tztrace) t - | Lock_transactions : (unit, tztrace) t - | Unlock_transactions : (unit, tztrace) t - | Is_locked : (bool, tztrace) t - | Content : (Ethereum_types.txpool, tztrace) t - | Pop_transactions : { - validation_state : 'a; - validate_tx : - 'a -> - string -> - Ethereum_types.legacy_transaction_object -> - [`Keep of 'a | `Drop | `Stop] tzresult Lwt.t; - } - -> ((string * Ethereum_types.legacy_transaction_object) list, tztrace) t - | Confirm_transactions : { - confirmed_txs : Ethereum_types.hash Seq.t; - clear_pending_queue_after : bool; - } - -> (unit, tztrace) t - - let name (type a b) (t : (a, b) t) = - match t with - | Inject _ -> "Inject" - | Find _ -> "Find" - | Nonce _ -> "Nonce" - | Tick _ -> "Tick" - | Clear -> "Clear" - | Lock_transactions -> "Lock_transactions" - | Unlock_transactions -> "Unlock_transactions" - | Is_locked -> "Is_locked" - | Content -> "Content" - | Pop_transactions _ -> "Pop_transactions" - | Confirm_transactions _ -> "Confirm_transactions" - - type view = View : _ t -> view - - let view req = View req - - let endpoint_encoding = - let open Data_encoding in - conv - (function Rpc uri -> Uri.to_string uri | Websocket _ -> "[websocket]") - (fun _ -> assert false) - string - - let encoding = - let open Data_encoding in - (* This encoding is used to encode only *) - union - [ - case - Json_only - ~title:"Inject" - (obj2 - (req "request" (constant "inject")) - (req "payload" Ethereum_types.hex_encoding)) - (function - | View (Inject {payload; _}) -> Some ((), payload) | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Tick" - (obj2 - (req "request" (constant "tick")) - (req "evm_node_endpoint" endpoint_encoding)) - (function - | View (Tick {evm_node_endpoint}) -> Some ((), evm_node_endpoint) - | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Find" - (obj2 - (req "request" (constant "find")) - (req "transaction_hash" Ethereum_types.hash_encoding)) - (function View (Find {txn_hash}) -> Some ((), txn_hash) | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Clear" - (obj1 (req "request" (constant "clear"))) - (function View Clear -> Some () | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Nonce" - (obj3 - (req "request" (constant "nonce")) - (req "next_nonce" Ethereum_types.quantity_encoding) - (req "address" Ethereum_types.address_encoding)) - (function - | View (Nonce {next_nonce; address}) -> - Some ((), next_nonce, address) - | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Lock_transactions" - (obj1 (req "request" (constant "lock_transactions"))) - (function View Lock_transactions -> Some () | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Unlock_transactions" - (obj1 (req "request" (constant "unlock_transactions"))) - (function View Unlock_transactions -> Some () | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Is_locked" - (obj1 (req "request" (constant "is_locked"))) - (function View Is_locked -> Some () | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Content" - (obj1 (req "request" (constant "content"))) - (function View Content -> Some () | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Pop_transactions" - (obj1 (req "request" (constant "pop_transactions"))) - (function - | View (Pop_transactions {validation_state = _; validate_tx = _}) -> - Some () - | _ -> None) - (fun _ -> assert false); - case - Json_only - ~title:"Confirm_transactions" - (obj3 - (req "request" (constant "confirm_transactions")) - (req "confirmed_txs" (list Ethereum_types.hash_encoding)) - (req "clear_pending_queue_after" bool)) - (function - | View - (Confirm_transactions - {confirmed_txs; clear_pending_queue_after}) -> - Some ((), List.of_seq confirmed_txs, clear_pending_queue_after) - | _ -> None) - (fun _ -> assert false); - ] - - let pp fmt (View r) = - let open Format in - match r with - | Inject {payload = Hex txn; _} -> fprintf fmt "Inject %s" txn - | Find {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Find %s" txn_hash - | Tick _ -> fprintf fmt "Tick" - | Clear -> fprintf fmt "Clear" - | Nonce {next_nonce = _; address = Address (Hex address)} -> - fprintf fmt "Nonce %s" address - | Lock_transactions -> Format.fprintf fmt "Locking the transactions" - | Unlock_transactions -> Format.fprintf fmt "Unlocking the transactions" - | Is_locked -> Format.fprintf fmt "Checking if the tx queue is locked" - | Content -> fprintf fmt "Content" - | Pop_transactions {validation_state = _; validate_tx = _} -> - fprintf fmt "Popping transactions with validation function" - | Confirm_transactions _ -> fprintf fmt "Confirming transactions" -end - -module Worker = Octez_telemetry.Worker.MakeSingle (Name) (Request) (Types) - -type worker = Worker.infinite Worker.queue Worker.t +module Tx_container = struct + module Transaction_objects = struct + open Ethereum_types + module S = String.Hashtbl + + type t = Ethereum_types.legacy_transaction_object S.t + + let empty ~start_size = S.create start_size + + let add htbl + (({hash = Hash (Hex hash); _} : + Ethereum_types.legacy_transaction_object) as tx_object) = + (* Here we uses `add` and not `replace`. If a transaction is + submitted multiple times then we register it each time in the + hashtable. Meaning that for `find` to returns None, we must + call `remove` as many times as the transaction was added. *) + S.add htbl hash tx_object + + let find htbl (Hash (Hex hash)) = S.find htbl hash + + let remove htbl (Hash (Hex hash)) = S.remove htbl hash + end + + module Pending_transactions = struct + open Ethereum_types + module S = String.Hashtbl + + type t = pending_request S.t + + let empty ~start_size = S.create start_size + + let add htbl (Hash (Hex hash)) pending_callback = + S.replace + htbl + hash + ({pending_callback; since = Time.System.now ()} : pending_request) + + let pop htbl (Hash (Hex hash)) = + match S.find htbl hash with + | Some pending -> + S.remove htbl hash ; + Some pending + | None -> None + + let drop ~max_lifespan htbl = + let now = Time.System.now () in + let dropped = ref [] in + S.filter_map_inplace + (fun _hash pending -> + let lifespan = Ptime.diff now pending.since in + if Ptime.Span.compare lifespan max_lifespan > 0 then ( + dropped := pending :: !dropped ; + None) + else Some pending) + htbl ; + !dropped + + let to_seq = S.to_seq_values + + let clear = S.clear + end + + module Transactions_per_addr = struct + module S = String.Hashtbl + + type t = int64 S.t + + let empty ~start_size = S.create start_size + + let remove s (Ethereum_types.Address (Hex h)) = S.remove s h + + let find s (Ethereum_types.Address (Hex h)) = S.find s h + + let add s (Ethereum_types.Address (Hex h)) i = S.replace s h i + + let decrement s address = + let current = find s address in + match current with + | Some i when i <= 1L -> remove s address + | Some i -> add s address (Int64.pred i) + | None -> () + + let increment s address = + let current = find s address in + match current with + | Some i -> add s address (Int64.succ i) + | None -> add s address 1L + end + + type state = { + mutable queue : queue_request Queue.t; + pending : Pending_transactions.t; + tx_object : Transaction_objects.t; + tx_per_address : Transactions_per_addr.t; + address_nonce : Address_nonce.t; + config : Configuration.tx_queue; + keep_alive : bool; + mutable locked : bool; + } -let tx_queue_event ?attrs name = - Opentelemetry_lwt.Event.make - ?attrs - Format.(sprintf "%s/%s" (String.concat "." Name.base) name) + module Types = struct + type nonrec state = state + + type nonrec parameters = parameters + end + + module Name = struct + type t = unit + + let encoding = Data_encoding.unit + + let base = ["evm_node_worker"; "tx_queue"] + + let pp _fmt () = () + + let equal () () = true + end + + module Request = struct + type ('a, 'b) t = + | Inject : request -> ((unit, string) result, tztrace) t + | Find : { + txn_hash : Ethereum_types.hash; + } + -> (Ethereum_types.legacy_transaction_object option, tztrace) t + | Nonce : { + next_nonce : Ethereum_types.quantity; + address : Ethereum_types.address; + } + -> (Ethereum_types.quantity, tztrace) t + | Tick : {evm_node_endpoint : endpoint} -> (unit, tztrace) t + | Clear : (unit, tztrace) t + | Lock_transactions : (unit, tztrace) t + | Unlock_transactions : (unit, tztrace) t + | Is_locked : (bool, tztrace) t + | Content : (Ethereum_types.txpool, tztrace) t + | Pop_transactions : { + validation_state : 'a; + validate_tx : + 'a -> + string -> + Ethereum_types.legacy_transaction_object -> + [`Keep of 'a | `Drop | `Stop] tzresult Lwt.t; + } + -> ( (string * Ethereum_types.legacy_transaction_object) list, + tztrace ) + t + | Confirm_transactions : { + confirmed_txs : Ethereum_types.hash Seq.t; + clear_pending_queue_after : bool; + } + -> (unit, tztrace) t + + let name (type a b) (t : (a, b) t) = + match t with + | Inject _ -> "Inject" + | Find _ -> "Find" + | Nonce _ -> "Nonce" + | Tick _ -> "Tick" + | Clear -> "Clear" + | Lock_transactions -> "Lock_transactions" + | Unlock_transactions -> "Unlock_transactions" + | Is_locked -> "Is_locked" + | Content -> "Content" + | Pop_transactions _ -> "Pop_transactions" + | Confirm_transactions _ -> "Confirm_transactions" + + type view = View : _ t -> view + + let view req = View req + + let endpoint_encoding = + let open Data_encoding in + conv + (function Rpc uri -> Uri.to_string uri | Websocket _ -> "[websocket]") + (fun _ -> assert false) + string + + let encoding = + let open Data_encoding in + (* This encoding is used to encode only *) + union + [ + case + Json_only + ~title:"Inject" + (obj2 + (req "request" (constant "inject")) + (req "payload" Ethereum_types.hex_encoding)) + (function + | View (Inject {payload; _}) -> Some ((), payload) | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Tick" + (obj2 + (req "request" (constant "tick")) + (req "evm_node_endpoint" endpoint_encoding)) + (function + | View (Tick {evm_node_endpoint}) -> Some ((), evm_node_endpoint) + | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Find" + (obj2 + (req "request" (constant "find")) + (req "transaction_hash" Ethereum_types.hash_encoding)) + (function + | View (Find {txn_hash}) -> Some ((), txn_hash) | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Clear" + (obj1 (req "request" (constant "clear"))) + (function View Clear -> Some () | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Nonce" + (obj3 + (req "request" (constant "nonce")) + (req "next_nonce" Ethereum_types.quantity_encoding) + (req "address" Ethereum_types.address_encoding)) + (function + | View (Nonce {next_nonce; address}) -> + Some ((), next_nonce, address) + | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Lock_transactions" + (obj1 (req "request" (constant "lock_transactions"))) + (function View Lock_transactions -> Some () | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Unlock_transactions" + (obj1 (req "request" (constant "unlock_transactions"))) + (function View Unlock_transactions -> Some () | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Is_locked" + (obj1 (req "request" (constant "is_locked"))) + (function View Is_locked -> Some () | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Content" + (obj1 (req "request" (constant "content"))) + (function View Content -> Some () | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Pop_transactions" + (obj1 (req "request" (constant "pop_transactions"))) + (function + | View (Pop_transactions {validation_state = _; validate_tx = _}) + -> + Some () + | _ -> None) + (fun _ -> assert false); + case + Json_only + ~title:"Confirm_transactions" + (obj3 + (req "request" (constant "confirm_transactions")) + (req "confirmed_txs" (list Ethereum_types.hash_encoding)) + (req "clear_pending_queue_after" bool)) + (function + | View + (Confirm_transactions + {confirmed_txs; clear_pending_queue_after}) -> + Some ((), List.of_seq confirmed_txs, clear_pending_queue_after) + | _ -> None) + (fun _ -> assert false); + ] + + let pp fmt (View r) = + let open Format in + match r with + | Inject {payload = Hex txn; _} -> fprintf fmt "Inject %s" txn + | Find {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Find %s" txn_hash + | Tick _ -> fprintf fmt "Tick" + | Clear -> fprintf fmt "Clear" + | Nonce {next_nonce = _; address = Address (Hex address)} -> + fprintf fmt "Nonce %s" address + | Lock_transactions -> Format.fprintf fmt "Locking the transactions" + | Unlock_transactions -> Format.fprintf fmt "Unlocking the transactions" + | Is_locked -> Format.fprintf fmt "Checking if the tx queue is locked" + | Content -> fprintf fmt "Content" + | Pop_transactions {validation_state = _; validate_tx = _} -> + fprintf fmt "Popping transactions with validation function" + | Confirm_transactions _ -> fprintf fmt "Confirming transactions" + end + + module Worker = Octez_telemetry.Worker.MakeSingle (Name) (Request) (Types) + + type worker = Worker.infinite Worker.queue Worker.t + + let tx_queue_event ?attrs name = + Opentelemetry_lwt.Event.make + ?attrs + Format.(sprintf "%s/%s" (String.concat "." Name.base) name) + + module Handlers = struct + open Request + + type self = worker + + let uuid_seed = Random.get_state () + + let send_transactions_batch ~evm_node_endpoint ~keep_alive transactions = + let open Lwt_result_syntax in + let module M = Map.Make (String) in + let module Srt = Rpc_encodings.Send_raw_transaction in + if Seq.is_empty transactions then return_unit + else + let rev_batch, callbacks = + Seq.fold_left + (fun (rev_batch, callbacks) {hash; payload; queue_callback} -> + Octez_telemetry.Trace.add_event (fun () -> + tx_queue_event + ~attrs:Telemetry.Attributes.[Transaction.hash hash] + "selected_transaction") ; + let req_id = + Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) + in + let txn = + Rpc_encodings.JSONRPC. + { + method_ = Srt.method_; + parameters = + Some + (Data_encoding.Json.construct + Srt.input_encoding + payload); + id = Some (Id_string req_id); + } + in -module Handlers = struct - open Request + (txn :: rev_batch, M.add req_id queue_callback callbacks)) + ([], M.empty) + transactions + in + let batch = List.rev rev_batch in + + let*! () = Tx_queue_events.injecting_transactions (List.length batch) in + + let* responses = + match evm_node_endpoint with + | Rpc base -> + let* batch_response = + Rollup_services.call_service + ~keep_alive + ~base + (Batch.dispatch_batch_service ~path:Resto.Path.root) + () + () + (Batch batch) + in + return + (match batch_response with + | Singleton r -> [r] + | Batch rs -> rs) + | Websocket ws_client -> + List.map_ep + (fun req -> + let*! response = + Websocket_client.send_jsonrpc_request ws_client req + in + return Rpc_encodings.JSONRPC.{value = response; id = req.id}) + batch + in - type self = worker + let* missed_callbacks = + List.fold_left_es + (fun callbacks (response : Rpc_encodings.JSONRPC.response) -> + match response with + | {id = Some (Id_string req); value} -> ( + match (value, M.find_opt req callbacks) with + | value, Some callback -> + let* () = + match value with + | Ok _hash_encoded -> Lwt_result.ok (callback `Accepted) + | Error error -> + let*! () = Tx_queue_events.rpc_error error in + Lwt_result.ok (callback `Refused) + in + return (M.remove req callbacks) + | _ -> return callbacks) + | _ -> failwith "Inconsistent response from the server") + callbacks + responses + in - let uuid_seed = Random.get_state () + assert (M.is_empty missed_callbacks) ; + return_unit - let send_transactions_batch ~evm_node_endpoint ~keep_alive transactions = - let open Lwt_result_syntax in - let module M = Map.Make (String) in - let module Srt = Rpc_encodings.Send_raw_transaction in - if Seq.is_empty transactions then return_unit - else - let rev_batch, callbacks = - Seq.fold_left - (fun (rev_batch, callbacks) {hash; payload; queue_callback} -> - Octez_telemetry.Trace.add_event (fun () -> - tx_queue_event - ~attrs:Telemetry.Attributes.[Transaction.hash hash] - "selected_transaction") ; - let req_id = - Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) + (** clear values and keep the allocated space *) + let clear + ({ + queue; + pending; + tx_object; + tx_per_address; + address_nonce; + config = _; + keep_alive = _; + locked = _; + } : + state) = + (* full matching so when a new element is added to the state it's not + forgotten to clear it. *) + String.Hashtbl.clear pending ; + String.Hashtbl.clear tx_object ; + String.Hashtbl.clear tx_per_address ; + String.Hashtbl.clear address_nonce ; + Queue.clear queue ; + () + + let lock_transactions state = state.locked <- true + + let unlock_transactions state = state.locked <- false + + let is_locked state = state.locked + + let pop_queue_until state ~validation_state ~validate_tx = + let open Lwt_result_syntax in + let rec aux validation_state rev_selected = + match Queue.peek_opt state.queue with + | None -> return rev_selected + | Some {hash; payload; queue_callback} -> ( + let raw_tx = Ethereum_types.hex_to_bytes payload in + let tx_object = Transaction_objects.find state.tx_object hash in + match tx_object with + | None -> + (* Drop that tx because no tx_object associated. this is + an inpossible case, we log it to investigate. *) + let*! () = Tx_queue_events.missing_tx_object hash in + let _ = Queue.take state.queue in + let*! () = queue_callback `Refused in + aux validation_state rev_selected + | Some tx_object -> ( + let* is_valid = validate_tx validation_state raw_tx tx_object in + match is_valid with + | `Stop -> return rev_selected + (* `Stop means that we don't pop transaction anymore. We + don't remove the last peek tx because it could be valid + for another call. *) + | `Drop -> + (* `Drop, the current tx was evaluated and was refused + by the caller. *) + let _ = Queue.take state.queue in + let*! () = queue_callback `Refused in + aux validation_state rev_selected + | `Keep validation_state -> + (* `Keep, the current tx was evaluated and was validated + by the caller. *) + let _ = Queue.take state.queue in + let*! () = queue_callback `Accepted in + aux validation_state ((raw_tx, tx_object) :: rev_selected))) + in + let* rev_selected = aux validation_state [] in + return @@ List.rev rev_selected + + let on_request : + type r request_error. + worker -> + (r, request_error) Request.t -> + (r, request_error) result Lwt.t = + fun self request -> + let open Lwt_result_syntax in + let state = Worker.state self in + match request with + | Inject {next_nonce; payload; tx_object; callback} -> + protect @@ fun () -> + Tx_watcher.notify tx_object.hash ; + let (Address (Hex addr)) = tx_object.from in + let (Qty tx_nonce) = tx_object.nonce in + let pending_callback (reason : pending_variant) = + let open Lwt_syntax in + let* res = + match reason with + | `Dropped -> + let* () = + Tx_queue_events.transaction_dropped tx_object.hash + in + return + @@ Address_nonce.remove + state.address_nonce + ~addr + ~nonce:tx_nonce + | `Confirmed -> + let* () = + Tx_queue_events.transaction_confirmed tx_object.hash + in + return + @@ Address_nonce.confirm_nonce + state.address_nonce + ~addr + ~nonce:tx_nonce in - let txn = - Rpc_encodings.JSONRPC. - { - method_ = Srt.method_; - parameters = - Some - (Data_encoding.Json.construct Srt.input_encoding payload); - id = Some (Id_string req_id); - } + let* () = + match res with + | Ok () -> return_unit + | Error errs -> Tx_queue_events.callback_error errs in - - (txn :: rev_batch, M.add req_id queue_callback callbacks)) - ([], M.empty) - transactions - in - let batch = List.rev rev_batch in - - let*! () = Tx_queue_events.injecting_transactions (List.length batch) in - - let* responses = - match evm_node_endpoint with - | Rpc base -> - let* batch_response = - Rollup_services.call_service - ~keep_alive - ~base - (Batch.dispatch_batch_service ~path:Resto.Path.root) - () - () - (Batch batch) + Transactions_per_addr.decrement state.tx_per_address tx_object.from ; + Transaction_objects.remove state.tx_object tx_object.hash ; + Lwt.dont_wait + (fun () -> callback (reason :> all_variant)) + (fun exn -> + Tx_queue_events.callback_error__dont_wait__use_with_care + [Error_monad.error_of_exn exn]) ; + Lwt.return_unit + in + let queue_callback reason = + let open Lwt_syntax in + let* res = + match reason with + | `Accepted -> + Pending_transactions.add + state.pending + tx_object.hash + pending_callback ; + return_ok_unit + | `Refused -> + Transactions_per_addr.decrement + state.tx_per_address + tx_object.from ; + Transaction_objects.remove state.tx_object tx_object.hash ; + return + @@ Address_nonce.remove + state.address_nonce + ~addr + ~nonce:tx_nonce in - return - (match batch_response with Singleton r -> [r] | Batch rs -> rs) - | Websocket ws_client -> - List.map_ep - (fun req -> - let*! response = - Websocket_client.send_jsonrpc_request ws_client req + let* () = + match res with + | Ok () -> return_unit + | Error errs -> Tx_queue_events.callback_error errs + in + Lwt.dont_wait + (fun () -> callback (reason :> all_variant)) + (fun exn -> + Tx_queue_events.callback_error__dont_wait__use_with_care + [Error_monad.error_of_exn exn]) ; + Lwt.return_unit + in + if Compare.Int.(Queue.length state.queue < state.config.max_size) then ( + (* Check number of txs by user in tx_queue. *) + let nb_txs_in_queue = + Transactions_per_addr.find state.tx_per_address tx_object.from + in + match nb_txs_in_queue with + | Some i when i >= state.config.tx_per_addr_limit -> + let*! () = + Tx_pool_events.txs_per_user_threshold_reached + ~address:(Ethereum_types.Address.to_string tx_object.from) in - return Rpc_encodings.JSONRPC.{value = response; id = req.id}) - batch - in - - let* missed_callbacks = - List.fold_left_es - (fun callbacks (response : Rpc_encodings.JSONRPC.response) -> - match response with - | {id = Some (Id_string req); value} -> ( - match (value, M.find_opt req callbacks) with - | value, Some callback -> - let* () = - match value with - | Ok _hash_encoded -> Lwt_result.ok (callback `Accepted) - | Error error -> - let*! () = Tx_queue_events.rpc_error error in - Lwt_result.ok (callback `Refused) - in - return (M.remove req callbacks) - | _ -> return callbacks) - | _ -> failwith "Inconsistent response from the server") - callbacks - responses - in - - assert (M.is_empty missed_callbacks) ; - return_unit - - (** clear values and keep the allocated space *) - let clear - ({ - queue; - pending; - tx_object; - tx_per_address; - address_nonce; - config = _; - keep_alive = _; - locked = _; - } : - state) = - (* full matching so when a new element is added to the state it's not - forgotten to clear it. *) - String.Hashtbl.clear pending ; - String.Hashtbl.clear tx_object ; - String.Hashtbl.clear tx_per_address ; - String.Hashtbl.clear address_nonce ; - Queue.clear queue ; - () - - let lock_transactions state = state.locked <- true - - let unlock_transactions state = state.locked <- false - - let is_locked state = state.locked - - let pop_queue_until state ~validation_state ~validate_tx = - let open Lwt_result_syntax in - let rec aux validation_state rev_selected = - match Queue.peek_opt state.queue with - | None -> return rev_selected - | Some {hash; payload; queue_callback} -> ( - let raw_tx = Ethereum_types.hex_to_bytes payload in - let tx_object = Transaction_objects.find state.tx_object hash in - match tx_object with - | None -> - (* Drop that tx because no tx_object associated. this is - an inpossible case, we log it to investigate. *) - let*! () = Tx_queue_events.missing_tx_object hash in - let _ = Queue.take state.queue in - let*! () = queue_callback `Refused in - aux validation_state rev_selected - | Some tx_object -> ( - let* is_valid = validate_tx validation_state raw_tx tx_object in - match is_valid with - | `Stop -> return rev_selected - (* `Stop means that we don't pop transaction anymore. We - don't remove the last peek tx because it could be valid - for another call. *) - | `Drop -> - (* `Drop, the current tx was evaluated and was refused - by the caller. *) - let _ = Queue.take state.queue in - let*! () = queue_callback `Refused in - aux validation_state rev_selected - | `Keep validation_state -> - (* `Keep, the current tx was evaluated and was validated - by the caller. *) - let _ = Queue.take state.queue in - let*! () = queue_callback `Accepted in - aux validation_state ((raw_tx, tx_object) :: rev_selected))) - in - let* rev_selected = aux validation_state [] in - return @@ List.rev rev_selected - - let on_request : - type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun self request -> - let open Lwt_result_syntax in - let state = Worker.state self in - match request with - | Inject {next_nonce; payload; tx_object; callback} -> - protect @@ fun () -> - Tx_watcher.notify tx_object.hash ; - let (Address (Hex addr)) = tx_object.from in - let (Qty tx_nonce) = tx_object.nonce in - let pending_callback (reason : pending_variant) = - let open Lwt_syntax in - let* res = - match reason with - | `Dropped -> - let* () = Tx_queue_events.transaction_dropped tx_object.hash in return - @@ Address_nonce.remove - state.address_nonce - ~addr - ~nonce:tx_nonce - | `Confirmed -> - let* () = - Tx_queue_events.transaction_confirmed tx_object.hash + (Error + "Limit of transaction for a user was reached. Transaction \ + is rejected.") + | Some _ | None -> + let*! () = Tx_queue_events.add_transaction tx_object.hash in + Transactions_per_addr.increment + state.tx_per_address + tx_object.from ; + Transaction_objects.add state.tx_object tx_object ; + let Ethereum_types.(Qty next_nonce) = next_nonce in + let*? () = + Address_nonce.add + state.address_nonce + ~addr + ~next_nonce + ~nonce:tx_nonce in - return - @@ Address_nonce.confirm_nonce - state.address_nonce - ~addr - ~nonce:tx_nonce + Queue.add + {hash = tx_object.hash; payload; queue_callback} + state.queue ; + return (Ok ())) + else + return + (Error "Transaction limit was reached. Transaction is rejected.") + | Find {txn_hash} -> + protect @@ fun () -> + return @@ Transaction_objects.find state.tx_object txn_hash + | Tick {evm_node_endpoint} -> + protect @@ fun () -> + let all_transactions = Queue.to_seq state.queue in + let* transactions_to_inject, remaining_transactions = + match state.config.max_transaction_batch_length with + | None -> return (all_transactions, Seq.empty) + | Some max_transaction_batch_length -> + let when_negative_length = + TzTrace.make + (Exn (Failure "Negative max_transaction_batch_length")) + in + let*? transactions_to_inject = + Seq.take + ~when_negative_length + max_transaction_batch_length + all_transactions + in + let*? remaining_transactions = + Seq.drop + ~when_negative_length + max_transaction_batch_length + all_transactions + in + return (transactions_to_inject, remaining_transactions) in + state.queue <- Queue.of_seq remaining_transactions ; + let* () = - match res with - | Ok () -> return_unit - | Error errs -> Tx_queue_events.callback_error errs + send_transactions_batch + ~keep_alive:state.keep_alive + ~evm_node_endpoint + transactions_to_inject in - Transactions_per_addr.decrement state.tx_per_address tx_object.from ; - Transaction_objects.remove state.tx_object tx_object.hash ; - Lwt.dont_wait - (fun () -> callback (reason :> all_variant)) - (fun exn -> - Tx_queue_events.callback_error__dont_wait__use_with_care - [Error_monad.error_of_exn exn]) ; - Lwt.return_unit - in - let queue_callback reason = - let open Lwt_syntax in - let* res = - match reason with - | `Accepted -> - Pending_transactions.add - state.pending - tx_object.hash - pending_callback ; - return_ok_unit - | `Refused -> - Transactions_per_addr.decrement - state.tx_per_address - tx_object.from ; - Transaction_objects.remove state.tx_object tx_object.hash ; - return - @@ Address_nonce.remove - state.address_nonce - ~addr - ~nonce:tx_nonce + + let txns = + Pending_transactions.drop + ~max_lifespan:(Ptime.Span.of_int_s state.config.max_lifespan_s) + state.pending in - let* () = - match res with - | Ok () -> return_unit - | Error errs -> Tx_queue_events.callback_error errs + let*! () = + List.iter_s + (fun {pending_callback; _} -> pending_callback `Dropped) + txns in - Lwt.dont_wait - (fun () -> callback (reason :> all_variant)) - (fun exn -> - Tx_queue_events.callback_error__dont_wait__use_with_care - [Error_monad.error_of_exn exn]) ; - Lwt.return_unit - in - if Compare.Int.(Queue.length state.queue < state.config.max_size) then ( - (* Check number of txs by user in tx_queue. *) - let nb_txs_in_queue = - Transactions_per_addr.find state.tx_per_address tx_object.from + return_unit + | Clear -> + protect @@ fun () -> + clear state ; + let*! () = Tx_queue_events.cleared () in + return_unit + | Nonce {next_nonce; address = Address (Hex addr)} -> + protect @@ fun () -> + let Ethereum_types.(Qty next_nonce) = next_nonce in + let*? next_gap = + Address_nonce.next_gap state.address_nonce ~addr ~next_nonce + in + return @@ Ethereum_types.Qty next_gap + | Lock_transactions -> + protect @@ fun () -> return (lock_transactions state) + | Unlock_transactions -> + protect @@ fun () -> return (unlock_transactions state) + | Is_locked -> protect @@ fun () -> return (is_locked state) + | Content -> + protect @@ fun () -> + let open Ethereum_types in + let process_transactions tx_map lookup_fn acc = + String.Hashtbl.fold + (fun hash value acc -> + match lookup_fn hash value with + | Some (obj : Ethereum_types.legacy_transaction_object) -> + let existing_nonce_map = + AddressMap.find_opt obj.from acc + |> Option.value ~default:NonceMap.empty + in + let updated_nonce_map = + NonceMap.add (Qty.to_z obj.nonce) obj existing_nonce_map + in + AddressMap.add obj.from updated_nonce_map acc + | None -> acc) + tx_map + acc in - match nb_txs_in_queue with - | Some i when i >= state.config.tx_per_addr_limit -> - let*! () = - Tx_pool_events.txs_per_user_threshold_reached - ~address:(Ethereum_types.Address.to_string tx_object.from) - in - return - (Error - "Limit of transaction for a user was reached. Transaction \ - is rejected.") - | Some _ | None -> - let*! () = Tx_queue_events.add_transaction tx_object.hash in - Transactions_per_addr.increment - state.tx_per_address - tx_object.from ; - Transaction_objects.add state.tx_object tx_object ; - let Ethereum_types.(Qty next_nonce) = next_nonce in - let*? () = - Address_nonce.add - state.address_nonce - ~addr - ~next_nonce - ~nonce:tx_nonce - in - Queue.add - {hash = tx_object.hash; payload; queue_callback} - state.queue ; - return (Ok ())) - else - return - (Error "Transaction limit was reached. Transaction is rejected.") - | Find {txn_hash} -> - protect @@ fun () -> - return @@ Transaction_objects.find state.tx_object txn_hash - | Tick {evm_node_endpoint} -> - protect @@ fun () -> - let all_transactions = Queue.to_seq state.queue in - let* transactions_to_inject, remaining_transactions = - match state.config.max_transaction_batch_length with - | None -> return (all_transactions, Seq.empty) - | Some max_transaction_batch_length -> - let when_negative_length = - TzTrace.make - (Exn (Failure "Negative max_transaction_batch_length")) - in - let*? transactions_to_inject = - Seq.take - ~when_negative_length - max_transaction_batch_length - all_transactions - in - let*? remaining_transactions = - Seq.drop - ~when_negative_length - max_transaction_batch_length - all_transactions - in - return (transactions_to_inject, remaining_transactions) - in - state.queue <- Queue.of_seq remaining_transactions ; - - let* () = - send_transactions_batch - ~keep_alive:state.keep_alive - ~evm_node_endpoint - transactions_to_inject - in - - let txns = - Pending_transactions.drop - ~max_lifespan:(Ptime.Span.of_int_s state.config.max_lifespan_s) - state.pending - in - let*! () = - List.iter_s - (fun {pending_callback; _} -> pending_callback `Dropped) - txns - in - return_unit - | Clear -> - protect @@ fun () -> - clear state ; - let*! () = Tx_queue_events.cleared () in - return_unit - | Nonce {next_nonce; address = Address (Hex addr)} -> - protect @@ fun () -> - let Ethereum_types.(Qty next_nonce) = next_nonce in - let*? next_gap = - Address_nonce.next_gap state.address_nonce ~addr ~next_nonce - in - return @@ Ethereum_types.Qty next_gap - | Lock_transactions -> protect @@ fun () -> return (lock_transactions state) - | Unlock_transactions -> - protect @@ fun () -> return (unlock_transactions state) - | Is_locked -> protect @@ fun () -> return (is_locked state) - | Content -> - protect @@ fun () -> - let open Ethereum_types in - let process_transactions tx_map lookup_fn acc = - String.Hashtbl.fold - (fun hash value acc -> - match lookup_fn hash value with - | Some (obj : Ethereum_types.legacy_transaction_object) -> - let existing_nonce_map = - AddressMap.find_opt obj.from acc - |> Option.value ~default:NonceMap.empty - in - let updated_nonce_map = - NonceMap.add (Qty.to_z obj.nonce) obj existing_nonce_map - in - AddressMap.add obj.from updated_nonce_map acc - | None -> acc) - tx_map - acc - in - (* Process pending and collect found transactions *) - let pending = - process_transactions - state.pending - (fun hash _v -> String.Hashtbl.find_opt state.tx_object hash) - AddressMap.empty - in + (* Process pending and collect found transactions *) + let pending = + process_transactions + state.pending + (fun hash _v -> String.Hashtbl.find_opt state.tx_object hash) + AddressMap.empty + in - (* Process tx_object separately to collect the queued (unmatched) transactions *) - let queued = - process_transactions - state.tx_object - (fun hash obj -> - if String.Hashtbl.mem state.pending hash then None else Some obj) - AddressMap.empty - in + (* Process tx_object separately to collect the queued (unmatched) transactions *) + let queued = + process_transactions + state.tx_object + (fun hash obj -> + if String.Hashtbl.mem state.pending hash then None else Some obj) + AddressMap.empty + in - return {pending; queued} - | Pop_transactions {validation_state; validate_tx} -> - protect @@ fun () -> - if is_locked state then return [] - else pop_queue_until state ~validate_tx ~validation_state - | Confirm_transactions {confirmed_txs; clear_pending_queue_after} -> - protect @@ fun () -> - let*! () = - Seq.S.iter - (fun hash -> - let callback = Pending_transactions.pop state.pending hash in - match callback with - | Some {pending_callback; _} -> pending_callback `Confirmed - | None -> - (* delayed transactions hashes are part of confirmed - txs *) - Lwt.return_unit) - confirmed_txs - in - if clear_pending_queue_after then ( - let dropped = Pending_transactions.to_seq state.pending in + return {pending; queued} + | Pop_transactions {validation_state; validate_tx} -> + protect @@ fun () -> + if is_locked state then return [] + else pop_queue_until state ~validate_tx ~validation_state + | Confirm_transactions {confirmed_txs; clear_pending_queue_after} -> + protect @@ fun () -> let*! () = Seq.S.iter - (fun {pending_callback; _} -> pending_callback `Dropped) - dropped + (fun hash -> + let callback = Pending_transactions.pop state.pending hash in + match callback with + | Some {pending_callback; _} -> pending_callback `Confirmed + | None -> + (* delayed transactions hashes are part of confirmed + txs *) + Lwt.return_unit) + confirmed_txs in - (* Emptying the pending the dropped transactions *) - Pending_transactions.clear state.pending ; - return_unit) - else return_unit - - type launch_error = tztrace - - let on_launch _self () ({config; keep_alive} : parameters) = - let open Lwt_result_syntax in - return - { - queue = Queue.create (); - pending = Pending_transactions.empty ~start_size:(config.max_size / 4); - (* start with /4 and let it grow if necessary to not allocate - too much at start. *) - tx_object = Transaction_objects.empty ~start_size:(config.max_size / 4); - address_nonce = Address_nonce.empty ~start_size:(config.max_size / 10); - (* start with /10 and let it grow if necessary to not allocate - too much at start. It's expected to have less different - addresses than transactions. *) - tx_per_address = Transactions_per_addr.empty ~start_size:500; - (* Provide an arbitrary size for the initial hash tables, to - be revisited if needs be. *) - config; - keep_alive; - locked = false; - } - - let on_error (type a b) _self _status_request (_r : (a, b) Request.t) - (_errs : b) : [`Continue | `Shutdown] tzresult Lwt.t = - Lwt_result_syntax.return `Continue - - let on_completion _ _ _ _ = Lwt.return_unit - - let on_no_request _ = Lwt.return_unit - - let on_close _ = Lwt.return_unit -end + if clear_pending_queue_after then ( + let dropped = Pending_transactions.to_seq state.pending in + let*! () = + Seq.S.iter + (fun {pending_callback; _} -> pending_callback `Dropped) + dropped + in + (* Emptying the pending the dropped transactions *) + Pending_transactions.clear state.pending ; + return_unit) + else return_unit + + type launch_error = tztrace + + let on_launch _self () ({config; keep_alive} : parameters) = + let open Lwt_result_syntax in + return + { + queue = Queue.create (); + pending = Pending_transactions.empty ~start_size:(config.max_size / 4); + (* start with /4 and let it grow if necessary to not allocate + too much at start. *) + tx_object = Transaction_objects.empty ~start_size:(config.max_size / 4); + address_nonce = Address_nonce.empty ~start_size:(config.max_size / 10); + (* start with /10 and let it grow if necessary to not allocate + too much at start. It's expected to have less different + addresses than transactions. *) + tx_per_address = Transactions_per_addr.empty ~start_size:500; + (* Provide an arbitrary size for the initial hash tables, to + be revisited if needs be. *) + config; + keep_alive; + locked = false; + } + + let on_error (type a b) _self _status_request (_r : (a, b) Request.t) + (_errs : b) : [`Continue | `Shutdown] tzresult Lwt.t = + Lwt_result_syntax.return `Continue + + let on_completion _ _ _ _ = Lwt.return_unit + + let on_no_request _ = Lwt.return_unit + + let on_close _ = Lwt.return_unit + end -module Tx_container = struct let table = Worker.create_table Queue let worker_promise, worker_waker = Lwt.task () -- GitLab From 4f0202c515cff8a6d622ffa84260d49144cc1771 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Tue, 17 Jun 2025 01:00:09 +0200 Subject: [PATCH 4/6] Etherlink/Node/Tx_queue: use L2_transaction.Eth_transaction_object --- .../lib_dev/encodings/ethereum_types.mli | 3 + etherlink/bin_node/lib_dev/tx_queue.ml | 131 +++++++++++------- 2 files changed, 82 insertions(+), 52 deletions(-) diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli index a03036ef1bd3..cd730263eb9e 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli +++ b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli @@ -33,6 +33,9 @@ val hex_encoding : hex Data_encoding.t (** version of [hex_encoding] that do not add `0x` on encoded values. *) val hex_encoding_no0x : hex Data_encoding.t +(** Produced string is prefixed with [0x]. *) +val hex_to_string : hex -> string + (** Strips the [0x] prefix of a string. *) val hex_of_string : string -> hex diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index dcb145f429db..1e1e7a3d1c77 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -36,10 +36,12 @@ type pending_request = { type callback = all_variant variant_callback +module Tx = Tx_queue_types.Eth_transaction_object + type request = { next_nonce : Ethereum_types.quantity; payload : Ethereum_types.hex; - tx_object : Ethereum_types.legacy_transaction_object; + tx_object : Tx.legacy; callback : callback; } @@ -261,13 +263,12 @@ module Tx_container = struct open Ethereum_types module S = String.Hashtbl - type t = Ethereum_types.legacy_transaction_object S.t + type t = Tx.legacy S.t let empty ~start_size = S.create start_size - let add htbl - (({hash = Hash (Hex hash); _} : - Ethereum_types.legacy_transaction_object) as tx_object) = + let add htbl tx_object = + let (Hash (Hex hash)) = Tx.hash_of_tx_object tx_object in (* Here we uses `add` and not `replace`. If a transaction is submitted multiple times then we register it each time in the hashtable. Meaning that for `find` to returns None, we must @@ -325,11 +326,11 @@ module Tx_container = struct let empty ~start_size = S.create start_size - let remove s (Ethereum_types.Address (Hex h)) = S.remove s h + let remove s a = S.remove s (Tx.address_to_string a) - let find s (Ethereum_types.Address (Hex h)) = S.find s h + let find s a = S.find s (Tx.address_to_string a) - let add s (Ethereum_types.Address (Hex h)) i = S.replace s h i + let add s a i = S.replace s (Tx.address_to_string a) i let decrement s address = let current = find s address in @@ -377,13 +378,10 @@ module Tx_container = struct module Request = struct type ('a, 'b) t = | Inject : request -> ((unit, string) result, tztrace) t - | Find : { - txn_hash : Ethereum_types.hash; - } - -> (Ethereum_types.legacy_transaction_object option, tztrace) t + | Find : {txn_hash : Ethereum_types.hash} -> (Tx.legacy option, tztrace) t | Nonce : { next_nonce : Ethereum_types.quantity; - address : Ethereum_types.address; + address : Tx.address; } -> (Ethereum_types.quantity, tztrace) t | Tick : {evm_node_endpoint : endpoint} -> (unit, tztrace) t @@ -397,12 +395,10 @@ module Tx_container = struct validate_tx : 'a -> string -> - Ethereum_types.legacy_transaction_object -> + Tx.legacy -> [`Keep of 'a | `Drop | `Stop] tzresult Lwt.t; } - -> ( (string * Ethereum_types.legacy_transaction_object) list, - tztrace ) - t + -> ((string * Tx.legacy) list, tztrace) t | Confirm_transactions : { confirmed_txs : Ethereum_types.hash Seq.t; clear_pending_queue_after : bool; @@ -479,7 +475,7 @@ module Tx_container = struct (obj3 (req "request" (constant "nonce")) (req "next_nonce" Ethereum_types.quantity_encoding) - (req "address" Ethereum_types.address_encoding)) + (req "address" Tx.address_encoding)) (function | View (Nonce {next_nonce; address}) -> Some ((), next_nonce, address) @@ -542,8 +538,8 @@ module Tx_container = struct | Find {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Find %s" txn_hash | Tick _ -> fprintf fmt "Tick" | Clear -> fprintf fmt "Clear" - | Nonce {next_nonce = _; address = Address (Hex address)} -> - fprintf fmt "Nonce %s" address + | Nonce {next_nonce = _; address} -> + fprintf fmt "Nonce %s" (Tx.address_to_string address) | Lock_transactions -> Format.fprintf fmt "Locking the transactions" | Unlock_transactions -> Format.fprintf fmt "Unlocking the transactions" | Is_locked -> Format.fprintf fmt "Checking if the tx queue is locked" @@ -734,16 +730,19 @@ module Tx_container = struct match request with | Inject {next_nonce; payload; tx_object; callback} -> protect @@ fun () -> - Tx_watcher.notify tx_object.hash ; - let (Address (Hex addr)) = tx_object.from in - let (Qty tx_nonce) = tx_object.nonce in + Tx_watcher.notify (Tx.hash_of_tx_object tx_object) ; + let addr = + Tx.address_to_string (Tx.from_address_of_tx_object tx_object) + in + let (Qty tx_nonce) = Tx.nonce_of_tx_object tx_object in let pending_callback (reason : pending_variant) = let open Lwt_syntax in let* res = match reason with | `Dropped -> let* () = - Tx_queue_events.transaction_dropped tx_object.hash + Tx_queue_events.transaction_dropped + (Tx.hash_of_tx_object tx_object) in return @@ Address_nonce.remove @@ -752,7 +751,8 @@ module Tx_container = struct ~nonce:tx_nonce | `Confirmed -> let* () = - Tx_queue_events.transaction_confirmed tx_object.hash + Tx_queue_events.transaction_confirmed + (Tx.hash_of_tx_object tx_object) in return @@ Address_nonce.confirm_nonce @@ -765,8 +765,12 @@ module Tx_container = struct | Ok () -> return_unit | Error errs -> Tx_queue_events.callback_error errs in - Transactions_per_addr.decrement state.tx_per_address tx_object.from ; - Transaction_objects.remove state.tx_object tx_object.hash ; + Transactions_per_addr.decrement + state.tx_per_address + (Tx.from_address_of_tx_object tx_object) ; + Transaction_objects.remove + state.tx_object + (Tx.hash_of_tx_object tx_object) ; Lwt.dont_wait (fun () -> callback (reason :> all_variant)) (fun exn -> @@ -781,14 +785,16 @@ module Tx_container = struct | `Accepted -> Pending_transactions.add state.pending - tx_object.hash + (Tx.hash_of_tx_object tx_object) pending_callback ; return_ok_unit | `Refused -> Transactions_per_addr.decrement state.tx_per_address - tx_object.from ; - Transaction_objects.remove state.tx_object tx_object.hash ; + (Tx.from_address_of_tx_object tx_object) ; + Transaction_objects.remove + state.tx_object + (Tx.hash_of_tx_object tx_object) ; return @@ Address_nonce.remove state.address_nonce @@ -810,23 +816,32 @@ module Tx_container = struct if Compare.Int.(Queue.length state.queue < state.config.max_size) then ( (* Check number of txs by user in tx_queue. *) let nb_txs_in_queue = - Transactions_per_addr.find state.tx_per_address tx_object.from + Transactions_per_addr.find + state.tx_per_address + (Tx.from_address_of_tx_object tx_object) in match nb_txs_in_queue with | Some i when i >= state.config.tx_per_addr_limit -> let*! () = Tx_pool_events.txs_per_user_threshold_reached - ~address:(Ethereum_types.Address.to_string tx_object.from) + ~address: + (Ethereum_types.hex_to_string + (Ethereum_types.Hex + (Tx.address_to_string + (Tx.from_address_of_tx_object tx_object)))) in return (Error "Limit of transaction for a user was reached. Transaction \ is rejected.") | Some _ | None -> - let*! () = Tx_queue_events.add_transaction tx_object.hash in + let*! () = + Tx_queue_events.add_transaction + (Tx.hash_of_tx_object tx_object) + in Transactions_per_addr.increment state.tx_per_address - tx_object.from ; + (Tx.from_address_of_tx_object tx_object) ; Transaction_objects.add state.tx_object tx_object ; let Ethereum_types.(Qty next_nonce) = next_nonce in let*? () = @@ -837,7 +852,11 @@ module Tx_container = struct ~nonce:tx_nonce in Queue.add - {hash = tx_object.hash; payload; queue_callback} + { + hash = Tx.hash_of_tx_object tx_object; + payload; + queue_callback; + } state.queue ; return (Ok ())) else @@ -896,11 +915,14 @@ module Tx_container = struct clear state ; let*! () = Tx_queue_events.cleared () in return_unit - | Nonce {next_nonce; address = Address (Hex addr)} -> + | Nonce {next_nonce; address} -> protect @@ fun () -> let Ethereum_types.(Qty next_nonce) = next_nonce in let*? next_gap = - Address_nonce.next_gap state.address_nonce ~addr ~next_nonce + Address_nonce.next_gap + state.address_nonce + ~addr:(Tx.address_to_string address) + ~next_nonce in return @@ Ethereum_types.Qty next_gap | Lock_transactions -> @@ -915,15 +937,23 @@ module Tx_container = struct String.Hashtbl.fold (fun hash value acc -> match lookup_fn hash value with - | Some (obj : Ethereum_types.legacy_transaction_object) -> + | Some (obj : Tx.legacy) -> let existing_nonce_map = - AddressMap.find_opt obj.from acc - |> Option.value ~default:NonceMap.empty + Tx.AddressMap.find_opt + (Tx.from_address_of_tx_object obj) + acc + |> Option.value ~default:Ethereum_types.NonceMap.empty in let updated_nonce_map = - NonceMap.add (Qty.to_z obj.nonce) obj existing_nonce_map + Ethereum_types.NonceMap.add + (Qty.to_z (Tx.nonce_of_tx_object obj)) + obj + existing_nonce_map in - AddressMap.add obj.from updated_nonce_map acc + Tx.AddressMap.add + (Tx.from_address_of_tx_object obj) + updated_nonce_map + acc | None -> acc) tx_map acc @@ -934,7 +964,7 @@ module Tx_container = struct process_transactions state.pending (fun hash _v -> String.Hashtbl.find_opt state.tx_object hash) - AddressMap.empty + Tx.AddressMap.empty in (* Process tx_object separately to collect the queued (unmatched) transactions *) @@ -943,10 +973,10 @@ module Tx_container = struct state.tx_object (fun hash obj -> if String.Hashtbl.mem state.pending hash then None else Some obj) - AddressMap.empty + Tx.AddressMap.empty in - return {pending; queued} + return (Tx.make_txpool ~pending ~queued) | Pop_transactions {validation_state; validate_tx} -> protect @@ fun () -> if is_locked state then return [] @@ -1067,8 +1097,8 @@ module Tx_container = struct Worker.Queue.push_request_and_wait w (Nonce {next_nonce; address}) |> handle_request_error - let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce - (tx_object : Ethereum_types.legacy_transaction_object) txn = + let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce tx_object + txn = let open Lwt_syntax in let* worker = worker_promise in Worker.Queue.push_request_and_wait @@ -1080,7 +1110,7 @@ module Tx_container = struct let open Lwt_result_syntax in let* res = inject ~next_nonce tx_object raw_tx in match res with - | Ok () -> return (Ok tx_object.hash) + | Ok () -> return (Ok (Tx.hash_of_tx_object tx_object)) | Error errs -> return (Error errs) let find txn_hash = @@ -1093,10 +1123,7 @@ module Tx_container = struct (* TODO: https://gitlab.com/tezos/tezos/-/issues/7747 We should instrument the TX queue to return the real transaction objects. *) - return - (Option.map - Transaction_object.from_store_transaction_object - legacy_tx_object) + return (Option.map Tx.transaction_object_from_legacy legacy_tx_object) let content () = let open Lwt_result_syntax in -- GitLab From 1e4197410b6d32af4c0d544b7f53c01a8722df51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Tue, 17 Jun 2025 01:01:12 +0200 Subject: [PATCH 5/6] Etherlink/Node/Tx_queue: abstract the Tx module --- etherlink/bin_node/lib_dev/tx_queue.ml | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 1e1e7a3d1c77..65d76d91fc5a 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -36,15 +36,6 @@ type pending_request = { type callback = all_variant variant_callback -module Tx = Tx_queue_types.Eth_transaction_object - -type request = { - next_nonce : Ethereum_types.quantity; - payload : Ethereum_types.hex; - tx_object : Tx.legacy; - callback : callback; -} - (** Inject transactions with either RPCs or on a websocket connection. *) type endpoint = Services_backend_sig.endpoint = | Rpc of Uri.t @@ -258,7 +249,7 @@ module Address_nonce = struct | None -> return next_nonce end -module Tx_container = struct +module Tx_container (Tx : Tx_queue_types.L2_transaction) = struct module Transaction_objects = struct open Ethereum_types module S = String.Hashtbl @@ -376,6 +367,13 @@ module Tx_container = struct end module Request = struct + type request = { + next_nonce : Ethereum_types.quantity; + payload : Ethereum_types.hex; + tx_object : Tx.legacy; + callback : callback; + } + type ('a, 'b) t = | Inject : request -> ((unit, string) result, tztrace) t | Find : {txn_hash : Ethereum_types.hash} -> (Tx.legacy option, tztrace) t @@ -1193,9 +1191,12 @@ module Tx_container = struct return_unit end -let start = Tx_container.start +module Eth_tx_container = Tx_container (Tx_queue_types.Eth_transaction_object) + +let start = Eth_tx_container.start -let tx_container = Services_backend_sig.Evm_tx_container (module Tx_container) +let tx_container = + Services_backend_sig.Evm_tx_container (module Eth_tx_container) module Internal_for_tests = struct module Nonce_bitset = Nonce_bitset -- GitLab From b2abd950a6bdc89654708183897384fe6e306ba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Mon, 14 Apr 2025 10:28:49 +0200 Subject: [PATCH 6/6] Etherlink/Node: abstract transaction types in Tx_container --- etherlink/bin_node/lib_dev/observer.ml | 6 +++++ etherlink/bin_node/lib_dev/proxy.ml | 6 +++++ etherlink/bin_node/lib_dev/rpc.ml | 6 +++++ .../bin_node/lib_dev/services_backend_sig.ml | 22 ++++++++++++++----- etherlink/bin_node/lib_dev/tx_pool.ml | 6 +++++ etherlink/bin_node/lib_dev/tx_queue.ml | 6 +++++ 6 files changed, 46 insertions(+), 6 deletions(-) diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index d71db38ad45b..85051cebb4cd 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -118,6 +118,12 @@ let container_forward_tx ~keep_alive ~evm_node_endpoint : L2_types.evm_chain_family Services_backend_sig.tx_container = Services_backend_sig.Evm_tx_container (module struct + type address = Ethereum_types.address + + type legacy_transaction_object = Ethereum_types.legacy_transaction_object + + type transaction_object = Transaction_object.t + let nonce ~next_nonce _address = Lwt_result.return next_nonce let add ~next_nonce:_ _tx_object ~raw_tx = diff --git a/etherlink/bin_node/lib_dev/proxy.ml b/etherlink/bin_node/lib_dev/proxy.ml index 87c0eda79ab9..f6a7354ac719 100644 --- a/etherlink/bin_node/lib_dev/proxy.ml +++ b/etherlink/bin_node/lib_dev/proxy.ml @@ -25,6 +25,12 @@ let container_forward_tx ~evm_node_endpoint ~keep_alive : L2_types.evm_chain_family Services_backend_sig.tx_container = Services_backend_sig.Evm_tx_container (module struct + type address = Ethereum_types.address + + type legacy_transaction_object = Ethereum_types.legacy_transaction_object + + type transaction_object = Transaction_object.t + let nonce ~next_nonce _address = Lwt_result.return next_nonce let add ~next_nonce:_ _tx_object ~raw_tx = diff --git a/etherlink/bin_node/lib_dev/rpc.ml b/etherlink/bin_node/lib_dev/rpc.ml index 74e64aee37cd..4eb1d32f5efa 100644 --- a/etherlink/bin_node/lib_dev/rpc.ml +++ b/etherlink/bin_node/lib_dev/rpc.ml @@ -65,6 +65,12 @@ let container_forward_request ~public_endpoint ~private_endpoint ~keep_alive : L2_types.evm_chain_family Services_backend_sig.tx_container = Services_backend_sig.Evm_tx_container (module struct + type address = Ethereum_types.address + + type legacy_transaction_object = Ethereum_types.legacy_transaction_object + + type transaction_object = Transaction_object.t + let rpc_error = Internal_event.Simple.declare_2 ~section:Events.section diff --git a/etherlink/bin_node/lib_dev/services_backend_sig.ml b/etherlink/bin_node/lib_dev/services_backend_sig.ml index f606fda9e200..b0b22be5b94b 100644 --- a/etherlink/bin_node/lib_dev/services_backend_sig.ml +++ b/etherlink/bin_node/lib_dev/services_backend_sig.ml @@ -185,11 +185,17 @@ type endpoint = Rpc of Uri.t | Websocket of Websocket_client.t {!Services.dispatch_request} to request informations about pending transactions. *) module type Tx_container = sig + type address + + type legacy_transaction_object + + type transaction_object + (** [nonce ~next_nonce address] must returns the next gap nonce available. *) val nonce : next_nonce:Ethereum_types.quantity -> - Ethereum_types.address -> + address -> Ethereum_types.quantity tzresult Lwt.t (** [add ~next_nonce tx_object raw_tx] returns the next gap nonce @@ -197,13 +203,13 @@ module type Tx_container = sig [next_nonce] is the next expected nonce found in the backend. *) val add : next_nonce:Ethereum_types.quantity -> - Ethereum_types.legacy_transaction_object -> + legacy_transaction_object -> raw_tx:Ethereum_types.hex -> (Ethereum_types.hash, string) result tzresult Lwt.t (** [find hash] returns the transaction_object found in tx container. *) - val find : Ethereum_types.hash -> Transaction_object.t option tzresult Lwt.t + val find : Ethereum_types.hash -> transaction_object option tzresult Lwt.t (** [content ()] returns all the transactions found in tx container. *) @@ -262,10 +268,10 @@ module type Tx_container = sig validate_tx: ('a -> string -> - Ethereum_types.legacy_transaction_object -> + legacy_transaction_object -> [`Keep of 'a | `Drop | `Stop] tzresult Lwt.t) -> initial_validation_state:'a -> - (string * Ethereum_types.legacy_transaction_object) list tzresult Lwt.t + (string * legacy_transaction_object) list tzresult Lwt.t end (** ['f tx_container] is a GADT parametrized by the same type argument @@ -275,7 +281,11 @@ end type 'f tx_container = | Evm_tx_container : - (module Tx_container) + (module Tx_container + with type address = Ethereum_types.address + and type legacy_transaction_object = + Ethereum_types.legacy_transaction_object + and type transaction_object = Transaction_object.t) -> L2_types.evm_chain_family tx_container | Michelson_tx_container : (module Tx_container) diff --git a/etherlink/bin_node/lib_dev/tx_pool.ml b/etherlink/bin_node/lib_dev/tx_pool.ml index 90e763efac69..c9d8deaa6077 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.ml +++ b/etherlink/bin_node/lib_dev/tx_pool.ml @@ -927,6 +927,12 @@ let mode () = return state.mode module Tx_container = struct + type address = Ethereum_types.address + + type legacy_transaction_object = Ethereum_types.legacy_transaction_object + + type transaction_object = Transaction_object.t + let nonce ~next_nonce:_ address = nonce address let add ~next_nonce:_ tx_object ~raw_tx = diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 65d76d91fc5a..d339c26cb8ce 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -1039,6 +1039,12 @@ module Tx_container (Tx : Tx_queue_types.L2_transaction) = struct let on_close _ = Lwt.return_unit end + type address = Tx.address + + type legacy_transaction_object = Tx.legacy + + type transaction_object = Tx.t + let table = Worker.create_table Queue let worker_promise, worker_waker = Lwt.task () -- GitLab