From 5eb664301a28db2600131e2569e9edda4386a1ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Mon, 12 May 2025 16:06:37 +0200 Subject: [PATCH 1/2] EVM node/Tx_queue: move send_transactions_batch to Handlers --- etherlink/bin_node/lib_dev/tx_queue.ml | 163 +++++++++++++------------ 1 file changed, 83 insertions(+), 80 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 05c8d3643f8a..87c6095b3fe3 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -540,86 +540,6 @@ module Worker = Worker.MakeSingle (Name) (Request) (Types) type worker = Worker.infinite Worker.queue Worker.t -let uuid_seed = Random.get_state () - -let send_transactions_batch ~evm_node_endpoint ~keep_alive transactions = - let open Lwt_result_syntax in - let module M = Map.Make (String) in - let module Srt = Rpc_encodings.Send_raw_transaction in - if Seq.is_empty transactions then return_unit - else - let rev_batch, callbacks = - Seq.fold_left - (fun (rev_batch, callbacks) {hash = _; payload; queue_callback} -> - let req_id = Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) in - let txn = - Rpc_encodings.JSONRPC. - { - method_ = Srt.method_; - parameters = - Some (Data_encoding.Json.construct Srt.input_encoding payload); - id = Some (Id_string req_id); - } - in - - (txn :: rev_batch, M.add req_id queue_callback callbacks)) - ([], M.empty) - transactions - in - let batch = List.rev rev_batch in - - let*! 
() = Tx_queue_events.injecting_transactions (List.length batch) in - - let* responses = - match evm_node_endpoint with - | Rpc base -> - let* batch_response = - Rollup_services.call_service - ~keep_alive - ~base - (Batch.dispatch_batch_service ~path:Resto.Path.root) - () - () - (Batch batch) - in - return - (match batch_response with Singleton r -> [r] | Batch rs -> rs) - | Websocket ws_client -> - List.map_es - (fun req -> - let+ resp_json = - Websocket_client.send_jsonrpc_request ws_client req - in - Data_encoding.Json.destruct - Rpc_encodings.JSONRPC.response_encoding - resp_json) - batch - in - - let* missed_callbacks = - List.fold_left_es - (fun callbacks (response : Rpc_encodings.JSONRPC.response) -> - match response with - | {id = Some (Id_string req); value} -> ( - match (value, M.find_opt req callbacks) with - | value, Some callback -> - let* () = - match value with - | Ok _hash_encoded -> Lwt_result.ok (callback `Accepted) - | Error error -> - let*! () = Tx_queue_events.rpc_error error in - Lwt_result.ok (callback `Refused) - in - return (M.remove req callbacks) - | _ -> return callbacks) - | _ -> failwith "Inconsistent response from the server") - callbacks - responses - in - - assert (M.is_empty missed_callbacks) ; - return_unit - (** clear values and keep the allocated space *) let clear ({ @@ -692,6 +612,89 @@ module Handlers = struct type self = worker + let uuid_seed = Random.get_state () + + let send_transactions_batch ~evm_node_endpoint ~keep_alive transactions = + let open Lwt_result_syntax in + let module M = Map.Make (String) in + let module Srt = Rpc_encodings.Send_raw_transaction in + if Seq.is_empty transactions then return_unit + else + let rev_batch, callbacks = + Seq.fold_left + (fun (rev_batch, callbacks) {hash = _; payload; queue_callback} -> + let req_id = + Uuidm.(v4_gen uuid_seed () |> to_string ~upper:false) + in + let txn = + Rpc_encodings.JSONRPC. 
+ { + method_ = Srt.method_; + parameters = + Some + (Data_encoding.Json.construct Srt.input_encoding payload); + id = Some (Id_string req_id); + } + in + + (txn :: rev_batch, M.add req_id queue_callback callbacks)) + ([], M.empty) + transactions + in + let batch = List.rev rev_batch in + + let*! () = Tx_queue_events.injecting_transactions (List.length batch) in + + let* responses = + match evm_node_endpoint with + | Rpc base -> + let* batch_response = + Rollup_services.call_service + ~keep_alive + ~base + (Batch.dispatch_batch_service ~path:Resto.Path.root) + () + () + (Batch batch) + in + return + (match batch_response with Singleton r -> [r] | Batch rs -> rs) + | Websocket ws_client -> + List.map_es + (fun req -> + let+ resp_json = + Websocket_client.send_jsonrpc_request ws_client req + in + Data_encoding.Json.destruct + Rpc_encodings.JSONRPC.response_encoding + resp_json) + batch + in + + let* missed_callbacks = + List.fold_left_es + (fun callbacks (response : Rpc_encodings.JSONRPC.response) -> + match response with + | {id = Some (Id_string req); value} -> ( + match (value, M.find_opt req callbacks) with + | value, Some callback -> + let* () = + match value with + | Ok _hash_encoded -> Lwt_result.ok (callback `Accepted) + | Error error -> + let*! () = Tx_queue_events.rpc_error error in + Lwt_result.ok (callback `Refused) + in + return (M.remove req callbacks) + | _ -> return callbacks) + | _ -> failwith "Inconsistent response from the server") + callbacks + responses + in + + assert (M.is_empty missed_callbacks) ; + return_unit + let on_request : type r request_error. 
worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t -- GitLab From eaa5a3f3bdc4c088815770f4ed46cbbe1cc691b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Wed, 30 Apr 2025 17:06:21 +0200 Subject: [PATCH 2/2] EVM node/Tx_queue: move remaining toplevel functions to submodules This commit moves all remaining toplevel functions of tx_queue.ml to submodules Handlers and Tx_container. The notable exception is start, which stays reachable at toplevel: its definition moves into Tx_container and a toplevel alias (let start = Tx_container.start) is kept. --- etherlink/bin_node/lib_dev/tx_queue.ml | 252 +++++++++++++------------ 1 file changed, 128 insertions(+), 124 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 87c6095b3fe3..0f092f6453fd 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -540,73 +540,6 @@ module Worker = Worker.MakeSingle (Name) (Request) (Types) type worker = Worker.infinite Worker.queue Worker.t -(** clear values and keep the allocated space *) -let clear - ({ - queue; - pending; - tx_object; - tx_per_address; - address_nonce; - config = _; - keep_alive = _; - locked = _; - } : - state) = - (* full matching so when a new element is added to the state it's not - forgotten to clear it. 
*) - String.Hashtbl.clear pending ; - String.Hashtbl.clear tx_object ; - String.Hashtbl.clear tx_per_address ; - String.Hashtbl.clear address_nonce ; - Queue.clear queue ; - () - -let lock_transactions state = state.locked <- true - -let unlock_transactions state = state.locked <- false - -let is_locked state = state.locked - -let pop_queue_until state ~validation_state ~validate_tx = - let open Lwt_result_syntax in - let rec aux validation_state rev_selected = - match Queue.peek_opt state.queue with - | None -> return rev_selected - | Some {hash; payload; queue_callback} -> ( - let raw_tx = Ethereum_types.hex_to_bytes payload in - let tx_object = Transaction_objects.find state.tx_object hash in - match tx_object with - | None -> - (* Drop that tx because no tx_object associated. this is - an inpossible case, we log it to investigate. *) - let*! () = Tx_queue_events.missing_tx_object hash in - let _ = Queue.take state.queue in - let*! () = queue_callback `Refused in - aux validation_state rev_selected - | Some tx_object -> ( - let* is_valid = validate_tx validation_state raw_tx tx_object in - match is_valid with - | `Stop -> return rev_selected - (* `Stop means that we don't pop transaction anymore. We - don't remove the last peek tx because it could be valid - for another call. *) - | `Drop -> - (* `Drop, the current tx was evaluated and was refused - by the caller. *) - let _ = Queue.take state.queue in - let*! () = queue_callback `Refused in - aux validation_state rev_selected - | `Keep validation_state -> - (* `Keep, the current tx was evaluated and was validated - by the caller. *) - let _ = Queue.take state.queue in - let*! 
() = queue_callback `Accepted in - aux validation_state ((raw_tx, tx_object) :: rev_selected))) - in - let* rev_selected = aux validation_state [] in - return @@ List.rev rev_selected - module Handlers = struct open Request @@ -695,6 +628,73 @@ module Handlers = struct assert (M.is_empty missed_callbacks) ; return_unit + (** clear values and keep the allocated space *) + let clear + ({ + queue; + pending; + tx_object; + tx_per_address; + address_nonce; + config = _; + keep_alive = _; + locked = _; + } : + state) = + (* full matching so when a new element is added to the state it's not + forgotten to clear it. *) + String.Hashtbl.clear pending ; + String.Hashtbl.clear tx_object ; + String.Hashtbl.clear tx_per_address ; + String.Hashtbl.clear address_nonce ; + Queue.clear queue ; + () + + let lock_transactions state = state.locked <- true + + let unlock_transactions state = state.locked <- false + + let is_locked state = state.locked + + let pop_queue_until state ~validation_state ~validate_tx = + let open Lwt_result_syntax in + let rec aux validation_state rev_selected = + match Queue.peek_opt state.queue with + | None -> return rev_selected + | Some {hash; payload; queue_callback} -> ( + let raw_tx = Ethereum_types.hex_to_bytes payload in + let tx_object = Transaction_objects.find state.tx_object hash in + match tx_object with + | None -> + (* Drop that tx because no tx_object associated. this is + an inpossible case, we log it to investigate. *) + let*! () = Tx_queue_events.missing_tx_object hash in + let _ = Queue.take state.queue in + let*! () = queue_callback `Refused in + aux validation_state rev_selected + | Some tx_object -> ( + let* is_valid = validate_tx validation_state raw_tx tx_object in + match is_valid with + | `Stop -> return rev_selected + (* `Stop means that we don't pop transaction anymore. We + don't remove the last peek tx because it could be valid + for another call. 
*) + | `Drop -> + (* `Drop, the current tx was evaluated and was refused + by the caller. *) + let _ = Queue.take state.queue in + let*! () = queue_callback `Refused in + aux validation_state rev_selected + | `Keep validation_state -> + (* `Keep, the current tx was evaluated and was validated + by the caller. *) + let _ = Queue.take state.queue in + let*! () = queue_callback `Accepted in + aux validation_state ((raw_tx, tx_object) :: rev_selected))) + in + let* rev_selected = aux validation_state [] in + return @@ List.rev rev_selected + let on_request : type r request_error. worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t @@ -979,69 +979,62 @@ module Handlers = struct let on_close _ = Lwt.return_unit end -let table = Worker.create_table Queue - -let worker_promise, worker_waker = Lwt.task () - -type error += No_worker - -type error += Tx_queue_is_closed - -let () = - register_error_kind - `Permanent - ~id:"tx_queue_is_closed" - ~title:"Tx_queue_is_closed" - ~description:"Failed to add a request to the Tx queue, it's closed." 
- Data_encoding.unit - (function Tx_queue_is_closed -> Some () | _ -> None) - (fun () -> Tx_queue_is_closed) - -let worker = - lazy - (match Lwt.state worker_promise with - | Lwt.Return worker -> Ok worker - | Lwt.Fail e -> Result_syntax.tzfail (error_of_exn e) - | Lwt.Sleep -> Result_syntax.tzfail No_worker) - -let handle_request_error rq = - let open Lwt_syntax in - let* rq in - match rq with - | Ok res -> return_ok res - | Error (Worker.Request_error errs) -> Lwt.return_error errs - | Error (Closed None) -> Lwt.return_error [Tx_queue_is_closed] - | Error (Closed (Some errs)) -> Lwt.return_error errs - | Error (Any exn) -> Lwt.return_error [Exn exn] - -let bind_worker f = - let open Lwt_result_syntax in - let res = Lazy.force worker in - match res with - | Error [No_worker] -> - (* There is no worker, nothing to do *) - return_unit - | Error errs -> fail errs - | Ok w -> f w - -let push_request worker request = - let open Lwt_result_syntax in - let*! (pushed : bool) = Worker.Queue.push_request worker request in - if not pushed then tzfail Tx_queue_is_closed else return_unit - -let start ~config ~keep_alive () = - let open Lwt_result_syntax in - let* worker = Worker.launch table () {config; keep_alive} (module Handlers) in - Lwt.wakeup worker_waker worker ; - let*! () = Tx_queue_events.is_ready () in - return_unit - module Internal_for_tests = struct module Nonce_bitset = Nonce_bitset module Address_nonce = Address_nonce end module Tx_container = struct + let table = Worker.create_table Queue + + let worker_promise, worker_waker = Lwt.task () + + type error += No_worker + + type error += Tx_queue_is_closed + + let () = + register_error_kind + `Permanent + ~id:"tx_queue_is_closed" + ~title:"Tx_queue_is_closed" + ~description:"Failed to add a request to the Tx queue, it's closed." 
+ Data_encoding.unit + (function Tx_queue_is_closed -> Some () | _ -> None) + (fun () -> Tx_queue_is_closed) + + let worker = + lazy + (match Lwt.state worker_promise with + | Lwt.Return worker -> Ok worker + | Lwt.Fail e -> Result_syntax.tzfail (error_of_exn e) + | Lwt.Sleep -> Result_syntax.tzfail No_worker) + + let handle_request_error rq = + let open Lwt_syntax in + let* rq in + match rq with + | Ok res -> return_ok res + | Error (Worker.Request_error errs) -> Lwt.return_error errs + | Error (Closed None) -> Lwt.return_error [Tx_queue_is_closed] + | Error (Closed (Some errs)) -> Lwt.return_error errs + | Error (Any exn) -> Lwt.return_error [Exn exn] + + let bind_worker f = + let open Lwt_result_syntax in + let res = Lazy.force worker in + match res with + | Error [No_worker] -> + (* There is no worker, nothing to do *) + return_unit + | Error errs -> fail errs + | Ok w -> f w + + let push_request worker request = + let open Lwt_result_syntax in + let*! (pushed : bool) = Worker.Queue.push_request worker request in + if not pushed then tzfail Tx_queue_is_closed else return_unit + let nonce ~next_nonce address = let open Lwt_result_syntax in let*? w = Lazy.force worker in @@ -1136,4 +1129,15 @@ module Tx_container = struct (Pop_transactions {validate_tx; validation_state = initial_validation_state}) |> handle_request_error + + let start ~config ~keep_alive () = + let open Lwt_result_syntax in + let* worker = + Worker.launch table () {config; keep_alive} (module Handlers) + in + Lwt.wakeup worker_waker worker ; + let*! () = Tx_queue_events.is_ready () in + return_unit end + +let start = Tx_container.start -- GitLab