diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 8a8960e5c5db7f52ad0a73c2bd560c737aa0646c..d1b0ad02ca325d1eb4934290a532ba61281eb184 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -12,24 +12,48 @@ let two_seconds = Ptime.Span.of_int_s 2 type parameters = {evm_node_endpoint : Uri.t; config : Configuration.tx_queue} -type callback = - [`Accepted of Ethereum_types.hash | `Confirmed | `Dropped | `Refused] -> - unit Lwt.t +type queue_variant = [`Accepted of Ethereum_types.hash | `Refused] -type request = {payload : Ethereum_types.hex; callback : callback} +type pending_variant = [`Confirmed | `Dropped] + +type all_variant = [queue_variant | pending_variant] + +type 'a variant_callback = 'a -> unit Lwt.t + +(** tx is in the queue and wait to be injected into the upstream + node. *) +type queue_request = { + payload : Ethereum_types.hex; (** payload of the transaction *) + queue_callback : queue_variant variant_callback; + (** callback to call with the response given by the upstream + node. *) +} -type pending = {callback : callback; since : Ptime.t} +(** tx have been forwarded to the upstream node, now it's pending until confirmed. *) +type pending_request = { + since : Time.System.t; + (** time when the transaction was injected into the upstream node. *) + pending_callback : pending_variant variant_callback; + (** callback to call when the pending transaction have been confirmed or is dropped. *) +} + +type callback = all_variant variant_callback + +type request = {payload : Ethereum_types.hex; callback : callback} module Pending_transactions = struct open Ethereum_types module S = String.Hashtbl - type t = pending S.t + type t = pending_request S.t let empty ~start_size = S.create start_size - let add htbl (Hash (Hex hash)) callback = - S.add htbl hash ({callback; since = Time.System.now ()} : pending) + let add htbl (Hash (Hex hash)) pending_callback = + S.add + htbl + hash + ({pending_callback; since = Time.System.now ()} : pending_request) let pop htbl (Hash (Hex hash)) = match S.find htbl hash with @@ -54,7 +78,7 @@ end type state = { evm_node_endpoint : Uri.t; - mutable queue : request Queue.t; + mutable queue : queue_request Queue.t; pending : Pending_transactions.t; config : Configuration.tx_queue; } @@ -79,11 +103,7 @@ end module Request = struct type ('a, 'b) t = - | Inject : { - payload : Ethereum_types.hex; - callback : callback; - } - -> (unit, tztrace) t + | Inject : request -> (unit, tztrace) t | Confirm : {txn_hash : Ethereum_types.hash} -> (unit, tztrace) t | Tick : (unit, tztrace) t @@ -145,7 +165,7 @@ let send_transactions_batch ~evm_node_endpoint transactions = else let rev_batch, callbacks = Seq.fold_left - (fun (rev_batch, callbacks) {payload; callback} -> + (fun (rev_batch, callbacks) {payload; queue_callback} -> let req_id = Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) in let txn = Rpc_encodings.JSONRPC. @@ -157,7 +177,7 @@ let send_transactions_batch ~evm_node_endpoint transactions = } in - (txn :: rev_batch, M.add req_id callback callbacks)) + (txn :: rev_batch, M.add req_id queue_callback callbacks)) ([], M.empty) transactions in @@ -221,19 +241,20 @@ module Handlers = struct let state = Worker.state self in match request with | Inject {payload; callback} -> - let instrumented_callback reason = + let queue_callback reason = (match reason with | `Accepted hash -> - Pending_transactions.add state.pending hash callback - | _ -> ()) ; - callback reason + Pending_transactions.add state.pending hash (fun reason -> + callback (reason :> all_variant)) + | `Refused -> ()) ; + callback (reason :> all_variant) in - Queue.add {payload; callback = instrumented_callback} state.queue ; + Queue.add {payload; queue_callback} state.queue ; return_unit | Confirm {txn_hash} -> ( match Pending_transactions.pop state.pending txn_hash with - | Some {callback; _} -> - Lwt.async (fun () -> callback `Confirmed) ; + | Some {pending_callback; _} -> + Lwt.async (fun () -> pending_callback `Confirmed) ; return_unit | None -> return_unit) | Tick -> @@ -270,7 +291,8 @@ module Handlers = struct let txns = Pending_transactions.drop state.pending in List.iter - (fun {callback; _} -> Lwt.async (fun () -> callback `Dropped)) + (fun {pending_callback; _} -> + Lwt.async (fun () -> pending_callback `Dropped)) txns type launch_error = tztrace