diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index b3b8f3c952452458331ffa465b7fb3bb965b4607..dffe7debe29c38593f8479f75518fe0e0f69bda9 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -99,7 +99,8 @@ let on_new_blueprint config evm_node_endpoint next_blueprint_number return (`Restart_from next_blueprint_number) let install_finalizer_observer ~rollup_node_tracking finalizer_public_server - finalizer_private_server finalizer_rpc_process = + finalizer_private_server finalizer_rpc_process + (module Tx_container : Services_backend_sig.Tx_container) = let open Lwt_syntax in Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> let* () = Events.shutdown_node ~exit_status in @@ -108,8 +109,7 @@ let install_finalizer_observer ~rollup_node_tracking finalizer_public_server let* () = Option.iter_s (fun f -> f ()) finalizer_rpc_process in Misc.unwrap_error_monad @@ fun () -> let open Lwt_result_syntax in - let* () = Tx_pool.shutdown () in - let* () = Tx_queue.shutdown () in + let* () = Tx_container.shutdown () in let* () = Evm_context.shutdown () in when_ rollup_node_tracking @@ fun () -> Evm_events_follower.shutdown () @@ -128,6 +128,13 @@ let container_forward_tx ~keep_alive ~evm_node_endpoint : let content () = Lwt_result.return {pending = AddressMap.empty; queued = AddressMap.empty} + + let shutdown () = Lwt_result_syntax.return_unit + + let tx_queue_tick ~evm_node_endpoint:_ = Lwt_result_syntax.return_unit + + let tx_queue_beacon ~evm_node_endpoint:_ ~tick_interval:_ = + Lwt_result_syntax.return_unit end) let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync @@ -330,6 +337,7 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync finalizer_public_server finalizer_private_server finalizer_rpc_process + tx_container in let*! next_blueprint_number = Evm_context.next_blueprint_number () in @@ -349,10 +357,9 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync and* () = Drift_monitor.run ~evm_node_endpoint Evm_context.next_blueprint_number and* () = - if Configuration.is_tx_queue_enabled config then - Tx_queue.beacon - ~evm_node_endpoint:(Rpc evm_node_endpoint) - ~tick_interval:0.05 - else return_unit + let (module Tx_container) = tx_container in + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc evm_node_endpoint) + ~tick_interval:0.05 in return_unit diff --git a/etherlink/bin_node/lib_dev/proxy.ml b/etherlink/bin_node/lib_dev/proxy.ml index b8767934f31997baab21c943ca1e5ed9c55c8f0a..3e320216173f26f20e928582935b5a3bd0675441 100644 --- a/etherlink/bin_node/lib_dev/proxy.ml +++ b/etherlink/bin_node/lib_dev/proxy.ml @@ -7,14 +7,15 @@ (* *) (*****************************************************************************) -let install_finalizer server_finalizer = +let install_finalizer server_finalizer + (module Tx_container : Services_backend_sig.Tx_container) = let open Lwt_syntax in Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> let* () = Events.shutdown_node ~exit_status in let* () = server_finalizer () in Misc.unwrap_error_monad @@ fun () -> let open Lwt_result_syntax in - let* () = Tx_pool.shutdown () in + let* () = Tx_container.shutdown () in Evm_context.shutdown () let container_forward_tx ~evm_node_endpoint ~keep_alive : @@ -39,6 +40,13 @@ let container_forward_tx ~evm_node_endpoint ~keep_alive : let content () = Lwt_result.return Ethereum_types.{pending = AddressMap.empty; queued = AddressMap.empty} + + let shutdown () = Lwt_result_syntax.return_unit + + let tx_queue_tick ~evm_node_endpoint:_ = Lwt_result_syntax.return_unit + + let tx_queue_beacon ~evm_node_endpoint:_ ~tick_interval:_ = + Lwt_result_syntax.return_unit end) let tx_queue_pop_and_inject (module Rollup_node_rpc : Services_backend_sig.S) @@ -222,7 +230,7 @@ let main ((module Rollup_node_rpc), smart_rollup_address) in let (_ : Lwt_exit.clean_up_callback_id) = - install_finalizer server_finalizer + install_finalizer server_finalizer tx_container in let wait, _resolve = Lwt.wait () in let* () = wait in diff --git a/etherlink/bin_node/lib_dev/rpc.ml b/etherlink/bin_node/lib_dev/rpc.ml index 19abe2a8d68a54589bc79a608fb3381cd1c250f1..2e0edffaebc98031fbab8b0a6962fa337fcaa7bf 100644 --- a/etherlink/bin_node/lib_dev/rpc.ml +++ b/etherlink/bin_node/lib_dev/rpc.ml @@ -34,12 +34,13 @@ let spawn_main ~exposed_port ~protected_endpoint ?private_endpoint ~data_dir () let finalizer () = Lwt.return process#terminate in finalizer -let install_finalizer_rpc server_public_finalizer = +let install_finalizer_rpc server_public_finalizer + (module Tx_container : Services_backend_sig.Tx_container) = let open Lwt_syntax in Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> let* () = Events.shutdown_node ~exit_status in let* () = server_public_finalizer () in - Misc.unwrap_error_monad @@ fun () -> Tx_pool.shutdown () + Misc.unwrap_error_monad @@ fun () -> Tx_container.shutdown () let set_metrics_level (ctxt : Evm_ro_context.t) = let open Lwt_result_syntax in @@ -132,6 +133,13 @@ let container_forward_request ~public_endpoint ~private_endpoint ~keep_alive : let content () = Lwt_result.return Ethereum_types.{pending = AddressMap.empty; queued = AddressMap.empty} + + let shutdown () = Lwt_result_syntax.return_unit + + let tx_queue_tick ~evm_node_endpoint:_ = Lwt_result_syntax.return_unit + + let tx_queue_beacon ~evm_node_endpoint:_ ~tick_interval:_ = + Lwt_result_syntax.return_unit end) let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint @@ -255,7 +263,7 @@ let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint in let (_ : Lwt_exit.clean_up_callback_id) = - install_finalizer_rpc server_public_finalizer + install_finalizer_rpc server_public_finalizer tx_container in let* () = @@ -265,15 +273,10 @@ let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint let* next_blueprint_number = Evm_ro_context.next_blueprint_number ctxt in let* () = - if - Configuration.is_tx_queue_enabled config - && Option.is_none evm_node_private_endpoint - (* Only start the beacon when the tx_queue is started. *) - then - Tx_queue.beacon - ~evm_node_endpoint:(Rpc evm_node_endpoint) - ~tick_interval:0.05 - else return_unit + let (module Tx_container) = tx_container in + Tx_container.tx_queue_beacon + ~evm_node_endpoint:(Rpc evm_node_endpoint) + ~tick_interval:0.05 and* () = Blueprints_follower.start ~ping_tx_pool diff --git a/etherlink/bin_node/lib_dev/sequencer.ml b/etherlink/bin_node/lib_dev/sequencer.ml index bc905704665ba5b92b34310059663eccd936c655..1a2549b949a4ca6f371a8a517444f4d4eb5f4a23 100644 --- a/etherlink/bin_node/lib_dev/sequencer.ml +++ b/etherlink/bin_node/lib_dev/sequencer.ml @@ -18,7 +18,8 @@ type sandbox_config = { } let install_finalizer_seq server_public_finalizer server_private_finalizer - finalizer_rpc_process = + finalizer_rpc_process + (module Tx_container : Services_backend_sig.Tx_container) = let open Lwt_syntax in Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> let* () = Events.shutdown_node ~exit_status in @@ -27,7 +28,7 @@ let install_finalizer_seq server_public_finalizer server_private_finalizer let* () = Option.iter_s (fun f -> f ()) finalizer_rpc_process in Misc.unwrap_error_monad @@ fun () -> let open Lwt_result_syntax in - let* () = Tx_pool.shutdown () in + let* () = Tx_container.shutdown () in let* () = Evm_events_follower.shutdown () in let* () = Blueprints_publisher.shutdown () in let* () = Signals_publisher.shutdown () in @@ -416,6 +417,7 @@ let main ~data_dir ?(genesis_timestamp = Misc.now ()) ~cctxt finalizer_public_server finalizer_private_server finalizer_rpc_process + tx_container in let* () = loop_sequencer diff --git a/etherlink/bin_node/lib_dev/services_backend_sig.ml b/etherlink/bin_node/lib_dev/services_backend_sig.ml index 76e9a56c936c2a77e20b026c6bd9d7aa1a2f37e5..f0536d9530a8395da1280e608f20e24fda4cc379 100644 --- a/etherlink/bin_node/lib_dev/services_backend_sig.ml +++ b/etherlink/bin_node/lib_dev/services_backend_sig.ml @@ -4,37 +4,6 @@ (* Copyright (c) 2023 Nomadic Labs *) (* *) (*****************************************************************************) - -(** [Tx_container] is the signature of the module that deals with - storing and forwarding transactions. the module type is used by - {!Services.dispatch_request} to request informations about pending - transactions. *) -module type Tx_container = sig - (** [nonce ~next_nonce address] must returns the next gap nonce - available. *) - val nonce : - next_nonce:Ethereum_types.quantity -> - Ethereum_types.address -> - Ethereum_types.quantity tzresult Lwt.t - - (** [add ~next_nonce tx_object raw_tx] returns the next gap nonce - available based on the pending transaction of the tx_container. - [next_nonce] is the next expected nonce found in the backend. *) - val add : - next_nonce:Ethereum_types.quantity -> - Ethereum_types.legacy_transaction_object -> - raw_tx:Ethereum_types.hex -> - (Ethereum_types.hash, string) result tzresult Lwt.t - - (** [find hash] returns the transaction_object found in tx - container. *) - val find : Ethereum_types.hash -> Transaction_object.t option tzresult Lwt.t - - (** [content ()] returns all the transactions found in tx - container. *) - val content : unit -> Ethereum_types.txpool tzresult Lwt.t -end - module type S = sig module Reader : Durable_storage.READER @@ -221,3 +190,49 @@ module Make (Backend : Backend) (Executor : Evm_execution.S) : S = struct let l2_levels_of_l1_level = Backend.l2_levels_of_l1_level end + +(** Inject transactions with either RPCs or on a websocket connection. *) +type endpoint = Rpc of Uri.t | Websocket of Websocket_client.t + +(** [Tx_container] is the signature of the module that deals with + storing and forwarding transactions. the module type is used by + {!Services.dispatch_request} to request informations about pending + transactions. *) +module type Tx_container = sig + (** [nonce ~next_nonce address] must returns the next gap nonce + available. *) + val nonce : + next_nonce:Ethereum_types.quantity -> + Ethereum_types.address -> + Ethereum_types.quantity tzresult Lwt.t + + (** [add ~next_nonce tx_object raw_tx] returns the next gap nonce + available based on the pending transaction of the tx_container. + [next_nonce] is the next expected nonce found in the backend. *) + val add : + next_nonce:Ethereum_types.quantity -> + Ethereum_types.legacy_transaction_object -> + raw_tx:Ethereum_types.hex -> + (Ethereum_types.hash, string) result tzresult Lwt.t + + (** [find hash] returns the transaction_object found in tx + container. *) + val find : Ethereum_types.hash -> Transaction_object.t option tzresult Lwt.t + + (** [content ()] returns all the transactions found in tx + container. *) + val content : unit -> Ethereum_types.txpool tzresult Lwt.t + + (** [shutdown ()] stops the tx container, waiting for the ongoing request + to be processed. *) + val shutdown : unit -> unit tzresult Lwt.t + + (** Trigger a tick in the [Tx_queue]. *) + val tx_queue_tick : evm_node_endpoint:endpoint -> unit tzresult Lwt.t + + (** [tx_queue_beacon ~evm_node_endpoint ~tick_interval] is a never fulfilled + promise which triggers a tick in the [Tx_queue] every + [tick_interval] seconds. *) + val tx_queue_beacon : + evm_node_endpoint:endpoint -> tick_interval:float -> unit tzresult Lwt.t +end diff --git a/etherlink/bin_node/lib_dev/tx_pool.ml b/etherlink/bin_node/lib_dev/tx_pool.ml index 7a1c86a4a9f34ed8a015a0b4af8431e12b515055..5476e880bbf1744623ab71e973ca9a906d98a6be 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.ml +++ b/etherlink/bin_node/lib_dev/tx_pool.ml @@ -810,13 +810,6 @@ let start parameters = let+ worker = Worker.launch table () parameters (module Handlers) in Lwt.wakeup worker_waker worker -let shutdown () = - let open Lwt_result_syntax in - bind_worker @@ fun w -> - let*! () = Tx_pool_events.shutdown () in - let*! () = Worker.shutdown w in - return_unit - let add transaction_object raw_tx = let open Lwt_result_syntax in let*? w = Lazy.force worker in @@ -970,4 +963,16 @@ module Tx_container = struct legacy_tx_object) let content = get_tx_pool_content + + let shutdown () = + let open Lwt_result_syntax in + bind_worker @@ fun w -> + let*! () = Tx_pool_events.shutdown () in + let*! () = Worker.shutdown w in + return_unit + + let tx_queue_tick ~evm_node_endpoint:_ = Lwt_result_syntax.return_unit + + let tx_queue_beacon ~evm_node_endpoint:_ ~tick_interval:_ = + Lwt_result_syntax.return_unit end diff --git a/etherlink/bin_node/lib_dev/tx_pool.mli b/etherlink/bin_node/lib_dev/tx_pool.mli index f5c47df8c24b7a25a7429f632457c40221abaf94..c5e6fd48fb3044b71db502a2d78f986ce4847e5b 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.mli +++ b/etherlink/bin_node/lib_dev/tx_pool.mli @@ -25,28 +25,9 @@ type parameters = { (** [start parameters] starts the tx-pool *) val start : parameters -> unit tzresult Lwt.t -(** [shutdown ()] stops the tx-pool, waiting for the ongoing request - to be processed. *) -val shutdown : unit -> unit tzresult Lwt.t - -(** [add transaction_object raw_tx] adds a eth transaction and its raw contents - to the tx-pool. - - The consistency between [transaction_object] and [raw_tx] is assumed by - [add]. It is the responsibility of the caller to enforce it. *) -val add : - Ethereum_types.legacy_transaction_object -> - string -> - (Ethereum_types.hash, string) result tzresult Lwt.t - -(** [nonce address] returns the nonce of the user - Returns the first gap in the tx-pool, or the nonce stored on the rollup - if no transactions are in the pool. *) -val nonce : Ethereum_types.Address.t -> Ethereum_types.quantity tzresult Lwt.t - -(** [pop_transactions chain_family maximum_cumulative_size] pops as much - valid transactions as possible from the pool, until their cumulative - size exceeds `maximum_cumulative_size`. If the pool is locked or node +(** [pop_transactions chain_family maximum_cumulative_size] pops as much + valid transactions as possible from the pool, until their cumulative + size exceeds `maximum_cumulative_size`. If the pool is locked or node in tezlink mode, returns no transactions. *) val pop_transactions : chain_family:L2_types.chain_family -> @@ -76,15 +57,6 @@ val is_locked : unit -> bool tzresult Lwt.t val size_info : unit -> Metrics.Tx_pool.size_info tzresult Lwt.t -val get_tx_pool_content : unit -> Ethereum_types.txpool tzresult Lwt.t - -(** [find tx_hash] look into the tx pool if a transaction with hash - [tx_hash] exists and returns it's corresponding - {!Ethereum_types.transaction_object}. *) -val find : - Ethereum_types.hash -> - Ethereum_types.legacy_transaction_object option tzresult Lwt.t - val clear_popped_transactions : unit -> unit tzresult Lwt.t (** [mode] retrieves the current pool mode *) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 2dc4eb60f233f22d408c22ca0ecec29b16310679..5f56d2d1785690bf285dc2578c2c98306dbb022d 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -46,7 +46,9 @@ type request = { } (** Inject transactions with either RPCs or on a websocket connection. *) -type endpoint = Rpc of Uri.t | Websocket of Websocket_client.t +type endpoint = Services_backend_sig.endpoint = + | Rpc of Uri.t + | Websocket of Websocket_client.t (** [Nonce_bitset] registers known nonces from transactions that went through the tx_queue from a specific sender address. With this @@ -376,7 +378,6 @@ end module Request = struct type ('a, 'b) t = | Inject : request -> ((unit, string) result, tztrace) t - | Confirm : {txn_hash : Ethereum_types.hash} -> (unit, tztrace) t | Find : { txn_hash : Ethereum_types.hash; } @@ -432,15 +433,6 @@ module Request = struct (function | View (Inject {payload; _}) -> Some ((), payload) | _ -> None) (fun _ -> assert false); - case - Json_only - ~title:"Confirm" - (obj2 - (req "request" (constant "confirm")) - (req "transaction_hash" Ethereum_types.hash_encoding)) - (function - | View (Confirm {txn_hash}) -> Some ((), txn_hash) | _ -> None) - (fun _ -> assert false); case Json_only ~title:"Tick" @@ -530,8 +522,6 @@ module Request = struct let open Format in match r with | Inject {payload = Hex txn; _} -> fprintf fmt "Inject %s" txn - | Confirm {txn_hash = Hash (Hex txn_hash)} -> - fprintf fmt "Confirm %s" txn_hash | Find {txn_hash = Hash (Hex txn_hash)} -> fprintf fmt "Find %s" txn_hash | Tick _ -> fprintf fmt "Tick" | Clear -> fprintf fmt "Clear" @@ -819,13 +809,6 @@ module Handlers = struct else return (Error "Transaction limit was reached. Transaction is rejected.") - | Confirm {txn_hash} -> ( - protect @@ fun () -> - match Pending_transactions.pop state.pending txn_hash with - | Some {pending_callback; _} -> - let*! () = pending_callback `Confirmed in - return_unit - | None -> return_unit) | Find {txn_hash} -> protect @@ fun () -> return @@ Transaction_objects.find state.tx_object txn_hash @@ -1043,30 +1026,6 @@ let push_request worker request = let*! (pushed : bool) = Worker.Queue.push_request worker request in if not pushed then tzfail Tx_queue_is_closed else return_unit -let tick ~evm_node_endpoint = - bind_worker @@ fun w -> push_request w (Tick {evm_node_endpoint}) - -let beacon ~evm_node_endpoint ~tick_interval = - let open Lwt_result_syntax in - let rec loop () = - let* () = tick ~evm_node_endpoint in - let*! () = Lwt_unix.sleep tick_interval in - loop () - in - loop () - -let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce - (tx_object : Ethereum_types.legacy_transaction_object) txn = - let open Lwt_syntax in - let* worker = worker_promise in - Worker.Queue.push_request_and_wait - worker - (Inject {next_nonce; payload = txn; tx_object; callback}) - |> handle_request_error - -let confirm txn_hash = - bind_worker @@ fun w -> push_request w (Confirm {txn_hash}) - let start ~config ~keep_alive () = let open Lwt_result_syntax in let* worker = Worker.launch table () {config; keep_alive} (module Handlers) in @@ -1074,34 +1033,11 @@ let start ~config ~keep_alive () = let*! () = Tx_queue_events.is_ready () in return_unit -let find txn_hash = - let open Lwt_result_syntax in - let*? w = Lazy.force worker in - Worker.Queue.push_request_and_wait w (Find {txn_hash}) |> handle_request_error - let clear () = let open Lwt_result_syntax in let*? w = Lazy.force worker in Worker.Queue.push_request_and_wait w Clear |> handle_request_error -let nonce ~next_nonce address = - let open Lwt_result_syntax in - let*? w = Lazy.force worker in - Worker.Queue.push_request_and_wait w (Nonce {next_nonce; address}) - |> handle_request_error - -let content () = - let open Lwt_result_syntax in - let*? w = Lazy.force worker in - Worker.Queue.push_request_and_wait w Content |> handle_request_error - -let shutdown () = - let open Lwt_result_syntax in - bind_worker @@ fun w -> - let*! () = Tx_queue_events.shutdown () in - let*! () = Worker.shutdown w in - return_unit - let lock_transactions () = bind_worker @@ fun w -> push_request w Lock_transactions @@ -1135,7 +1071,20 @@ module Internal_for_tests = struct end module Tx_container = struct - let nonce = nonce + let nonce ~next_nonce address = + let open Lwt_result_syntax in + let*? w = Lazy.force worker in + Worker.Queue.push_request_and_wait w (Nonce {next_nonce; address}) + |> handle_request_error + + let inject ?(callback = fun _ -> Lwt_syntax.return_unit) ~next_nonce + (tx_object : Ethereum_types.legacy_transaction_object) txn = + let open Lwt_syntax in + let* worker = worker_promise in + Worker.Queue.push_request_and_wait + worker + (Inject {next_nonce; payload = txn; tx_object; callback}) + |> handle_request_error let add ~next_nonce tx_object ~raw_tx = let open Lwt_result_syntax in @@ -1144,9 +1093,13 @@ module Tx_container = struct | Ok () -> return (Ok tx_object.hash) | Error errs -> return (Error errs) - let find hash = + let find txn_hash = let open Lwt_result_syntax in - let* legacy_tx_object = find hash in + let* legacy_tx_object = + let*? w = Lazy.force worker in + Worker.Queue.push_request_and_wait w (Find {txn_hash}) + |> handle_request_error + in (* TODO: https://gitlab.com/tezos/tezos/-/issues/7747 We should instrument the TX queue to return the real transaction objects. *) @@ -1155,5 +1108,27 @@ module Tx_container = struct Transaction_object.from_store_transaction_object legacy_tx_object) - let content = content + let content () = + let open Lwt_result_syntax in + let*? w = Lazy.force worker in + Worker.Queue.push_request_and_wait w Content |> handle_request_error + + let shutdown () = + let open Lwt_result_syntax in + bind_worker @@ fun w -> + let*! () = Tx_queue_events.shutdown () in + let*! () = Worker.shutdown w in + return_unit + + let tx_queue_tick ~evm_node_endpoint = + bind_worker @@ fun w -> push_request w (Tick {evm_node_endpoint}) + + let tx_queue_beacon ~evm_node_endpoint ~tick_interval = + let open Lwt_result_syntax in + let rec loop () = + let* () = tx_queue_tick ~evm_node_endpoint in + let*! () = Lwt_unix.sleep tick_interval in + loop () + in + loop () end diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index c3cd244f4e68b791fad97889e587cc0f56619251..b94997c7446b043c2f9141a57d5b9660d88dbdd3 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -24,9 +24,6 @@ [callback] is called with [`Dropped].}} *) type callback = [`Accepted | `Confirmed | `Dropped | `Refused] -> unit Lwt.t -(** Inject transactions with either RPCs or on a websocket connection. *) -type endpoint = Rpc of Uri.t | Websocket of Websocket_client.t - (** [start ~config ~max_transaction_batch_length ()] starts the worker, meaning it is possible to call {!inject}, {!confirm} and {!beacon}. *) @@ -36,67 +33,9 @@ val start : unit -> unit tzresult Lwt.t -(** [shutdown ()] stops the tx queue, waiting for the ongoing request - to be processed. *) -val shutdown : unit -> unit tzresult Lwt.t - (** [clear ()] removes the tx queue data but keeps the allocated space *) val clear : unit -> unit tzresult Lwt.t -(** [inject ?callback ~next_nonce tx_object raw_txn] pushes the - transaction [raw_txn] to the worker queue. - - The [tx_object] is stored until the transaction is confirmed or - dropped, so the transaction can be retrieved with [find]. - - [next_nonce] is the next nonce expected by the kernel for the - address [tx_object.from]. [next_nonce] must always be increasing - for any given [tx_object.from], else if the tx_queue already - contains a transaction for [tx_object.from] it will raise an error - for that request. The increasing order of [next_nonce] is enforced - by the kernel execution where transaction must be executed in - order. - - Any transaction added in the tx_queue via {!inject} must be a - valid transaction. In particular the nonce of the transaction must - be valid, i.e. it must be greater or equal to [next_nonce]. This - is validated by {!Validate.is_tx_valid}. - - {b Note:} The promise will be sleeping until at least {!start} is called. *) -val inject : - ?callback:callback -> - next_nonce:Ethereum_types.quantity -> - Ethereum_types.legacy_transaction_object -> - Ethereum_types.hex -> - (unit, string) result tzresult Lwt.t - -(** [confirm hash] is to be called by an external component to advertise a - transaction has been included in a blueprint. *) -val confirm : Ethereum_types.hash -> unit tzresult Lwt.t - -(** Trigger a tick in the [Tx_queue]. *) -val tick : evm_node_endpoint:endpoint -> unit tzresult Lwt.t - -(** [beacon ~evm_node_endpoint ~tick_interval] is a never fulfilled - promise which triggers a tick in the [Tx_queue] every - [tick_interval] seconds. *) -val beacon : - evm_node_endpoint:endpoint -> tick_interval:float -> unit tzresult Lwt.t - -(** [find hash] returns the transaction associated with that hash if - it's found in the tx_queue. *) -val find : - Ethereum_types.hash -> - Ethereum_types.legacy_transaction_object option tzresult Lwt.t - -(** [nonce ~next_nonce address] returns the first gap in the tx queue - for [address], or [next_nonce] if no transaction for [address] are - found. *) -val nonce : - next_nonce:Ethereum_types.quantity -> - Ethereum_types.address -> - Ethereum_types.quantity tzresult Lwt.t - (** [lock_transactions] locks the transactions in the queue, new transactions can be added but nothing can be retrieved with {!pop_transactions}. *) @@ -109,12 +48,6 @@ val unlock_transactions : unit -> unit tzresult Lwt.t (** [is_locked] checks if the queue is locked. *) val is_locked : unit -> bool tzresult Lwt.t -(** [content ()] returns the queued and pending transactions of the - tx_queue mapped into a tx_pool to mimic - {!Tx_pool.get_tx_pool_content}. Semantics of pending and queued - are not equal to {!Tx_pool.get_tx_pool_content} *) -val content : unit -> Ethereum_types.txpool tzresult Lwt.t - (** [pop_transactions ~validate_tx ~initial_validation_state] pops as many transactions as possible from the queue, validating them with [validate_tx]. If [validate_tx] returns [`Keep validation_state] diff --git a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml index 6aa747f61213dc25b5d7594fcb798464750254bc..d9cca8fe8812a6cf263fd6e0ab3bbe2c1b3bd0e5 100644 --- a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml +++ b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml @@ -59,12 +59,19 @@ module Tx_queue = struct (* as found in etherlink/bin_floodgate/tx_queue.ml *) let transfer ctx ?to_ ?(value = Z.zero) ~nonce ~data () = + let open Lwt_result_syntax in let txn = Craft.transfer ctx ~nonce ?to_ ~value ~data () in let tx_raw = Ethereum_types.hex_to_bytes txn in let hash = Ethereum_types.hash_raw_tx tx_raw in let**? tx = Transaction.decode tx_raw in let**? tx_object = Transaction.to_transaction_object ~hash tx in - inject tx_object txn ~next_nonce:(Ethereum_types.Qty nonce) + let+ res = + Tx_container.add + tx_object + ~raw_tx:txn + ~next_nonce:(Ethereum_types.Qty nonce) + in + match res with Ok _hash -> Ok () | Error _ as res -> res end module Contract = Tezos_raw_protocol_alpha.Alpha_context.Contract @@ -716,7 +723,11 @@ let handle_confirmed_txs {db; ws_client; _} exec.blockNumber ) in let* () = Db.Deposits.set_claimed db nonce exec in - let* () = Tx_queue.confirm tx_hash in + let* () = + Tx_queue.confirm_transactions + ~clear_pending_queue_after:false + ~confirmed_txs:(Seq.cons tx_hash Seq.empty) + in return_unit | Some {status; _} -> let*! () = @@ -851,7 +862,7 @@ end let start db ~config ~notify_ws_change ~first_block = let open Lwt_result_syntax in let evm_node_endpoint = config.Config.evm_node_endpoint in - let tx_queue_endpoint = ref (Tx_queue.Rpc evm_node_endpoint) in + let tx_queue_endpoint = ref (Services_backend_sig.Rpc evm_node_endpoint) in let run () = let*! ws_client = Websocket_client.connect @@ -859,7 +870,7 @@ let start db ~config ~notify_ws_change ~first_block = Media_type.json (Uri.with_path evm_node_endpoint (Uri.path evm_node_endpoint ^ "/ws")) in - tx_queue_endpoint := Tx_queue.Websocket ws_client ; + tx_queue_endpoint := Services_backend_sig.Websocket ws_client ; notify_ws_change ws_client ; let* () = init_db_pointers db ws_client ~first_block in let* chain_id = get_chain_id ws_client in @@ -893,7 +904,8 @@ let start db ~config ~notify_ws_change ~first_block = let rec tx_queue_beacon () = let open Lwt_syntax in let* res = - protect @@ fun () -> Tx_queue.tick ~evm_node_endpoint:!tx_queue_endpoint + protect @@ fun () -> + Tx_queue.Tx_container.tx_queue_tick ~evm_node_endpoint:!tx_queue_endpoint in let* () = match res with