From 9e9383ba4d670d7058af87df7a3df7deffb52a27 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 26 Mar 2025 11:23:16 +0100 Subject: [PATCH 1/2] evm/node: add module to wrap tx_container implementation --- etherlink/bin_node/lib_dev/observer.ml | 45 ++++-- etherlink/bin_node/lib_dev/proxy.ml | 80 +++++++---- etherlink/bin_node/lib_dev/rpc.ml | 56 ++++---- etherlink/bin_node/lib_dev/rpc_server.ml | 6 +- etherlink/bin_node/lib_dev/rpc_server.mli | 2 + etherlink/bin_node/lib_dev/sequencer.ml | 39 ++++-- etherlink/bin_node/lib_dev/services.ml | 128 ++++++++---------- .../bin_node/lib_dev/services_backend_sig.ml | 30 ++++ etherlink/bin_node/lib_dev/tx_pool.ml | 21 +++ etherlink/bin_node/lib_dev/tx_pool.mli | 4 + etherlink/bin_node/lib_dev/tx_queue.ml | 24 ++++ etherlink/bin_node/lib_dev/tx_queue.mli | 4 + 12 files changed, 283 insertions(+), 156 deletions(-) diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index 8bf2cac76e34..f5f18e659404 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -153,10 +153,18 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync Evm_ro_context.ro_backend ro_ctxt config ~evm_node_endpoint in - let* () = + let* tx_container, ping_tx_pool = match config.experimental_features.enable_tx_queue with | Some tx_queue_config -> - Tx_queue.start ~config:tx_queue_config ~keep_alive:config.keep_alive () + let* () = + Tx_queue.start + ~config:tx_queue_config + ~keep_alive:config.keep_alive + () + in + return + ( (module Tx_queue.Tx_container : Services_backend_sig.Tx_container), + false ) | None -> let mode = if config.finalized_view then @@ -171,19 +179,25 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync } else Tx_pool.Relay in - Tx_pool.start - { - backend = observer_backend; - smart_rollup_address = - Tezos_crypto.Hashed.Smart_rollup_address.to_b58check - smart_rollup_address; - mode; - 
tx_timeout_limit = config.tx_pool_timeout_limit; - tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; - tx_pool_tx_per_addr_limit = - Int64.to_int config.tx_pool_tx_per_addr_limit; - } + let* () = + Tx_pool.start + { + backend = observer_backend; + smart_rollup_address = + Tezos_crypto.Hashed.Smart_rollup_address.to_b58check + smart_rollup_address; + mode; + tx_timeout_limit = config.tx_pool_timeout_limit; + tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; + tx_pool_tx_per_addr_limit = + Int64.to_int config.tx_pool_tx_per_addr_limit; + } + in + return + ( (module Tx_pool.Tx_container : Services_backend_sig.Tx_container), + true ) in + Metrics.init ~mode:"observer" ~tx_pool_size_info:Tx_pool.size_info @@ -230,12 +244,14 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync else None) Stateless config + tx_container (observer_backend, smart_rollup_address) in let* finalizer_private_server = Rpc_server.start_private_server ~rpc_server_family:(Rpc_types.Single_chain_node_rpc_server chain_family) config + tx_container (observer_backend, smart_rollup_address) in @@ -301,7 +317,6 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync let*! 
() = task in return_unit else - let ping_tx_pool = not @@ Configuration.is_tx_queue_enabled config in let* () = Blueprints_follower.start ~ping_tx_pool diff --git a/etherlink/bin_node/lib_dev/proxy.ml b/etherlink/bin_node/lib_dev/proxy.ml index ebe0ca84611e..ba9d22cf472c 100644 --- a/etherlink/bin_node/lib_dev/proxy.ml +++ b/etherlink/bin_node/lib_dev/proxy.ml @@ -17,6 +17,30 @@ let install_finalizer server_finalizer = let* () = Tx_pool.shutdown () in Evm_context.shutdown () +let container_forward_tx ~evm_node_endpoint ~keep_alive : + (module Services_backend_sig.Tx_container) = + (module struct + let nonce ~next_nonce _address = Lwt_result.return next_nonce + + let add ~next_nonce:_ _tx_object ~raw_tx = + match evm_node_endpoint with + | Some evm_node_endpoint -> + Injector.send_raw_transaction + ~keep_alive + ~base:evm_node_endpoint + ~raw_tx:(Ethereum_types.hex_to_bytes raw_tx) + | None -> + Lwt.return_ok + @@ Error + "the node is in read-only mode, it doesn't accept transactions" + + let find _hash = Lwt_result.return None + + let content () = + Lwt_result.return + Ethereum_types.{pending = AddressMap.empty; queued = AddressMap.empty} + end) + let main ({ keep_alive; @@ -58,36 +82,37 @@ let main let ignore_block_param = config.proxy.ignore_block_param end) in - let pool_mode, validation_mode = + let validation_mode = match config.proxy.evm_node_endpoint with - | None -> (Tx_pool.Proxy, Validate.Full) - | Some evm_node_endpoint -> - ( Tx_pool.Forward - { - injector = - (fun _ raw_tx -> - Injector.send_raw_transaction - ~keep_alive:config.keep_alive - ~base:evm_node_endpoint - ~raw_tx); - }, - Validate.Stateless ) + | Some _base -> Validate.Stateless + | None -> Validate.Full in - let* () = - if not config.experimental_features.enable_send_raw_transaction then - return_unit - else - Tx_pool.start - { - backend = (module Rollup_node_rpc); - smart_rollup_address; - mode = pool_mode; - tx_timeout_limit = config.tx_pool_timeout_limit; - tx_pool_addr_limit = 
Int64.to_int config.tx_pool_addr_limit; - tx_pool_tx_per_addr_limit = - Int64.to_int config.tx_pool_tx_per_addr_limit; - } + let* tx_container = + match + ( config.experimental_features.enable_send_raw_transaction, + config.proxy.evm_node_endpoint ) + with + | true, None -> + let* () = + Tx_pool.start + { + backend = (module Rollup_node_rpc); + smart_rollup_address; + mode = Proxy; + tx_timeout_limit = config.tx_pool_timeout_limit; + tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; + tx_pool_tx_per_addr_limit = + Int64.to_int config.tx_pool_tx_per_addr_limit; + } + in + return (module Tx_pool.Tx_container : Services_backend_sig.Tx_container) + | enable_send_raw_transaction, evm_node_endpoint -> + let evm_node_endpoint = + if enable_send_raw_transaction then evm_node_endpoint else None + in + return @@ container_forward_tx ~evm_node_endpoint ~keep_alive in + let () = Rollup_node_follower.start ~keep_alive:config.keep_alive @@ -141,6 +166,7 @@ let main else None) validation_mode config + tx_container ((module Rollup_node_rpc), smart_rollup_address) in let (_ : Lwt_exit.clean_up_callback_id) = diff --git a/etherlink/bin_node/lib_dev/rpc.ml b/etherlink/bin_node/lib_dev/rpc.ml index 09758a06193b..d288bda81085 100644 --- a/etherlink/bin_node/lib_dev/rpc.ml +++ b/etherlink/bin_node/lib_dev/rpc.ml @@ -76,32 +76,35 @@ let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint Block_storage_setup.enable ~keep_alive:config.keep_alive ctxt.store ; let rpc_backend = Evm_ro_context.ro_backend ctxt config ~evm_node_endpoint in - let* () = - Tx_pool.start - { - backend = rpc_backend; - smart_rollup_address = - Tezos_crypto.Hashed.Smart_rollup_address.to_b58check - ctxt.smart_rollup_address; - mode = - (match evm_node_private_endpoint with - | Some base -> - Forward - { - injector = - (fun tx_object raw_tx -> - Injector.inject_transaction - ~keep_alive:config.keep_alive - ~base - ~tx_object - ~raw_tx); - } - | None -> Relay); - tx_timeout_limit = 
config.tx_pool_timeout_limit; - tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; - tx_pool_tx_per_addr_limit = - Int64.to_int config.tx_pool_tx_per_addr_limit; - } + let* tx_container = + let* () = + Tx_pool.start + { + backend = rpc_backend; + smart_rollup_address = + Tezos_crypto.Hashed.Smart_rollup_address.to_b58check + ctxt.smart_rollup_address; + mode = + (match evm_node_private_endpoint with + | Some base -> + Forward + { + injector = + (fun tx_object raw_tx -> + Injector.inject_transaction + ~keep_alive:config.keep_alive + ~base + ~tx_object + ~raw_tx); + } + | None -> Relay); + tx_timeout_limit = config.tx_pool_timeout_limit; + tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; + tx_pool_tx_per_addr_limit = + Int64.to_int config.tx_pool_tx_per_addr_limit; + } + in + return @@ (module Tx_pool.Tx_container : Services_backend_sig.Tx_container) in let* () = set_metrics_level ctxt in @@ -162,6 +165,7 @@ let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint else None) Stateless rpc_config + tx_container (rpc_backend, ctxt.smart_rollup_address) in diff --git a/etherlink/bin_node/lib_dev/rpc_server.ml b/etherlink/bin_node/lib_dev/rpc_server.ml index dafb9db1a221..f72db3be7781 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.ml +++ b/etherlink/bin_node/lib_dev/rpc_server.ml @@ -126,7 +126,7 @@ let monitor_performances ~data_dir = let start_public_server ~(rpc_server_family : Rpc_types.rpc_server_family) ?delegate_health_check_to ?evm_services ?tezlink_services ?data_dir - validation (config : Configuration.t) ctxt = + validation (config : Configuration.t) tx_container ctxt = let open Lwt_result_syntax in let*! 
can_start_performance_metrics = Octez_performance_metrics.supports_performance_metrics () @@ -168,6 +168,7 @@ let start_public_server ~(rpc_server_family : Rpc_types.rpc_server_family) rpc validation config + tx_container ctxt |> register_evm_services |> Evm_directory.register_metrics "/metrics" @@ -184,7 +185,7 @@ let start_public_server ~(rpc_server_family : Rpc_types.rpc_server_family) return finalizer let start_private_server ~(rpc_server_family : Rpc_types.rpc_server_family) - ?(block_production = `Disabled) config ctxt = + ?(block_production = `Disabled) config tx_container ctxt = let open Lwt_result_syntax in match config.Configuration.private_rpc with | Some private_rpc -> @@ -194,6 +195,7 @@ let start_private_server ~(rpc_server_family : Rpc_types.rpc_server_family) private_rpc ~block_production config + tx_container ctxt |> Evm_directory.register_metrics "/metrics" |> Evm_directory.register_describe diff --git a/etherlink/bin_node/lib_dev/rpc_server.mli b/etherlink/bin_node/lib_dev/rpc_server.mli index 28c42e294121..0068aadea958 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.mli +++ b/etherlink/bin_node/lib_dev/rpc_server.mli @@ -32,6 +32,7 @@ val start_private_server : rpc_server_family:Rpc_types.rpc_server_family -> ?block_production:block_production -> Configuration.t -> + (module Services_backend_sig.Tx_container) -> (module Services_backend_sig.S) * 'a -> finalizer tzresult Lwt.t @@ -51,5 +52,6 @@ val start_public_server : ?data_dir:string -> Validate.validation_mode -> Configuration.t -> + (module Services_backend_sig.Tx_container) -> (module Services_backend_sig.S) * 'a -> finalizer tzresult Lwt.t diff --git a/etherlink/bin_node/lib_dev/sequencer.ml b/etherlink/bin_node/lib_dev/sequencer.ml index 8d8897e122c2..2b348cefbdda 100644 --- a/etherlink/bin_node/lib_dev/sequencer.ml +++ b/etherlink/bin_node/lib_dev/sequencer.ml @@ -197,24 +197,31 @@ let main ~data_dir ?(genesis_timestamp = Misc.now ()) ~cctxt in let backend = Evm_ro_context.ro_backend 
ro_ctxt configuration in - let* () = + let* tx_container = match configuration.experimental_features.enable_tx_queue with | Some tx_queue_config -> - Tx_queue.start - ~config:tx_queue_config - ~keep_alive:configuration.keep_alive - () + let* () = + Tx_queue.start + ~config:tx_queue_config + ~keep_alive:configuration.keep_alive + () + in + return + (module Tx_queue.Tx_container : Services_backend_sig.Tx_container) | None -> - Tx_pool.start - { - backend; - smart_rollup_address = smart_rollup_address_b58; - mode = Sequencer; - tx_timeout_limit = configuration.tx_pool_timeout_limit; - tx_pool_addr_limit = Int64.to_int configuration.tx_pool_addr_limit; - tx_pool_tx_per_addr_limit = - Int64.to_int configuration.tx_pool_tx_per_addr_limit; - } + let* () = + Tx_pool.start + { + backend; + smart_rollup_address = smart_rollup_address_b58; + mode = Sequencer; + tx_timeout_limit = configuration.tx_pool_timeout_limit; + tx_pool_addr_limit = Int64.to_int configuration.tx_pool_addr_limit; + tx_pool_tx_per_addr_limit = + Int64.to_int configuration.tx_pool_tx_per_addr_limit; + } + in + return (module Tx_pool.Tx_container : Services_backend_sig.Tx_container) in Metrics.init ~mode:"sequencer" @@ -292,6 +299,7 @@ let main ~data_dir ?(genesis_timestamp = Misc.now ()) ~cctxt (if Configuration.is_tx_queue_enabled configuration then Stateless else Full) configuration + tx_container (backend, smart_rollup_address_typed) in let* finalizer_private_server = @@ -301,6 +309,7 @@ let main ~data_dir ?(genesis_timestamp = Misc.now ()) ~cctxt else Rpc_types.Single_chain_node_rpc_server EVM) ~block_production:`Single_node configuration + tx_container (backend, smart_rollup_address_typed) in let finalizer_rpc_process = diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index b3d2dcdd7024..03d555ea0049 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -478,6 +478,7 @@ let process_trace_result trace = let 
dispatch_request (rpc_server_family : Rpc_types.rpc_server_family) (rpc : Configuration.rpc) (validation : Validate.validation_mode) (config : Configuration.t) + (module Tx_container : Services_backend_sig.Tx_container) ((module Backend_rpc : Services_backend_sig.S), _) ({method_; parameters; id} : JSONRPC.request) : JSONRPC.response Lwt.t = let open Lwt_result_syntax in @@ -588,15 +589,9 @@ let dispatch_request (rpc_server_family : Rpc_types.rpc_server_family) let f (address, block_param) = match block_param with | Ethereum_types.Block_parameter.(Block_parameter Pending) -> - let* nonce = - if Configuration.is_tx_queue_enabled config then - let* next_nonce = Backend_rpc.nonce address block_param in - let next_nonce = - Option.value ~default:Qty.zero next_nonce - in - Tx_queue.nonce ~next_nonce address - else Tx_pool.nonce address - in + let* next_nonce = Backend_rpc.nonce address block_param in + let next_nonce = Option.value ~default:Qty.zero next_nonce in + let* nonce = Tx_container.nonce ~next_nonce address in rpc_ok nonce | _ -> let* nonce = Backend_rpc.nonce address block_param in @@ -641,20 +636,10 @@ let dispatch_request (rpc_server_family : Rpc_types.rpc_server_family) build_with_input ~f module_ parameters | Get_transaction_by_hash.Method -> let f tx_hash = - let* transaction_object = - if Configuration.is_tx_queue_enabled config then - Tx_queue.find tx_hash - else Tx_pool.find tx_hash - in + let* transaction_object = Tx_container.find tx_hash in let* transaction_object = match transaction_object with - | Some transaction_object -> - (* TODO: https://gitlab.com/tezos/tezos/-/issues/7747 - We should instrument the TX pool to return the real - transaction objects. 
*) - return_some - (Transaction_object.from_store_transaction_object - transaction_object) + | Some transaction_object -> return_some transaction_object | None -> Backend_rpc.Block_storage.transaction_object tx_hash in rpc_ok transaction_object @@ -718,8 +703,8 @@ let dispatch_request (rpc_server_family : Rpc_types.rpc_server_family) (fun seq -> seq.Configuration.max_number_of_chunks) config.sequencer in - let f tx_raw = - let txn = Ethereum_types.hex_to_bytes tx_raw in + let f raw_tx = + let txn = Ethereum_types.hex_to_bytes raw_tx in let* is_valid = Validate.is_tx_valid ?max_number_of_chunks @@ -730,19 +715,12 @@ let dispatch_request (rpc_server_family : Rpc_types.rpc_server_family) match is_valid with | Error err -> let*! () = - Tx_pool_events.invalid_transaction ~transaction:tx_raw + Tx_pool_events.invalid_transaction ~transaction:raw_tx in rpc_error (Rpc_errors.transaction_rejected err None) | Ok (next_nonce, transaction_object) -> ( let* tx_hash = - if Configuration.is_tx_queue_enabled config then - let* res = - Tx_queue.inject ~next_nonce transaction_object tx_raw - in - match res with - | Ok () -> return (Ok transaction_object.hash) - | Error errs -> return (Error errs) - else Tx_pool.add transaction_object txn + Tx_container.add ~next_nonce transaction_object ~raw_tx in match tx_hash with | Ok tx_hash -> rpc_ok tx_hash @@ -795,11 +773,7 @@ let dispatch_request (rpc_server_family : Rpc_types.rpc_server_family) build_with_input ~f module_ parameters | Txpool_content.Method -> let f (_ : unit option) = - let* txpool_content = - if Configuration.is_tx_queue_enabled config then - Tx_queue.content () - else Tx_pool.get_tx_pool_content () - in + let* txpool_content = Tx_container.content () in rpc_ok txpool_content in build ~f module_ parameters @@ -912,6 +886,7 @@ let dispatch_request (rpc_server_family : Rpc_types.rpc_server_family) let dispatch_private_request (rpc_server_family : Rpc_types.rpc_server_family) (rpc : Configuration.rpc) (config : 
Configuration.t) + (module Tx_container : Services_backend_sig.Tx_container) ((module Backend_rpc : Services_backend_sig.S), _) ~block_production ({method_; parameters; id} : JSONRPC.request) : JSONRPC.response Lwt.t = let open Lwt_syntax in @@ -1001,22 +976,17 @@ let dispatch_private_request (rpc_server_family : Rpc_types.rpc_server_family) raw_txn | _ -> get_nonce () in + let transaction = Ethereum_types.hex_encode_string raw_txn in match is_valid with | Error err -> - let transaction = Ethereum_types.hex_encode_string raw_txn in let*! () = Tx_pool_events.invalid_transaction ~transaction in rpc_error (Rpc_errors.transaction_rejected err None) | Ok (next_nonce, transaction_object) -> ( let* tx_hash = - if Configuration.is_tx_queue_enabled config then - let transaction = Ethereum_types.hex_encode_string raw_txn in - let* res = - Tx_queue.inject ~next_nonce transaction_object transaction - in - match res with - | Ok () -> return (Ok transaction_object.hash) - | Error errs -> return (Error errs) - else Tx_pool.add transaction_object raw_txn + Tx_container.add + ~next_nonce + transaction_object + ~raw_tx:transaction in match tx_hash with | Ok tx_hash -> rpc_ok tx_hash @@ -1060,17 +1030,17 @@ let can_process_batch size = function | Configuration.Limit l -> size <= l | Unlimited -> true -let dispatch_handler (rpc : Configuration.rpc) config ctx dispatch_request - (input : JSONRPC.request batched_request) = +let dispatch_handler (rpc : Configuration.rpc) config tx_container ctx + dispatch_request (input : JSONRPC.request batched_request) = let open Lwt_syntax in match input with | Singleton request -> - let* response = dispatch_request config ctx request in + let* response = dispatch_request config tx_container ctx request in return (Singleton response) | Batch requests -> let process = if can_process_batch (List.length requests) rpc.batch_limit then - dispatch_request config ctx + dispatch_request config tx_container ctx else fun req -> let value = Error 
Rpc_errors.(invalid_request "too many requests in batch") @@ -1098,7 +1068,8 @@ let empty_stream = let empty_sid = Ethereum_types.(Subscription.Id (Hex "")) let dispatch_websocket (rpc_server_family : Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) validation config ctx (input : JSONRPC.request) = + (rpc : Configuration.rpc) validation config tx_container ctx + (input : JSONRPC.request) = let open Lwt_syntax in match map_method_name @@ -1145,12 +1116,19 @@ let dispatch_websocket (rpc_server_family : Rpc_types.rpc_server_family) websocket_response_of_response JSONRPC.{value; id = input.id} | _ -> let+ response = - dispatch_request rpc_server_family rpc validation config ctx input + dispatch_request + rpc_server_family + rpc + validation + config + tx_container + ctx + input in websocket_response_of_response response let dispatch_private_websocket (rpc_server_family : Rpc_types.rpc_server_family) - ~block_production (rpc : Configuration.rpc) config ctx + ~block_production (rpc : Configuration.rpc) config tx_container ctx (input : JSONRPC.request) = let open Lwt_syntax in let+ response = @@ -1159,38 +1137,42 @@ let dispatch_private_websocket (rpc_server_family : Rpc_types.rpc_server_family) ~block_production rpc config + tx_container ctx input in websocket_response_of_response response -let generic_dispatch (rpc : Configuration.rpc) config ctx dir path +let generic_dispatch (rpc : Configuration.rpc) config tx_container ctx dir path dispatch_request = Evm_directory.register0 dir (dispatch_batch_service ~path) (fun () input -> - dispatch_handler rpc config ctx dispatch_request input |> Lwt_result.ok) + dispatch_handler rpc config tx_container ctx dispatch_request input + |> Lwt_result.ok) let dispatch_public (rpc_server_family : Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) validation config ctx dir = + (rpc : Configuration.rpc) validation config tx_container ctx dir = generic_dispatch rpc config + tx_container ctx dir Path.root (dispatch_request 
rpc_server_family rpc validation) let dispatch_private (rpc_server_family : Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) ~block_production config ctx dir = + (rpc : Configuration.rpc) ~block_production config tx_container ctx dir = generic_dispatch rpc config + tx_container ctx dir Path.(add_suffix root "private") (dispatch_private_request rpc_server_family rpc ~block_production) -let generic_websocket_dispatch (config : Configuration.t) ctx dir path - dispatch_websocket = +let generic_websocket_dispatch (config : Configuration.t) tx_container ctx dir + path dispatch_websocket = if config.experimental_features.enable_websocket then Evm_directory.jsonrpc_websocket_register ?monitor:config.experimental_features.monitor_websocket_heartbeat @@ -1198,30 +1180,31 @@ let generic_websocket_dispatch (config : Configuration.t) ctx dir path config.experimental_features.max_websocket_message_length dir path - (dispatch_websocket config ctx) + (dispatch_websocket config tx_container ctx) else dir let dispatch_websocket_public (rpc_server_family : Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) validation config ctx dir = + (rpc : Configuration.rpc) validation config tx_container ctx dir = generic_websocket_dispatch config + tx_container ctx dir "/ws" (dispatch_websocket rpc_server_family rpc validation) let dispatch_websocket_private (rpc_server_family : Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) ~block_production config ctx dir = + (rpc : Configuration.rpc) ~block_production config tx_container ctx dir = generic_websocket_dispatch config + tx_container ctx dir "/private/ws" (dispatch_private_websocket rpc_server_family ~block_production rpc) let directory ~rpc_server_family ?delegate_health_check_to rpc validation config - ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) - dir = + tx_container backend dir = dir |> version |> configuration config |> health_check ?delegate_to:delegate_health_check_to |> dispatch_public 
@@ -1229,16 +1212,17 @@ let directory ~rpc_server_family ?delegate_health_check_to rpc validation config rpc validation config - ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) + tx_container + backend |> dispatch_websocket_public rpc_server_family rpc validation config - ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) + tx_container + backend -let private_directory ~rpc_server_family rpc config - ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) +let private_directory ~rpc_server_family rpc config tx_container backend ~block_production = Evm_directory.empty config.experimental_features.rpc_server |> version @@ -1246,11 +1230,13 @@ let private_directory ~rpc_server_family rpc config rpc_server_family rpc config - ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) + tx_container + backend ~block_production |> dispatch_websocket_private rpc_server_family rpc config - ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) + tx_container + backend ~block_production diff --git a/etherlink/bin_node/lib_dev/services_backend_sig.ml b/etherlink/bin_node/lib_dev/services_backend_sig.ml index 143e3c4fead6..af4ccaab4c27 100644 --- a/etherlink/bin_node/lib_dev/services_backend_sig.ml +++ b/etherlink/bin_node/lib_dev/services_backend_sig.ml @@ -5,6 +5,36 @@ (* *) (*****************************************************************************) +(** [Tx_container] is the signature of the module that deals with + storing and forwarding transactions. The module type is used by + {!Services.dispatch_request} to request information about pending + transactions. *) +module type Tx_container = sig + (** [nonce ~next_nonce address] must return the next gap nonce + available. 
*) + val nonce : + next_nonce:Ethereum_types.quantity -> + Ethereum_types.address -> + Ethereum_types.quantity tzresult Lwt.t + + (** [add ~next_nonce tx_object ~raw_tx] submits the transaction to the + tx_container and returns its hash, or an error message on failure. + [next_nonce] is the next expected nonce found in the backend. *) + val add : + next_nonce:Ethereum_types.quantity -> + Ethereum_types.legacy_transaction_object -> + raw_tx:Ethereum_types.hex -> + (Ethereum_types.hash, string) result tzresult Lwt.t + + (** [find hash] returns the transaction_object found in tx + container. *) + val find : Ethereum_types.hash -> Transaction_object.t option tzresult Lwt.t + + (** [content ()] returns all the transactions found in tx + container. *) + val content : unit -> Ethereum_types.txpool tzresult Lwt.t +end + module type S = sig module Reader : Durable_storage.READER diff --git a/etherlink/bin_node/lib_dev/tx_pool.ml b/etherlink/bin_node/lib_dev/tx_pool.ml index 2abc41493674..a1b9a5e80955 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.ml +++ b/etherlink/bin_node/lib_dev/tx_pool.ml @@ -957,3 +957,24 @@ let mode () = let*? worker = Lazy.force worker in let state = Worker.state worker in return state.mode + +module Tx_container = struct + let nonce ~next_nonce:_ address = nonce address + + let add ~next_nonce:_ tx_object ~raw_tx = + let raw_tx_str = Ethereum_types.hex_to_bytes raw_tx in + add tx_object raw_tx_str + + let find hash = + let open Lwt_result_syntax in + let* legacy_tx_object = find hash in + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7747 + We should instrument the TX pool to return the real + transaction objects. 
*) + return + (Option.map + Transaction_object.from_store_transaction_object + legacy_tx_object) + + let content = get_tx_pool_content +end diff --git a/etherlink/bin_node/lib_dev/tx_pool.mli b/etherlink/bin_node/lib_dev/tx_pool.mli index 67a717eaad73..6033c8c856fc 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.mli +++ b/etherlink/bin_node/lib_dev/tx_pool.mli @@ -95,3 +95,7 @@ val clear_popped_transactions : unit -> unit tzresult Lwt.t (** [mode] retrieves the current pool mode *) val mode : unit -> mode tzresult Lwt.t + +(** wrapper of the Tx_pool to be compatible with the Tx_container + signature for the services. *) +module Tx_container : Services_backend_sig.Tx_container diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index e546d5b3183e..7b87f33e8aa0 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -1094,3 +1094,27 @@ module Internal_for_tests = struct module Nonce_bitset = Nonce_bitset module Address_nonce = Address_nonce end + +module Tx_container = struct + let nonce = nonce + + let add ~next_nonce tx_object ~raw_tx = + let open Lwt_result_syntax in + let* res = inject ~next_nonce tx_object raw_tx in + match res with + | Ok () -> return (Ok tx_object.hash) + | Error errs -> return (Error errs) + + let find hash = + let open Lwt_result_syntax in + let* legacy_tx_object = find hash in + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7747 + We should instrument the TX queue to return the real + transaction objects. 
*) + return + (Option.map + Transaction_object.from_store_transaction_object + legacy_tx_object) + + let content = content +end diff --git a/etherlink/bin_node/lib_dev/tx_queue.mli b/etherlink/bin_node/lib_dev/tx_queue.mli index b7e243108054..91476b465dbb 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.mli +++ b/etherlink/bin_node/lib_dev/tx_queue.mli @@ -136,6 +136,10 @@ val confirm_transactions : confirmed_txs:Ethereum_types.hash Seq.t -> unit tzresult Lwt.t +(** wrapper of the Tx_queue to be compatible with the Tx_container + signature for the services. *) +module Tx_container : Services_backend_sig.Tx_container + (**/*) module Internal_for_tests : sig -- GitLab From aa147959fcaba2bb3991d95855b6082f0845ffc3 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 26 Mar 2025 11:29:38 +0100 Subject: [PATCH 2/2] evm/node: by pass tx_container when it's unused --- etherlink/bin_node/lib_dev/injector.ml | 28 ++++++ etherlink/bin_node/lib_dev/injector.mli | 13 +++ etherlink/bin_node/lib_dev/observer.ml | 68 +++++++------ etherlink/bin_node/lib_dev/rpc.ml | 122 ++++++++++++++++++------ etherlink/bin_node/lib_dev/tx_pool.ml | 24 +---- etherlink/bin_node/lib_dev/tx_pool.mli | 8 -- 6 files changed, 179 insertions(+), 84 deletions(-) diff --git a/etherlink/bin_node/lib_dev/injector.ml b/etherlink/bin_node/lib_dev/injector.ml index 2f6f8eed86fa..2cb5aaf56c38 100644 --- a/etherlink/bin_node/lib_dev/injector.ml +++ b/etherlink/bin_node/lib_dev/injector.ml @@ -67,3 +67,31 @@ let inject_transaction ~keep_alive ~base ~tx_object ~raw_tx = ~path:Resto.Path.(root / "private") (inject_transaction_request tx_object raw_tx) Inject_transaction.output_encoding + +let get_transaction_count_request address block_param = + construct_rpc_call + ~method_:Get_transaction_count.method_ + ~input_encoding:Get_transaction_count.input_encoding + (address, block_param) + +let get_transaction_count ~keep_alive ~base address block_param = + call_rpc_service + ~keep_alive + ~base + 
~path:Resto.Path.root + (get_transaction_count_request address block_param) + Get_transaction_count.output_encoding + +let get_transaction_by_hash_request tx_hash = + construct_rpc_call + ~method_:Get_transaction_by_hash.method_ + ~input_encoding:Get_transaction_by_hash.input_encoding + tx_hash + +let get_transaction_by_hash ~keep_alive ~base tx_hash = + call_rpc_service + ~keep_alive + ~base + ~path:Resto.Path.root + (get_transaction_by_hash_request tx_hash) + Get_transaction_by_hash.output_encoding diff --git a/etherlink/bin_node/lib_dev/injector.mli b/etherlink/bin_node/lib_dev/injector.mli index cfdd3de66959..7ef3544330a2 100644 --- a/etherlink/bin_node/lib_dev/injector.mli +++ b/etherlink/bin_node/lib_dev/injector.mli @@ -21,3 +21,16 @@ val inject_transaction : tx_object:Ethereum_types.legacy_transaction_object -> raw_tx:string -> (Ethereum_types.hash, string) result tzresult Lwt.t + +val get_transaction_count : + keep_alive:bool -> + base:Uri.t -> + Ethereum_types.address -> + Ethereum_types.Block_parameter.extended -> + (Ethereum_types.quantity, string) result tzresult Lwt.t + +val get_transaction_by_hash : + keep_alive:bool -> + base:Uri.t -> + Ethereum_types.hash -> + (Transaction_object.t option, string) result tzresult Lwt.t diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index f5f18e659404..a4d3af1e9733 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -95,6 +95,23 @@ let install_finalizer_observer ~rollup_node_tracking finalizer_public_server let* () = Evm_context.shutdown () in when_ rollup_node_tracking @@ fun () -> Evm_events_follower.shutdown () +let container_forward_tx ~keep_alive ~evm_node_endpoint : + (module Services_backend_sig.Tx_container) = + (module struct + let nonce ~next_nonce _address = Lwt_result.return next_nonce + + let add ~next_nonce:_ _tx_object ~raw_tx = + Injector.send_raw_transaction + ~keep_alive + ~base:evm_node_endpoint + 
~raw_tx:(Ethereum_types.hex_to_bytes raw_tx) + + let find _hash = Lwt_result.return None + + let content () = + Lwt_result.return {pending = AddressMap.empty; queued = AddressMap.empty} + end) + let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync ~init_from_snapshot () = let open Lwt_result_syntax in @@ -166,36 +183,31 @@ let main ?network ?kernel_path ~data_dir ~(config : Configuration.t) ~no_sync ( (module Tx_queue.Tx_container : Services_backend_sig.Tx_container), false ) | None -> - let mode = - if config.finalized_view then - Tx_pool.Forward + if config.finalized_view then + let tx_container = + container_forward_tx + ~keep_alive:config.keep_alive + ~evm_node_endpoint + in + return (tx_container, false) + else + let* () = + Tx_pool.start { - injector = - (fun _ raw_tx -> - Injector.send_raw_transaction - ~keep_alive:config.keep_alive - ~base:evm_node_endpoint - ~raw_tx); + backend = observer_backend; + smart_rollup_address = + Tezos_crypto.Hashed.Smart_rollup_address.to_b58check + smart_rollup_address; + mode = Relay; + tx_timeout_limit = config.tx_pool_timeout_limit; + tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; + tx_pool_tx_per_addr_limit = + Int64.to_int config.tx_pool_tx_per_addr_limit; } - else Tx_pool.Relay - in - let* () = - Tx_pool.start - { - backend = observer_backend; - smart_rollup_address = - Tezos_crypto.Hashed.Smart_rollup_address.to_b58check - smart_rollup_address; - mode; - tx_timeout_limit = config.tx_pool_timeout_limit; - tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; - tx_pool_tx_per_addr_limit = - Int64.to_int config.tx_pool_tx_per_addr_limit; - } - in - return - ( (module Tx_pool.Tx_container : Services_backend_sig.Tx_container), - true ) + in + return + ( (module Tx_pool.Tx_container : Services_backend_sig.Tx_container), + true ) in Metrics.init diff --git a/etherlink/bin_node/lib_dev/rpc.ml b/etherlink/bin_node/lib_dev/rpc.ml index d288bda81085..9afface54b18 100644 --- 
a/etherlink/bin_node/lib_dev/rpc.ml +++ b/etherlink/bin_node/lib_dev/rpc.ml @@ -57,6 +57,68 @@ let set_metrics_confirmed_levels (ctxt : Evm_ro_context.t) = Metrics.set_l1_level ~level:l1_level | None -> () +let container_forward_request ~public_endpoint ~private_endpoint ~keep_alive : + (module Services_backend_sig.Tx_container) = + (module struct + let rpc_error = + Internal_event.Simple.declare_2 + ~section:(Events.section @ ["local_node_rpc"]) + ~name:"local_node_rpc" + ~msg:"local node failed answering {rpc} with {message}" + ~level:Error + ("rpc", Data_encoding.string) + ("message", Data_encoding.string) + + let get_or_emit_error ~rpc_name res = + let open Lwt_result_syntax in + match res with + | Ok res -> return_some res + | Error msg -> + let*! () = Internal_event.Simple.emit rpc_error (rpc_name, msg) in + return_none + + let nonce ~next_nonce address = + let open Lwt_result_syntax in + let* res = + Injector.get_transaction_count + ~keep_alive + ~base:public_endpoint + address + (* The function [nonce] is only ever called when + requesting the nonce for the pending block. It's + safe to assume the pending block. 
*) + Ethereum_types.Block_parameter.(Block_parameter Pending) + in + let* nonce = get_or_emit_error ~rpc_name:"get_transaction_count" res in + match nonce with + | Some nonce -> return nonce + | None -> + (* We return the known [next_nonce] instead of failing. *) + return next_nonce + + let add ~next_nonce:_ tx_object ~raw_tx = + Injector.inject_transaction + ~keep_alive + ~base:private_endpoint + ~tx_object + ~raw_tx:(Ethereum_types.hex_to_bytes raw_tx) + + let find hash = + let open Lwt_result_syntax in + let* res = + Injector.get_transaction_by_hash ~keep_alive ~base:public_endpoint hash + in + let* tx_object = + get_or_emit_error ~rpc_name:"get_transaction_by_hash" res + in + let tx_object = Option.join tx_object in + return tx_object + + let content () = + Lwt_result.return + Ethereum_types.{pending = AddressMap.empty; queued = AddressMap.empty} + end) + let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint ~(config : Configuration.t) () = let open Lwt_result_syntax in @@ -76,35 +138,36 @@ let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint Block_storage_setup.enable ~keep_alive:config.keep_alive ctxt.store ; let rpc_backend = Evm_ro_context.ro_backend ctxt config ~evm_node_endpoint in - let* tx_container = - let* () = - Tx_pool.start - { - backend = rpc_backend; - smart_rollup_address = - Tezos_crypto.Hashed.Smart_rollup_address.to_b58check - ctxt.smart_rollup_address; - mode = - (match evm_node_private_endpoint with - | Some base -> - Forward - { - injector = - (fun tx_object raw_tx -> - Injector.inject_transaction - ~keep_alive:config.keep_alive - ~base - ~tx_object - ~raw_tx); - } - | None -> Relay); - tx_timeout_limit = config.tx_pool_timeout_limit; - tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; - tx_pool_tx_per_addr_limit = - Int64.to_int config.tx_pool_tx_per_addr_limit; - } - in - return @@ (module Tx_pool.Tx_container : Services_backend_sig.Tx_container) + + let* ping_tx_pool, tx_container = + match 
evm_node_private_endpoint with + | Some private_endpoint -> + let forward_request = + container_forward_request + ~keep_alive:config.keep_alive + ~public_endpoint:evm_node_endpoint + ~private_endpoint + in + + return (false, forward_request) + | None -> + let* () = + Tx_pool.start + { + backend = rpc_backend; + smart_rollup_address = + Tezos_crypto.Hashed.Smart_rollup_address.to_b58check + ctxt.smart_rollup_address; + mode = Relay; + tx_timeout_limit = config.tx_pool_timeout_limit; + tx_pool_addr_limit = Int64.to_int config.tx_pool_addr_limit; + tx_pool_tx_per_addr_limit = + Int64.to_int config.tx_pool_tx_per_addr_limit; + } + in + return + ( true, + (module Tx_pool.Tx_container : Services_backend_sig.Tx_container) ) in let* () = set_metrics_level ctxt in @@ -184,6 +247,7 @@ let main ~data_dir ~evm_node_endpoint ?evm_node_private_endpoint ~time_between_blocks ~evm_node_endpoint ~next_blueprint_number + ~ping_tx_pool @@ fun (Qty number) blueprint -> let* () = when_ (Option.is_some blueprint.kernel_upgrade) @@ fun () -> diff --git a/etherlink/bin_node/lib_dev/tx_pool.ml b/etherlink/bin_node/lib_dev/tx_pool.ml index a1b9a5e80955..b0d5914032ff 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.ml +++ b/etherlink/bin_node/lib_dev/tx_pool.ml @@ -222,16 +222,7 @@ module Pool = struct transactions end -type mode = - | Proxy - | Sequencer - | Relay - | Forward of { - injector : - Ethereum_types.legacy_transaction_object -> - string -> - (Ethereum_types.hash, string) result tzresult Lwt.t; - } +type mode = Proxy | Sequencer | Relay type parameters = { backend : (module Services_backend_sig.S); @@ -703,7 +694,7 @@ module Handlers = struct Worker.Queue.push_request w Request.Pop_and_inject_transactions in return_unit - | Sequencer | Proxy | Forward _ -> return_unit + | Sequencer | Proxy -> return_unit let on_request : type r request_error. 
@@ -716,12 +707,7 @@ module Handlers = struct | Request.Add_transaction (transaction_object, txn) -> protect @@ fun () -> Tx_watcher.notify transaction_object.hash ; - let* res = - match state.mode with - | Forward {injector} -> injector transaction_object txn - | Proxy | Sequencer | Relay -> - insert_valid_transaction state txn transaction_object - in + let* res = insert_valid_transaction state txn transaction_object in let* () = relay_self_inject_request w in return res | Request.Pop_transactions maximum_cumulative_size -> @@ -857,7 +843,7 @@ let pop_and_inject_transactions () = let*? worker = Lazy.force worker in let state = Worker.state worker in match state.mode with - | Sequencer | Forward _ -> + | Sequencer -> (* the sequencer injects blueprint in a rollup node, not transaction. *) return_unit @@ -872,7 +858,7 @@ let pop_and_inject_transactions_lazy () = bind_worker @@ fun w -> let state = Worker.state w in match state.mode with - | Sequencer | Forward _ -> + | Sequencer -> (* the sequencer injects blueprint in a rollup node, not transaction. *) return_unit diff --git a/etherlink/bin_node/lib_dev/tx_pool.mli b/etherlink/bin_node/lib_dev/tx_pool.mli index 6033c8c856fc..76f6fac292a0 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.mli +++ b/etherlink/bin_node/lib_dev/tx_pool.mli @@ -11,14 +11,6 @@ type mode = | Sequencer | Relay (** Relays the transactions when they are valid w.r.t. the local state. *) - | Forward of { - injector : - Ethereum_types.legacy_transaction_object -> - string -> - (Ethereum_types.hash, string) result tzresult Lwt.t; - } - (** Forwards the transactions without checking the - transaction validity. *) type parameters = { backend : (module Services_backend_sig.S); (** The backend RPC module. *) -- GitLab