diff --git a/etherlink/bin_node/lib_dev/injector.ml b/etherlink/bin_node/lib_dev/injector.ml index 0190765874672421593f462db8c4069afac0d6b0..9148b6d0a10c29ab4ed406b063f5f046a66fada2 100644 --- a/etherlink/bin_node/lib_dev/injector.ml +++ b/etherlink/bin_node/lib_dev/injector.ml @@ -41,6 +41,18 @@ let call_rpc_service ~keep_alive ~timeout ~base ~path request output_encoding = "Upstream endpoint returned an inconsistent response (more than one \ result)") +let call_singleton_request (type input output) ~keep_alive ~timeout ~base + (module Method_ : Rpc_encodings.METHOD + with type input = input + and type output = output) request = + call_rpc_service + ~keep_alive + ~timeout + ~base + ~path:Resto.Path.root + request + Method_.output_encoding + let send_raw_transaction_request raw_tx = construct_rpc_call ~method_:Send_raw_transaction.method_ diff --git a/etherlink/bin_node/lib_dev/injector.mli b/etherlink/bin_node/lib_dev/injector.mli index a30427e1e047e08c21f5aa09e57ec90a39703a2e..fc3412d520ee8fe7ab5fb08adcbc04a6d87c9b8f 100644 --- a/etherlink/bin_node/lib_dev/injector.mli +++ b/etherlink/bin_node/lib_dev/injector.mli @@ -5,6 +5,16 @@ (* *) (*****************************************************************************) +val call_singleton_request : + keep_alive:bool -> + timeout:float -> + base:Uri.t -> + (module Rpc_encodings.METHOD + with type input = 'input + and type output = 'output) -> + Rpc_encodings.JSONRPC.request -> + ('output, string) result tzresult Lwt.t + (** [send_raw_transaction ~keep_alive ~base raw_txn] uses the [eth_sendRawTransaction] RPC method to inject [raw_txn] to the endpoint [base]. It will retry as many time as necessary if [keep_alive] is set to diff --git a/etherlink/bin_node/lib_dev/metrics.ml b/etherlink/bin_node/lib_dev/metrics.ml index e9a43bc860ce0056a7261c53e92a6d6465f4cf0b..f0c31848bdbfc917ea0e6e8ce0a25d68cdc41c8b 100644 --- a/etherlink/bin_node/lib_dev/metrics.ml +++ b/etherlink/bin_node/lib_dev/metrics.ml @@ -362,9 +362,9 @@ let metrics = in {chain; block; simulation; health; l1_level} -let init ~mode ~tx_pool_size_info ~smart_rollup_address = +let init ~mode ?tx_pool_size_info ~smart_rollup_address () = Info.init ~mode ~smart_rollup_address ; - Tx_pool.register tx_pool_size_info + Option.iter Tx_pool.register tx_pool_size_info let set_level ~level = Gauge.set metrics.chain.head (Z.to_float level) diff --git a/etherlink/bin_node/lib_dev/metrics.mli b/etherlink/bin_node/lib_dev/metrics.mli index 2a1e3805cf0a19c6b2285b7672bf515b9f12e6ce..ffe01b9a99535d437a2797f8a768d247b6726209 100644 --- a/etherlink/bin_node/lib_dev/metrics.mli +++ b/etherlink/bin_node/lib_dev/metrics.mli @@ -26,8 +26,9 @@ end val init : mode:string -> - tx_pool_size_info:(unit -> Tx_pool.size_info tzresult Lwt.t) -> + ?tx_pool_size_info:(unit -> Tx_pool.size_info tzresult Lwt.t) -> smart_rollup_address:Tezos_crypto.Hashed.Smart_rollup_address.t -> + unit -> unit val set_level : level:Z.t -> unit diff --git a/etherlink/bin_node/lib_dev/mode.ml b/etherlink/bin_node/lib_dev/mode.ml index 53f4468326e1c01342cdb13da0050cd6f22de543..4bf9861d0f56ff1aecda45bc2dcdf9b4ceed9be3 100644 --- a/etherlink/bin_node/lib_dev/mode.ml +++ b/etherlink/bin_node/lib_dev/mode.ml @@ -6,9 +6,15 @@ (* *) (*****************************************************************************) +type local_node_info = { + evm_node_endpoint : Uri.t; + evm_node_private_endpoint : Uri.t; + websocket : Websocket_client.t option; +} + (** Available modes. *) -type t = - | Sequencer - | Observer - | Proxy - | Rpc of {evm_node_endpoint : Uri.t; websocket : Websocket_client.t option} +type 'f t = + | Sequencer : 'a Services_backend_sig.tx_container -> 'a t + | Observer : 'a Services_backend_sig.tx_container -> 'a t + | Proxy : 'a Services_backend_sig.tx_container -> 'a t + | Rpc : local_node_info -> 'a t diff --git a/etherlink/bin_node/lib_dev/observer.ml b/etherlink/bin_node/lib_dev/observer.ml index f350650ef8d9383665f0c91596cf04310b150408..b2a2a07ec40a2f47ad1a3057cbd1c94689105d21 100644 --- a/etherlink/bin_node/lib_dev/observer.ml +++ b/etherlink/bin_node/lib_dev/observer.ml @@ -259,7 +259,8 @@ let main ?network ?kernel_path ~(config : Configuration.t) ~no_sync Metrics.init ~mode:"observer" ~tx_pool_size_info:Tx_container.size_info - ~smart_rollup_address ; + ~smart_rollup_address + () ; let* () = Prevalidator.start @@ -271,21 +272,19 @@ let main ?network ?kernel_path ~(config : Configuration.t) ~no_sync let rpc_server_family = Rpc_types.Single_chain_node_rpc_server chain_family in let* finalizer_public_server = Rpc_server.start_public_server - ~mode:Observer + ~mode:(Observer tx_container) ~l2_chain_id ~evm_services: Evm_ro_context.(evm_services_methods ro_ctxt time_between_blocks) ~rpc_server_family config - tx_container ((module Rpc_backend), smart_rollup_address) in let* finalizer_private_server = Rpc_server.start_private_server - ~mode:Observer + ~mode:(Observer tx_container) ~rpc_server_family config - tx_container ((module Rpc_backend), smart_rollup_address) in diff --git a/etherlink/bin_node/lib_dev/proxy.ml b/etherlink/bin_node/lib_dev/proxy.ml index 54df6301e0d1ed92cf6bde5d335415de0b1ecc9b..a02d9271b673653779c0a414953aa85b2fe7dd13 100644 --- a/etherlink/bin_node/lib_dev/proxy.ml +++ b/etherlink/bin_node/lib_dev/proxy.ml @@ -253,11 +253,10 @@ let main let* server_finalizer = Rpc_server.start_public_server - ~mode:Proxy + ~mode:(Proxy tx_container) ~rpc_server_family:(Rpc_types.Single_chain_node_rpc_server chain_family) ~l2_chain_id config - tx_container ((module Rollup_node_rpc), smart_rollup_address) in let (_ : Lwt_exit.clean_up_callback_id) = diff --git a/etherlink/bin_node/lib_dev/rpc.ml b/etherlink/bin_node/lib_dev/rpc.ml index bcf8bae0408fe3b46aecd1075a3ee17868d35d68..ee0e8147ca1489f79fa515235dc1c1b2123fb726 100644 --- a/etherlink/bin_node/lib_dev/rpc.ml +++ b/etherlink/bin_node/lib_dev/rpc.ml @@ -36,17 +36,12 @@ let spawn_main ~exposed_port ~protected_endpoint ?private_endpoint ~data_dir () let finalizer () = Lwt.return process#terminate in finalizer -let install_finalizer_rpc ~(tx_container : _ Services_backend_sig.tx_container) - server_public_finalizer telemetry_cleanup = +let install_finalizer_rpc server_public_finalizer telemetry_cleanup = let open Lwt_syntax in - let (module Tx_container) = - Services_backend_sig.tx_container_module tx_container - in Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status -> let* () = telemetry_cleanup () in let* () = Events.shutdown_node ~exit_status in - let* () = server_public_finalizer () in - Misc.unwrap_error_monad @@ fun () -> Tx_container.shutdown () + server_public_finalizer () let set_metrics_level (ctxt : Evm_ro_context.t) = let open Lwt_result_syntax in @@ -64,216 +59,6 @@ let set_metrics_confirmed_levels (ctxt : Evm_ro_context.t) = Metrics.set_l1_level ~level:l1_level | None -> () -module Forward_container - (Tx : Tx_queue_types.L2_transaction) - (C : sig - val public_endpoint : Uri.t - - val private_endpoint : Uri.t - - val keep_alive : bool - - val timeout : float - end) - (Injector : sig - val get_transaction_count : - keep_alive:bool -> - timeout:float -> - base:Uri.t -> - Tx.address -> - Ethereum_types.Block_parameter.extended -> - (Ethereum_types.quantity, string) result tzresult Lwt.t - - val inject_transaction : - keep_alive:bool -> - timeout:float -> - base:Uri.t -> - tx_object:Tx.t -> - raw_tx:string -> - wait_confirmation:bool -> - (Ethereum_types.hash, string) result tzresult Lwt.t - - val get_transaction_by_hash : - keep_alive:bool -> - timeout:float -> - base:Uri.t -> - Ethereum_types.hash -> - (Tx.t option, string) result tzresult Lwt.t - end) : - Services_backend_sig.Tx_container - with type address = Tx.address - and type transaction_object = Tx.t = struct - type address = Tx.address - - type transaction_object = Tx.t - - let rpc_error = - Internal_event.Simple.declare_2 - ~section:Events.section - ~name:"local_node_rpc_failure" - ~msg:"local node failed answering {rpc} with {message}" - ~level:Error - ("rpc", Data_encoding.string) - ("message", Data_encoding.string) - - let forwarding_transaction = - Internal_event.Simple.declare_1 - ~section:Events.section - ~name:"forward_transaction" - ~msg:"forwarding transaction {tx_hash} to local node" - ~level:Info - ~pp1:(fun fmt Ethereum_types.(Hash (Hex h)) -> - Format.fprintf fmt "%10s" h) - ("tx_hash", Ethereum_types.hash_encoding) - - let get_or_emit_error ~rpc_name res = - let open Lwt_result_syntax in - match res with - | Ok res -> return_some res - | Error msg -> - let*! () = Internal_event.Simple.emit rpc_error (rpc_name, msg) in - return_none - - let nonce ~next_nonce address = - let open Lwt_result_syntax in - let* res = - Injector.get_transaction_count - ~keep_alive:C.keep_alive - ~timeout:C.timeout - ~base:C.public_endpoint - address - (* The function [nonce] is only ever called when - requesting the nonce for the pending block. It's - safe to assume the pending block. *) - Ethereum_types.Block_parameter.(Block_parameter Pending) - in - let* nonce = get_or_emit_error ~rpc_name:"get_transaction_count" res in - match nonce with - | Some nonce -> return nonce - | None -> - (*we return the known next_nonce instead of failing *) - return next_nonce - - let add ?(wait_confirmation = false) ~next_nonce:_ (tx_object : Tx.t) ~raw_tx - = - let open Lwt_syntax in - let* () = - Internal_event.Simple.emit - forwarding_transaction - (Tx.hash_of_tx_object tx_object) - in - Injector.inject_transaction - ~wait_confirmation - ~keep_alive:C.keep_alive - ~timeout:C.timeout - ~base:C.private_endpoint - ~tx_object - ~raw_tx:(Ethereum_types.hex_to_bytes raw_tx) - - let find hash : transaction_object option tzresult Lwt.t = - let open Lwt_result_syntax in - let* res = - Injector.get_transaction_by_hash - ~keep_alive:C.keep_alive - ~timeout:C.timeout - ~base:C.public_endpoint - hash - in - let* tx_object = - get_or_emit_error ~rpc_name:"get_transaction_by_hash" res - in - let tx_object = Option.join tx_object in - return tx_object - - let content () = - Lwt_result.return - Transaction_object. - { - pending = Ethereum_types.AddressMap.empty; - queued = Ethereum_types.AddressMap.empty; - } - - let shutdown () = Lwt_result_syntax.return_unit - - let clear () = Lwt_result_syntax.return_unit - - let tx_queue_tick ~evm_node_endpoint:_ = Lwt_result_syntax.return_unit - - let tx_queue_beacon ~evm_node_endpoint:_ ~tick_interval:_ = - Lwt_result_syntax.return_unit - - let lock_transactions () = Lwt_result_syntax.return_unit - - let unlock_transactions () = Lwt_result_syntax.return_unit - - let is_locked () = Lwt_result_syntax.return_false - - let confirm_transactions ~clear_pending_queue_after:_ ~confirmed_txs:_ = - Lwt_result_syntax.return_unit - - let size_info () = - Lwt_result.return - Metrics.Tx_pool.{number_of_addresses = 0; number_of_transactions = 0} - - let pop_transactions ~maximum_cumulative_size:_ ~validate_tx:_ - ~initial_validation_state:_ = - Lwt_result_syntax.return_nil -end - -let container_forward_request (type f) ~(chain_family : f L2_types.chain_family) - ~public_endpoint ~private_endpoint ~keep_alive ~timeout : - f Services_backend_sig.tx_container = - match chain_family with - | EVM -> - Services_backend_sig.Evm_tx_container - (module Forward_container - (Tx_queue_types.Eth_transaction_object) - (struct - let public_endpoint = public_endpoint - - let private_endpoint = private_endpoint - - let keep_alive = keep_alive - - let timeout = timeout - end) - (Injector)) - | Michelson -> - Services_backend_sig.Michelson_tx_container - (module Forward_container - (Tx_queue_types.Tezlink_operation) - (struct - let public_endpoint = public_endpoint - - let private_endpoint = private_endpoint - - let keep_alive = keep_alive - - let timeout = timeout - end) - (struct - let get_transaction_count ~keep_alive:_ ~timeout:_ ~base:_ _ - _ = - failwith - "TODO: implement get_transaction_count in the Tezlink \ - case (using counter RPC)" - - let inject_transaction ~keep_alive ~timeout ~base ~tx_object - ~raw_tx ~wait_confirmation:_ = - Injector.inject_tezlink_operation - ~keep_alive - ~timeout - ~base - ~op:tx_object - ~raw_op:(Bytes.of_string raw_tx) - - let get_transaction_by_hash ~keep_alive:_ ~timeout:_ ~base:_ - _ = - failwith - "TODO: implement get_transaction_by_hash in the \ - Tezlink case" - end)) - let main ~evm_node_endpoint ~evm_node_private_endpoint ~(config : Configuration.t) () = let open Lwt_result_syntax in @@ -313,26 +98,10 @@ let main ~evm_node_endpoint ~evm_node_private_endpoint Rpc_backend.single_chain_id_and_family ~config ~enable_multichain in - let tx_container = - container_forward_request - ~chain_family - ~keep_alive:config.keep_alive - ~timeout:config.rpc_timeout - ~public_endpoint:evm_node_endpoint - ~private_endpoint:evm_node_private_endpoint - in - let* () = set_metrics_level ctxt in let* () = set_metrics_confirmed_levels ctxt in - let (module Tx_container) = - Services_backend_sig.tx_container_module tx_container - in - - Metrics.init - ~mode:"rpc" - ~tx_pool_size_info:Tx_container.size_info - ~smart_rollup_address:ctxt.smart_rollup_address ; + Metrics.init ~mode:"rpc" ~smart_rollup_address:ctxt.smart_rollup_address () ; (* Never spawn from an RPC node *) let rpc_config = @@ -368,21 +137,19 @@ let main ~evm_node_endpoint ~evm_node_private_endpoint in let* server_public_finalizer = Rpc_server.start_public_server - ~mode:(Rpc {evm_node_endpoint; websocket = ws_client}) + ~mode: + (Rpc + {evm_node_endpoint; evm_node_private_endpoint; websocket = ws_client}) ~l2_chain_id ~evm_services: Evm_ro_context.(evm_services_methods ctxt time_between_blocks) ~rpc_server_family rpc_config - tx_container ((module Rpc_backend), ctxt.smart_rollup_address) in let (_ : Lwt_exit.clean_up_callback_id) = - install_finalizer_rpc - server_public_finalizer - telemetry_cleanup - ~tx_container + install_finalizer_rpc server_public_finalizer telemetry_cleanup in let* () = @@ -392,13 +159,6 @@ let main ~evm_node_endpoint ~evm_node_private_endpoint let* next_blueprint_number = Evm_ro_context.next_blueprint_number ctxt in let* () = - let (module Tx_container) = - Services_backend_sig.tx_container_module tx_container - in - Tx_container.tx_queue_beacon - ~evm_node_endpoint:(Rpc evm_node_endpoint) - ~tick_interval:0.05 - and* () = Blueprints_follower.start ~multichain:enable_multichain ~time_between_blocks diff --git a/etherlink/bin_node/lib_dev/rpc_server.ml b/etherlink/bin_node/lib_dev/rpc_server.ml index e491c4d365f8ab9bf34ab526cbff573a07e9b71d..511b716e7ef6eb7679533989b4f041bac82c6a60 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.ml +++ b/etherlink/bin_node/lib_dev/rpc_server.ml @@ -145,10 +145,9 @@ let monitor_performances ~data_dir = (* Run in background *) ignore domain -let start_public_server (type f) ~(mode : Mode.t) +let start_public_server (type f) ~(mode : f Mode.t) ~(rpc_server_family : f Rpc_types.rpc_server_family) ~l2_chain_id - ?evm_services (config : Configuration.t) - (tx_container : f Services_backend_sig.tx_container) ctxt = + ?evm_services (config : Configuration.t) ctxt = let open Lwt_result_syntax in let can_start_performance_metrics = Octez_performance_metrics.Unix.supports_performance_metrics () @@ -176,9 +175,22 @@ let start_public_server (type f) ~(mode : Mode.t) | Some l2_chain_id -> return l2_chain_id | None -> Backend.chain_id () in - let (Services_backend_sig.Michelson_tx_container (module Tx_container)) - = - tx_container + let add_transaction ~next_nonce transaction_object ~raw_op = + match mode with + | Observer (Michelson_tx_container (module Tx_container)) + | Proxy (Michelson_tx_container (module Tx_container)) + | Sequencer (Michelson_tx_container (module Tx_container)) -> + Tx_container.add + ~next_nonce + transaction_object + ~raw_tx:(Ethereum_types.hex_of_bytes raw_op) + | Rpc {evm_node_private_endpoint; _} -> + Injector.inject_tezlink_operation + ~keep_alive:config.keep_alive + ~timeout:config.rpc_timeout + ~base:evm_node_private_endpoint + ~op:transaction_object + ~raw_op in return @@ Evm_directory.init_from_resto_directory @@ Tezlink_directory.register_tezlink_services @@ -203,10 +215,10 @@ let start_public_server (type f) ~(mode : Mode.t) err | Ok {next_nonce; transaction_object} -> let* hash_res = - Tx_container.add + add_transaction ~next_nonce transaction_object - ~raw_tx:raw_hex + ~raw_op:(Bytes.of_string raw_str) in let* hash = match hash_res with @@ -226,7 +238,7 @@ let start_public_server (type f) ~(mode : Mode.t) let directory = register_tezos_services - |> Services.directory ~rpc_server_family mode rpc config tx_container ctxt + |> Services.directory ~rpc_server_family mode rpc config ctxt |> register_evm_services |> Evm_directory.register_metrics "/metrics" |> Evm_directory.register_describe @@ -243,7 +255,7 @@ let start_public_server (type f) ~(mode : Mode.t) let start_private_server ~mode ~(rpc_server_family : _ Rpc_types.rpc_server_family) - ?(block_production = `Disabled) config tx_container ctxt = + ?(block_production = `Disabled) config ctxt = let open Lwt_result_syntax in match config.Configuration.private_rpc with | Some private_rpc -> @@ -254,7 +266,6 @@ let start_private_server ~mode private_rpc ~block_production config - tx_container ctxt |> Evm_directory.register_metrics "/metrics" |> Evm_directory.register_describe diff --git a/etherlink/bin_node/lib_dev/rpc_server.mli b/etherlink/bin_node/lib_dev/rpc_server.mli index 4c63039b35e2b6654b78ba4af6c46595bfafbb98..742232fbdbf2fe6f936b21603d47bc7c2d94a0fe 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.mli +++ b/etherlink/bin_node/lib_dev/rpc_server.mli @@ -32,11 +32,10 @@ type block_production = [`Single_node | `Disabled] sequencer setup, [`Disabled] means no block production method is available. *) val start_private_server : - mode:Mode.t -> + mode:'f Mode.t -> rpc_server_family:'f Rpc_types.rpc_server_family -> ?block_production:block_production -> Configuration.t -> - 'f Services_backend_sig.tx_container -> (module Services_backend_sig.S) * 'a -> finalizer tzresult Lwt.t @@ -49,11 +48,10 @@ val start_private_server : If the host provides the necessary binaries, performance metrics are enabled. *) val start_public_server : - mode:Mode.t -> + mode:'f Mode.t -> rpc_server_family:'f Rpc_types.rpc_server_family -> l2_chain_id:L2_types.chain_id option -> ?evm_services:evm_services_methods -> Configuration.t -> - 'f Services_backend_sig.tx_container -> (module Services_backend_sig.S) * 'a -> finalizer tzresult Lwt.t diff --git a/etherlink/bin_node/lib_dev/sequencer.ml b/etherlink/bin_node/lib_dev/sequencer.ml index a23cec64049addfb9d1269b371ba813e2922fa01..bad1a83259811a2247acc8fcca942431a6343a16 100644 --- a/etherlink/bin_node/lib_dev/sequencer.ml +++ b/etherlink/bin_node/lib_dev/sequencer.ml @@ -482,7 +482,8 @@ let main ~cctxt ?(genesis_timestamp = Misc.now ()) Metrics.init ~mode:"sequencer" ~tx_pool_size_info:Tx_container.size_info - ~smart_rollup_address:smart_rollup_address_typed ; + ~smart_rollup_address:smart_rollup_address_typed + () ; let* () = Block_producer.start { @@ -520,7 +521,7 @@ let main ~cctxt ?(genesis_timestamp = Misc.now ()) let* finalizer_public_server = Rpc_server.start_public_server - ~mode:Sequencer + ~mode:(Sequencer tx_container) ~l2_chain_id ~evm_services: Evm_ro_context.( @@ -529,18 +530,16 @@ let main ~cctxt ?(genesis_timestamp = Misc.now ()) (if enable_multichain then Rpc_types.Multichain_sequencer_rpc_server else Rpc_types.Single_chain_node_rpc_server chain_family) configuration - tx_container ((module Rpc_backend), smart_rollup_address_typed) in let* finalizer_private_server = Rpc_server.start_private_server - ~mode:Sequencer + ~mode:(Sequencer tx_container) ~rpc_server_family: (if enable_multichain then Rpc_types.Multichain_sequencer_rpc_server else Rpc_types.Single_chain_node_rpc_server chain_family) ~block_production:`Single_node configuration - tx_container ((module Rpc_backend), smart_rollup_address_typed) in let*! finalizer_rpc_process = diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index b761303d68bda0300008e59c5492039ab94744c5..3530fb19f6dfa09f283a90226bfa82bbcf5db570 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -152,7 +152,7 @@ let configuration_handler config = let health_check_handler config mode db_liveness_check query = match mode with - | Mode.Sequencer | Observer | Proxy -> + | Mode.Sequencer _ | Observer _ | Proxy _ -> let open Lwt_result_syntax in let* () = fail_when (Metrics.is_bootstrapping ()) Node_is_bootstrapping and* () = @@ -187,9 +187,9 @@ let health_check_handler config mode db_liveness_check query = let evm_mode_handler config evm_mode = let open Lwt_result_syntax in match evm_mode with - | Mode.Sequencer -> return "sequencer" - | Observer -> return "observer" - | Proxy -> return "proxy" + | Mode.Sequencer _ -> return "sequencer" + | Observer _ -> return "observer" + | Proxy _ -> return "proxy" | Rpc {evm_node_endpoint; _} -> let+ evm_node_mode = Rollup_services.call_service @@ -559,7 +559,7 @@ let eth_subscribe_rpc_mode ~timeout ~(kind : Ethereum_types.Subscription.kind) proxied.subscribers <- proxied.subscribers + 1 ; return (stream, unsubscribe) -let eth_subscribe (config : Configuration.t) (mode : Mode.t) ~kind backend = +let eth_subscribe (config : Configuration.t) (mode : 'f Mode.t) ~kind backend = let open Lwt_result_syntax in let id = make_id ~id:(generate_id ()) in let* stream, stopper = @@ -795,8 +795,36 @@ let process_trace_result trace = let msg = Format.asprintf "%a" pp_print_trace e in rpc_error (Rpc_errors.internal_error msg) -let send_raw_transaction (type f) - (tx_container : f Services_backend_sig.tx_container) ~wait_confirmation = +let inject_rpc_call (config : Configuration.t) request method_ + Mode.{evm_node_endpoint; _} = + let open Lwt_result_syntax in + let* res = + Injector.call_singleton_request + ~keep_alive:config.keep_alive + ~timeout:config.rpc_timeout + ~base:evm_node_endpoint + method_ + request + in + match res with + | Ok output -> rpc_ok output + | Error reason -> rpc_error (Rpc_errors.internal_error reason) + +let process_based_on_mode (type f) (mode : f Mode.t) ~on_rpc ~on_stateful_evm + ~on_stateful_michelson = + match mode with + | Mode.Rpc rpc -> on_rpc rpc + | Proxy (Evm_tx_container tx_container) + | Sequencer (Evm_tx_container tx_container) + | Observer (Evm_tx_container tx_container) -> + on_stateful_evm tx_container + | Proxy (Michelson_tx_container tx_container) + | Sequencer (Michelson_tx_container tx_container) + | Observer (Michelson_tx_container tx_container) -> + on_stateful_michelson tx_container + +let send_raw_transaction (type f) (config : Configuration.t) (mode : f Mode.t) + ~wait_confirmation = let open Lwt_result_syntax in let f raw_tx = let txn = Ethereum_types.hex_to_bytes raw_tx in @@ -805,39 +833,48 @@ let send_raw_transaction (type f) | Error err -> let*! () = Tx_pool_events.invalid_transaction ~transaction:raw_tx in rpc_error (Rpc_errors.transaction_rejected err None) - | Ok {next_nonce; transaction_object} -> ( + | Ok {next_nonce; transaction_object} -> Octez_telemetry.Trace.( add_attrs (fun () -> Telemetry.Attributes. [Transaction.hash @@ Transaction_object.hash transaction_object])) ; - - let* (module Tx_container) = - match tx_container with - | Evm_tx_container m -> return m - | Michelson_tx_container _ -> - failwith - "Unsupported JSONRPC method in Tezlink: sendRawTransaction" - in - let* tx_hash = - Tx_container.add - ~next_nonce - transaction_object - ~raw_tx - ~wait_confirmation - in - match tx_hash with - | Ok tx_hash -> rpc_ok tx_hash - | Error reason -> - rpc_error (Rpc_errors.transaction_rejected reason None)) + process_based_on_mode + mode + ~on_rpc:(fun {evm_node_private_endpoint; _} -> + let* res = + Injector.inject_transaction + ~wait_confirmation + ~keep_alive:config.keep_alive + ~timeout:config.rpc_timeout + ~base:evm_node_private_endpoint + ~tx_object:transaction_object + ~raw_tx:(Ethereum_types.hex_to_bytes raw_tx) + in + match res with + | Ok output -> rpc_ok output + | Error reason -> rpc_error (Rpc_errors.internal_error reason)) + ~on_stateful_evm:(fun (module Tx_container) -> + let* tx_hash = + Tx_container.add + ~next_nonce + transaction_object + ~raw_tx + ~wait_confirmation + in + match tx_hash with + | Ok tx_hash -> rpc_ok tx_hash + | Error reason -> + rpc_error (Rpc_errors.transaction_rejected reason None)) + ~on_stateful_michelson:(fun _ -> + failwith "Unsupported JSONRPC method in Tezlink: sendRawTransaction") in f let dispatch_request (type f) ~websocket (rpc_server_family : f Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) (config : Configuration.t) (mode : Mode.t) - (tx_container : f Services_backend_sig.tx_container) + (rpc : Configuration.rpc) (config : Configuration.t) (mode : f Mode.t) ((module Backend_rpc : Services_backend_sig.S), _) - ({method_; parameters; id} : JSONRPC.request) : + ({method_; parameters; id} as request : JSONRPC.request) : JSONRPC.return_response Lwt.t = let open Lwt_result_syntax in let open Ethereum_types in @@ -1003,20 +1040,22 @@ let dispatch_request (type f) ~websocket let f (address, block_param) = match block_param with | Ethereum_types.Block_parameter.(Block_parameter Pending) -> - let* (module Tx_container) = - match tx_container with - | Evm_tx_container m -> return m - | Michelson_tx_container _ -> - failwith - "Unsupported JSONRPC method in Tezlink: \ - getTransactionCount" - in - let* next_nonce = - Backend_rpc.Etherlink.nonce address block_param - in - let next_nonce = Option.value ~default:Qty.zero next_nonce in - let* nonce = Tx_container.nonce ~next_nonce address in - rpc_ok nonce + process_based_on_mode + mode + ~on_rpc:(inject_rpc_call config request module_) + ~on_stateful_michelson:(fun _ -> + failwith + "Unsupported JSONRPC method in Tezlink: \ + getTransactionCount") + ~on_stateful_evm:(fun (module Tx_container) -> + let* next_nonce = + Backend_rpc.Etherlink.nonce address block_param + in + let next_nonce = + Option.value ~default:Qty.zero next_nonce + in + let* nonce = Tx_container.nonce ~next_nonce address in + rpc_ok nonce) | _ -> let* nonce = Backend_rpc.Etherlink.nonce address block_param @@ -1119,24 +1158,23 @@ let dispatch_request (type f) ~websocket Octez_telemetry.Trace.( add_attrs (fun () -> Telemetry.Attributes.[Transaction.hash tx_hash])) ; - - let* (module Tx_container) = - match tx_container with - | Evm_tx_container m -> return m - | Michelson_tx_container _ -> - failwith - "Unsupported JSONRPC method in Tezlink: \ - getTransactionByHash" - in - let* transaction_object = Tx_container.find tx_hash in - let* transaction_object = - match transaction_object with - | Some transaction_object -> return_some transaction_object - | None -> - Backend_rpc.Etherlink_block_storage.transaction_object - tx_hash - in - rpc_ok transaction_object + process_based_on_mode + mode + ~on_rpc:(inject_rpc_call config request module_) + ~on_stateful_michelson:(fun _ -> + failwith + "Unsupported JSONRPC method in Tezlink: \ + GetTransactionByHash") + ~on_stateful_evm:(fun (module Tx_container) -> + let* transaction_object = Tx_container.find tx_hash in + match transaction_object with + | Some transaction_object -> rpc_ok (Some transaction_object) + | None -> + let* transaction_object = + Backend_rpc.Etherlink_block_storage.transaction_object + tx_hash + in + rpc_ok transaction_object) in build_with_input ~f module_ parameters | Get_transaction_by_block_hash_and_index.Method -> @@ -1192,8 +1230,8 @@ let dispatch_request (type f) ~websocket transactions" None else - let f = - send_raw_transaction tx_container ~wait_confirmation:false + let f raw_tx = + send_raw_transaction config mode ~wait_confirmation:false raw_tx in build_with_input ~f module_ parameters | Send_raw_transaction_sync.Method -> @@ -1231,31 +1269,34 @@ let dispatch_request (type f) ~websocket None)) in let f (raw_tx, timeout, block_parameter) = - match (block_parameter, mode) with - (* Forward to the observer directly *) - | ( Ethereum_types.Block_parameter.Pending, - Rpc {evm_node_endpoint; _} ) -> ( - let* res = - Injector.send_raw_transaction_sync - ~keep_alive:true - ~timeout:10. - ~base:evm_node_endpoint - ~raw_tx - ~internal_timeout:timeout - ~block_parameter:Ethereum_types.Block_parameter.Pending - in - match res with - | Ok receipt -> rpc_ok receipt - | Error err -> rpc_error (Rpc_errors.internal_error err)) - (* Use preconfirmation stream *) - | Ethereum_types.Block_parameter.Pending, _ -> - wait_or_timeout timeout (fun () -> + match block_parameter with + (* Wait for the receipt in the stream *) + | Ethereum_types.Block_parameter.Pending -> ( + match mode with + | Rpc {evm_node_endpoint; _} -> ( + (* For the RPC mode forward the call *) + let* res = + Injector.call_singleton_request + ~keep_alive:config.keep_alive + ~timeout:config.rpc_timeout + ~base:evm_node_endpoint + module_ + request + in + match res with + | Ok output -> rpc_ok output + | Error reason -> + rpc_error + (Rpc_errors.transaction_rejected reason None)) + | _ -> + wait_or_timeout timeout @@ fun () -> let receipt_stream, stopper = Broadcast.create_receipt_stream () in let* hash = send_raw_transaction - tx_container + config + mode raw_tx ~wait_confirmation:false in @@ -1273,12 +1314,13 @@ let dispatch_request (type f) ~websocket in Lwt_watcher.shutdown stopper ; receipt) - (* Use normal execution infos *) - | Ethereum_types.Block_parameter.Latest, _ -> + | Ethereum_types.Block_parameter.Latest -> + (* Use normal execution infos *) wait_or_timeout timeout (fun () -> let* hash = send_raw_transaction - tx_container + config + mode raw_tx ~wait_confirmation:true in @@ -1346,15 +1388,15 @@ let dispatch_request (type f) ~websocket build_with_input ~f module_ parameters | Txpool_content.Method -> let f (_ : unit option) = - let* (module Tx_container) = - match tx_container with - | Evm_tx_container m -> return m - | Michelson_tx_container _ -> - failwith - "Unsupported JSONRPC method in Tezlink: txpoolContent" - in - let* txpool_content = Tx_container.content () in - rpc_ok txpool_content + process_based_on_mode + mode + ~on_rpc:(inject_rpc_call config request module_) + ~on_stateful_michelson:(fun _ -> + failwith + "Unsupported JSONRPC method in Tezlink: txpoolContent") + ~on_stateful_evm:(fun (module Tx_container) -> + let* txpool_content = Tx_container.content () in + rpc_ok txpool_content) in build ~f module_ parameters | Web3_clientVersion.Method -> @@ -1473,8 +1515,7 @@ let dispatch_request (type f) ~websocket let dispatch_private_request (type f) ~websocket (rpc_server_family : f Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) (_config : Configuration.t) (_ : Mode.t) - (tx_container : f Services_backend_sig.tx_container) + (rpc : Configuration.rpc) (_config : Configuration.t) (mode : f Mode.t) ((module Backend_rpc : Services_backend_sig.S), _) ~block_production ({method_; parameters; id} : JSONRPC.request) : JSONRPC.return_response Lwt.t = @@ -1569,18 +1610,23 @@ let dispatch_private_request (type f) ~websocket in let transaction = Ethereum_types.hex_encode_string raw_txn in let* tx_hash = - let* (module Tx_container) = - match tx_container with - | Evm_tx_container m -> return m - | Michelson_tx_container _ -> - failwith - "Unsupported JSONRPC method in Tezlink: injectTransaction" - in - Tx_container.add - ~wait_confirmation - ~next_nonce - transaction_object - ~raw_tx:transaction + match mode with + | Observer (Evm_tx_container (module Tx_container)) + | Sequencer (Evm_tx_container (module Tx_container)) -> + Tx_container.add + ~wait_confirmation + ~next_nonce + transaction_object + ~raw_tx:transaction + | Observer (Michelson_tx_container _m) + | Sequencer (Michelson_tx_container _m) -> + failwith + "Unsupported JSONRPC method in Tezlink: injectTransaction" + | Proxy _ -> + failwith + "Unsupported JSONRPC method in proxy: injectTransaction" + | Rpc _ -> + failwith "Unsupported JSONRPC method in Rpc: injectTransaction" in match tx_hash with | Ok tx_hash -> rpc_ok tx_hash @@ -1601,12 +1647,22 @@ let dispatch_private_request (type f) ~websocket [Transaction.hash (Tezos_types.Operation.hash_operation op)])) ; let* hash = let* (module Tx_container) = - match tx_container with - | Evm_tx_container _ -> + match mode with + | Observer (Michelson_tx_container m) + | Sequencer (Michelson_tx_container m) -> + return m + | Observer (Evm_tx_container _m) | Sequencer (Evm_tx_container _m) + -> failwith "Unsupported JSONRPC method in Etherlink: \ injectTezlinkOperation" - | Michelson_tx_container m -> return m + | Proxy _ -> + failwith + "Unsupported JSONRPC method in proxy: \ + injectTezlinkOperation" + | Rpc _ -> + failwith + "Unsupported JSONRPC method in Rpc: injectTezlinkOperation" in Tx_container.add ~next_nonce:(Ethereum_types.Qty op.first_counter) @@ -1655,9 +1711,8 @@ let can_process_batch size = function | Configuration.Limit l -> size <= l | Unlimited -> true -let dispatch_handler ~service_name (rpc : Configuration.rpc) config mode - tx_container ctx dispatch_request (input : JSONRPC.request batched_request) - = +let dispatch_handler ~service_name (rpc : Configuration.rpc) ctx + dispatch_request (input : JSONRPC.request batched_request) = let open Lwt_syntax in let wait_for_return_output JSONRPC.{return_value; id} = match return_value with @@ -1668,9 +1723,7 @@ let dispatch_handler ~service_name (rpc : Configuration.rpc) config mode in match input with | Singleton request -> - let* return_value = - dispatch_request config mode tx_container ctx request - in + let* return_value = dispatch_request ctx request in let* response = wait_for_return_output return_value in return (Singleton response) | Batch requests -> @@ -1679,7 +1732,7 @@ let dispatch_handler ~service_name (rpc : Configuration.rpc) config mode @@ fun _scope -> let process = if can_process_batch batch_size rpc.batch_limit then - dispatch_request config mode tx_container ctx + dispatch_request ctx else fun req -> let response = direct_rpc_value @@ -1720,8 +1773,7 @@ let empty_stream = let empty_sid = Ethereum_types.(Subscription.Id (Hex "")) let dispatch_websocket (rpc_server_family : _ Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) config mode tx_container ctx - (input : JSONRPC.request) = + (rpc : Configuration.rpc) config mode ctx (input : JSONRPC.request) = let open Lwt_syntax in match map_method_name @@ -1776,7 +1828,6 @@ let dispatch_websocket (rpc_server_family : _ Rpc_types.rpc_server_family) rpc config mode - tx_container ctx input in @@ -1791,8 +1842,7 @@ let dispatch_websocket (rpc_server_family : _ Rpc_types.rpc_server_family) let dispatch_private_websocket (rpc_server_family : _ Rpc_types.rpc_server_family) ~block_production - (rpc : Configuration.rpc) config mode tx_container ctx - (input : JSONRPC.request) = + (rpc : Configuration.rpc) config mode ctx (input : JSONRPC.request) = let open Lwt_syntax in let* {return_value; id} = dispatch_private_request @@ -1802,7 +1852,6 @@ let dispatch_private_websocket rpc config mode - tx_container ctx input in @@ -1815,55 +1864,42 @@ let dispatch_private_websocket in websocket_response_of_response response -let generic_dispatch ~service_name (rpc : Configuration.rpc) config mode - tx_container ctx dir path dispatch_request = +let generic_dispatch ~service_name (rpc : Configuration.rpc) ctx dir path + dispatch_request = Evm_directory.register0 dir (dispatch_batch_service ~path) (fun () input -> - dispatch_handler - ~service_name - rpc - config - mode - tx_container - ctx - dispatch_request - input + dispatch_handler ~service_name rpc ctx dispatch_request input |> Lwt_result.ok) -let dispatch_public (type f) (rpc_server_family : _ Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) config (mode : Mode.t) - (tx_container : f Services_backend_sig.tx_container) ctx dir = +let dispatch_public (type f) (rpc_server_family : f Rpc_types.rpc_server_family) + (rpc : Configuration.rpc) config (mode : f Mode.t) ctx dir = generic_dispatch ~service_name:"public_rpc" rpc - config - mode - tx_container ctx dir Path.root - (dispatch_request ~websocket:false rpc_server_family rpc) + (dispatch_request ~websocket:false rpc_server_family rpc config mode) let dispatch_private (type f) (rpc_server_family : _ Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) ~block_production config (mode : Mode.t) - (tx_container : f Services_backend_sig.tx_container) ctx dir = + (rpc : Configuration.rpc) ~block_production config (mode : f Mode.t) ctx dir + = generic_dispatch ~service_name:"private_rpc" rpc - config - mode - tx_container ctx dir Path.(add_suffix root "private") (dispatch_private_request ~websocket:false rpc_server_family + ~block_production rpc - ~block_production) + config + mode) -let generic_websocket_dispatch (config : Configuration.t) mode tx_container ctx - dir path dispatch_websocket = +let generic_websocket_dispatch (config : Configuration.t) mode ctx dir path + dispatch_websocket = match config.websockets with | None -> dir | Some {max_message_length; monitor_heartbeat; _} -> @@ -1872,37 +1908,32 @@ let generic_websocket_dispatch (config : Configuration.t) mode tx_container ctx ~max_message_length dir path - (dispatch_websocket config mode tx_container ctx) + (dispatch_websocket config mode ctx) -let dispatch_websocket_public (type f) +let dispatch_websocket_public (rpc_server_family : _ Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) config mode - (tx_container : f Services_backend_sig.tx_container) ctx dir = + (rpc : Configuration.rpc) config mode ctx dir = generic_websocket_dispatch config mode - tx_container ctx dir "/ws" (dispatch_websocket rpc_server_family rpc) -let dispatch_websocket_private (type f) +let dispatch_websocket_private (rpc_server_family : _ Rpc_types.rpc_server_family) - (rpc : Configuration.rpc) ~block_production config mode - (tx_container : f Services_backend_sig.tx_container) ctx dir = + (rpc : Configuration.rpc) ~block_production config mode ctx dir = generic_websocket_dispatch config mode - tx_container ctx dir "/private/ws" (dispatch_private_websocket rpc_server_family ~block_production rpc) -let directory (type f) ~(rpc_server_family : f Rpc_types.rpc_server_family) mode - rpc config (tx_container : f Services_backend_sig.tx_container) backend dir - = +let directory (type f) ~(rpc_server_family : f Rpc_types.rpc_server_family) + (mode : f Mode.t) rpc config backend dir = let db_liveness_check () = let open Lwt_result_syntax in let (module Backend : Services_backend_sig.S) = fst backend in @@ -1922,17 +1953,10 @@ let directory (type f) ~(rpc_server_family : f Rpc_types.rpc_server_family) mode dir |> version |> configuration config |> evm_mode config mode |> health_check config mode db_liveness_check - |> dispatch_public rpc_server_family rpc config mode tx_container backend - |> dispatch_websocket_public - rpc_server_family - rpc - config - mode - tx_container - backend + |> dispatch_public rpc_server_family rpc config mode backend + |> dispatch_websocket_public rpc_server_family rpc config mode backend -let private_directory ~rpc_server_family mode rpc config - (tx_container : _ Services_backend_sig.tx_container) backend +let private_directory ~rpc_server_family mode rpc config backend ~block_production = Evm_directory.empty config.experimental_features.rpc_server |> version |> evm_mode config mode @@ -1941,7 +1965,6 @@ let private_directory ~rpc_server_family mode rpc config rpc config mode - tx_container backend ~block_production |> dispatch_websocket_private @@ -1949,6 +1972,5 @@ let private_directory ~rpc_server_family mode rpc config rpc config mode - tx_container backend ~block_production