From f0245bc67940c9416f812d9db3403090a037dcd2 Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Tue, 7 Jan 2025 15:26:02 +0100 Subject: [PATCH 1/2] Workers: requests can have optional metadata Metadata are in Ezjson format, so that they can come from various sources. These are meant to be populated by profilers to help propagate data across worker requests. --- src/lib_workers/test/test_workers_unit.ml | 30 +++--- src/lib_workers/worker.ml | 113 +++++++++++++++------- src/lib_workers/worker.mli | 20 +++- 3 files changed, 109 insertions(+), 54 deletions(-) diff --git a/src/lib_workers/test/test_workers_unit.ml b/src/lib_workers/test/test_workers_unit.ml index 6850bfffc2bc..360f2dcf1434 100644 --- a/src/lib_workers/test/test_workers_unit.ml +++ b/src/lib_workers/test/test_workers_unit.ml @@ -45,9 +45,11 @@ let create_handlers (type a) ?on_completion () = let on_request : type r request_error. - self -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun _w request -> + self -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun _w request _ -> let open Lwt_result_syntax in match request with | Request.RqA _i -> return_unit @@ -62,7 +64,7 @@ let create_handlers (type a) ?on_completion () = let open Lwt_result_syntax in return (ref []) - let on_error (type a b) w _st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) w _st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let history = Worker.state w in @@ -76,7 +78,7 @@ let create_handlers (type a) ?on_completion () = history := "RqErr" :: !history ; return_unit) - let on_completion w r _ _st = + let on_completion w r _ _ _st = let open Lwt_syntax in let history = Worker.state w in let () = @@ -112,16 +114,18 @@ let create_bounded ?on_completion = let create_dropbox ?on_completion = let table = let open Worker in - let merge _w (Any_request neu) (old : _ option) = - let (Any_request r) 
= + let merge _w (Any_request (neu, neu_metadata)) (old : _ option) = + let (Any_request (r, metadata)) = match (neu, old) with - | RqA i1, Some (Any_request (RqA i2)) -> Any_request (RqA (i1 + i2)) - | (RqA _ as rqa), _ -> Any_request rqa - | _, Some (Any_request (RqA _ as rqa)) -> Any_request rqa - | RqB, _ -> Any_request neu - | RqErr _, _ -> Any_request neu + | RqA i1, Some (Any_request (RqA i2, _)) -> + Any_request (RqA (i1 + i2), neu_metadata) + | (RqA _ as rqa), _ -> Any_request (rqa, neu_metadata) + | _, Some (Any_request ((RqA _ as rqa), metadata)) -> + Any_request (rqa, metadata) + | RqB, _ -> Any_request (neu, neu_metadata) + | RqErr _, _ -> Any_request (neu, neu_metadata) in - Some (Worker.Any_request r) + Some (Worker.Any_request (r, metadata)) in Worker.create_table (Dropbox {merge}) in diff --git a/src/lib_workers/worker.ml b/src/lib_workers/worker.ml index ecc05caff66e..7e3c3b6b27eb 100644 --- a/src/lib_workers/worker.ml +++ b/src/lib_workers/worker.ml @@ -56,6 +56,8 @@ module type T = sig | Request_error of 'a | Any of exn + type metadata = Data_encoding.json + (** Supported kinds of internal buffers. *) type _ buffer_kind = | Queue : infinite queue buffer_kind @@ -66,7 +68,7 @@ module type T = sig } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata option -> any_request (** Create a table of workers. 
*) val create_table : 'kind buffer_kind -> 'kind table @@ -92,6 +94,7 @@ module type T = sig val on_request : self -> ('a, 'request_error) Request.t -> + metadata option -> ('a, 'request_error) result Lwt.t (** Called when no request has been made before the timeout, if @@ -111,6 +114,7 @@ module type T = sig self -> Worker_types.request_status -> ('a, 'request_error) Request.t -> + metadata option -> 'request_error -> unit tzresult Lwt.t @@ -119,6 +123,7 @@ module type T = sig val on_completion : self -> ('a, 'request_error) Request.t -> + metadata option -> 'a -> Worker_types.request_status -> unit Lwt.t @@ -143,9 +148,11 @@ module type T = sig module type BOX = sig type t - val put_request : t -> ('a, 'request_error) Request.t -> unit + val put_request : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> unit val put_request_and_wait : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t @@ -155,11 +162,13 @@ module type T = sig type 'a t val push_request_and_wait : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t - val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t + val push_request : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t val pending_requests : 'a t -> (Time.System.t * Request.view) list @@ -175,7 +184,10 @@ module type T = sig (** Adds a message to the queue immediately. 
*) val push_request_now : - infinite queue t -> ('a, 'request_error) Request.t -> unit + ?metadata:metadata -> + infinite queue t -> + ('a, 'request_error) Request.t -> + unit end (** Exports the canceler to allow cancellation of other tasks when this @@ -247,9 +259,13 @@ struct | Request_error of 'a | Any of exn + type metadata = Data_encoding.json + type message = | Message : - ('a, 'b) Request.t * ('a, 'b message_error) result Lwt.u option + ('a, 'b) Request.t + * metadata option + * ('a, 'b message_error) result Lwt.u option -> message type 'a queue @@ -269,7 +285,7 @@ struct } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata option -> any_request and _ buffer = | Queue_buffer : @@ -306,27 +322,32 @@ struct | Worker_types.Launching _ | Running _ | Closing _ -> None | Closed (_, _, errs) -> errs - let queue_item ?u r = (Time.System.now (), Message (r, u)) + let queue_item ?u metadata r = (Time.System.now (), Message (r, metadata, u)) - let drop_request w merge message_box request = + let drop_request ?metadata w merge message_box request = try match match Lwt_dropbox.peek message_box with - | None -> merge w (Any_request request) None - | Some (_, Message (old, _)) -> + | None -> merge w (Any_request (request, metadata)) None + | Some (_, Message (old, metadata_old, _)) -> Lwt.ignore_result (Lwt_dropbox.take message_box) ; - merge w (Any_request request) (Some (Any_request old)) + merge + w + (Any_request (request, metadata)) + (Some (Any_request (old, metadata_old))) with | None -> () - | Some (Any_request neu) -> - Lwt_dropbox.put message_box (Time.System.now (), Message (neu, None)) + | Some (Any_request (neu, metadata)) -> + Lwt_dropbox.put + message_box + (Time.System.now (), Message (neu, metadata, None)) with Lwt_dropbox.Closed -> () - let drop_request_and_wait w message_box request = + let drop_request_and_wait ?metadata w message_box request = let t, u = Lwt.wait 
() in Lwt.catch (fun () -> - Lwt_dropbox.put message_box (queue_item ~u request) ; + Lwt_dropbox.put message_box (queue_item metadata ~u request) ; t) (function | Lwt_dropbox.Closed -> @@ -338,9 +359,11 @@ struct module type BOX = sig type t - val put_request : t -> ('a, 'request_error) Request.t -> unit + val put_request : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> unit val put_request_and_wait : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t @@ -350,11 +373,13 @@ struct type 'a t val push_request_and_wait : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t - val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t + val push_request : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t val pending_requests : 'a t -> (Time.System.t * Request.view) list @@ -362,23 +387,23 @@ struct end module Dropbox = struct - let put_request (w : dropbox t) request = + let put_request ?metadata (w : dropbox t) request = let (Dropbox {merge}) = w.table.buffer_kind in let (Dropbox_buffer message_box) = w.buffer in - drop_request w merge message_box request + drop_request ?metadata w merge message_box request - let put_request_and_wait (w : dropbox t) request = + let put_request_and_wait ?metadata (w : dropbox t) request = let (Dropbox_buffer message_box) = w.buffer in - drop_request_and_wait w message_box request + drop_request_and_wait ?metadata w message_box request end module Queue = struct - let push_request (type a) (w : a queue t) request = + let push_request ?metadata (type a) (w : a queue t) request = match w.buffer with | Queue_buffer message_queue -> if Lwt_pipe.Unbounded.is_closed message_queue then Lwt.return_false else ( - Lwt_pipe.Unbounded.push message_queue (queue_item request) ; + Lwt_pipe.Unbounded.push message_queue (queue_item metadata request) ; (* because pushing on an 
unbounded pipe is immediate, we return within Lwt explicitly for compatibility with the other case *) Lwt.return_true) @@ -387,21 +412,23 @@ struct else let open Lwt_syntax in let* () = - Lwt_pipe.Bounded.push message_queue (queue_item request) + Lwt_pipe.Bounded.push message_queue (queue_item metadata request) in Lwt.return_true - let push_request_now (w : infinite queue t) request = + let push_request_now ?metadata (w : infinite queue t) request = let (Queue_buffer message_queue) = w.buffer in if Lwt_pipe.Unbounded.is_closed message_queue then () - else Lwt_pipe.Unbounded.push message_queue (queue_item request) + else Lwt_pipe.Unbounded.push message_queue (queue_item metadata request) - let push_request_and_wait (type a) (w : a queue t) request = + let push_request_and_wait ?metadata (type a) (w : a queue t) request = match w.buffer with | Queue_buffer message_queue -> ( try let t, u = Lwt.wait () in - Lwt_pipe.Unbounded.push message_queue (queue_item ~u request) ; + Lwt_pipe.Unbounded.push + message_queue + (queue_item ~u metadata request) ; t with Lwt_pipe.Closed -> Lwt.return_error (Closed (extract_status_errors w))) @@ -409,7 +436,9 @@ struct let t, u = Lwt.wait () in Lwt.try_bind (fun () -> - Lwt_pipe.Bounded.push message_queue (queue_item ~u request)) + Lwt_pipe.Bounded.push + message_queue + (queue_item ~u metadata request)) (fun () -> t) (function | Lwt_pipe.Closed -> @@ -426,7 +455,9 @@ struct Lwt_pipe.Bounded.peek_all_now message_queue with Lwt_pipe.Closed -> [] in - List.map (function t, Message (req, _) -> (t, Request.view req)) peeked + List.map + (function t, Message (req, _, _) -> (t, Request.view req)) + peeked let pending_requests_length (type a) (w : a queue t) = let pipe_length (type a) (q : a buffer) = @@ -440,9 +471,9 @@ struct let close (type a) (w : a t) = let wakeup = function - | _, Message (_, Some u) -> + | _, Message (_, _, Some u) -> Lwt.wakeup_later u (Error (Closed (extract_status_errors w))) - | _, Message (_, None) -> () + | _, 
Message (_, _, None) -> () in let close_queue message_queue = let messages = Lwt_pipe.Bounded.pop_all_now message_queue in @@ -514,6 +545,7 @@ struct val on_request : self -> ('a, 'request_error) Request.t -> + metadata option -> ('a, 'request_error) result Lwt.t val on_no_request : self -> unit Lwt.t @@ -524,12 +556,14 @@ struct self -> Worker_types.request_status -> ('a, 'request_error) Request.t -> + metadata option -> 'request_error -> unit tzresult Lwt.t val on_completion : self -> ('a, 'request_error) Request.t -> + metadata option -> 'a -> Worker_types.request_status -> unit Lwt.t @@ -579,7 +613,7 @@ struct | Ok None -> let* () = Handlers.on_no_request w in loop () - | Ok (Some (pushed, Message (request, u))) -> ( + | Ok (Some (pushed, Message (request, metadata, u))) -> ( let current_request = Request.view request in let treated = Time.System.now () in w.current_request <- Some (pushed, treated, current_request) ; @@ -587,14 +621,16 @@ struct match u with | None -> ( let open Lwt_result_syntax in - let*! res = Handlers.on_request w request in + let*! res = Handlers.on_request w request metadata in match res with | Error err -> Lwt.return_error err | Ok res -> let completed = Time.System.now () in w.current_request <- None ; let status = Worker_types.{pushed; treated; completed} in - let*! () = Handlers.on_completion w request res status in + let*! () = + Handlers.on_completion w request metadata res status + in let*! () = Worker_events.(emit request_no_errors) (current_request, status) @@ -606,7 +642,7 @@ struct (Error). To that end, we treat it locally like a regular promise (which happens to carry a [result]) within the Lwt monad. 
*) - let* res = Handlers.on_request w request in + let* res = Handlers.on_request w request metadata in match res with | Error err -> Lwt.wakeup_later u (Error (Request_error err)) ; @@ -616,7 +652,9 @@ struct let completed = Time.System.now () in let status = Worker_types.{pushed; treated; completed} in w.current_request <- None ; - let* () = Handlers.on_completion w request res status in + let* () = + Handlers.on_completion w request metadata res status + in let* () = Worker_events.(emit request_no_errors) (current_request, status) @@ -635,6 +673,7 @@ struct w Worker_types.{pushed; treated; completed} request + metadata err | None -> assert false in diff --git a/src/lib_workers/worker.mli b/src/lib_workers/worker.mli index 9f28073b99bf..6dafd21d37bd 100644 --- a/src/lib_workers/worker.mli +++ b/src/lib_workers/worker.mli @@ -67,6 +67,8 @@ module type T = sig | Request_error of 'a | Any of exn + type metadata = Data_encoding.json + (** Supported kinds of internal buffers. *) type _ buffer_kind = | Queue : infinite queue buffer_kind @@ -77,7 +79,7 @@ module type T = sig } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata option -> any_request (** Create a table of workers. 
*) val create_table : 'kind buffer_kind -> 'kind table @@ -104,6 +106,7 @@ module type T = sig val on_request : self -> ('a, 'request_error) Request.t -> + metadata option -> ('a, 'request_error) result Lwt.t (** Called when no request has been made before the timeout, if @@ -121,6 +124,7 @@ module type T = sig self -> Worker_types.request_status -> ('a, 'request_error) Request.t -> + metadata option -> 'request_error -> unit tzresult Lwt.t @@ -129,6 +133,7 @@ module type T = sig val on_completion : self -> ('a, 'request_error) Request.t -> + metadata option -> 'a -> Worker_types.request_status -> unit Lwt.t @@ -158,12 +163,14 @@ module type T = sig (** [put_request worker request] sends the [request] to the [worker]. If the [worker] dropbox is closed, then it is a no-op. *) - val put_request : t -> ('a, 'request_error) Request.t -> unit + val put_request : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> unit (** [put_request_and_wait worker request] sends the [request] to the [worker] and waits for its completion. If the worker dropbox is closed, then it returns [Error Closed]. *) val put_request_and_wait : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t @@ -178,6 +185,7 @@ module type T = sig then it returns [Error Closed]. If the buffer is a bounded queue and the underlying queue is full, the call is blocking. *) val push_request_and_wait : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t @@ -186,7 +194,8 @@ module type T = sig promise returned is [true] if the request was pushed successfuly or [false] if the worker queue is closed. If the buffer is a bounded queue and the underlying queue is full, the call is blocking. 
*) - val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t + val push_request : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t val pending_requests : 'a t -> (Time.System.t * Request.view) list @@ -202,7 +211,10 @@ module type T = sig (** Adds a message to the queue immediately. *) val push_request_now : - infinite queue t -> ('a, 'request_error) Request.t -> unit + ?metadata:metadata -> + infinite queue t -> + ('a, 'request_error) Request.t -> + unit end (** Exports the canceler to allow cancellation of other tasks when this -- GitLab From fdef113bfb2a6078f35c21ab7eb0b36ca7a04e87 Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Tue, 7 Jan 2025 15:27:24 +0100 Subject: [PATCH 2/2] Workers: update all workers for the new interface with metadata This update is purely mechanical and adds the relevant parameter to each of the Handlers implementations. --- etherlink/bin_node/lib_dev/block_producer.ml | 12 +++++++----- .../bin_node/lib_dev/blueprints_publisher.ml | 13 ++++++++----- etherlink/bin_node/lib_dev/evm_context.ml | 13 ++++++++----- .../bin_node/lib_dev/evm_events_follower.ml | 13 ++++++++----- etherlink/bin_node/lib_dev/evm_websocket.ml | 12 +++++++----- .../bin_node/lib_dev/signals_publisher.ml | 12 +++++++----- etherlink/bin_node/lib_dev/tx_pool.ml | 12 +++++++----- src/lib_injector/injector_functor.ml | 7 ++++--- src/lib_shell/block_validator.ml | 18 +++++++++++++----- src/lib_shell/chain_validator.ml | 6 +++--- src/lib_shell/peer_validator.ml | 18 +++++++++--------- src/lib_shell/prevalidator.ml | 7 ++++--- src/lib_smart_rollup_node/batcher.ml | 12 +++++++----- .../dal_injection_queue.ml | 12 +++++++----- src/lib_smart_rollup_node/publisher.ml | 12 +++++++----- .../refutation_coordinator.ml | 12 +++++++----- src/lib_smart_rollup_node/refutation_player.ml | 12 +++++++----- 17 files changed, 120 insertions(+), 83 deletions(-) diff --git a/etherlink/bin_node/lib_dev/block_producer.ml 
b/etherlink/bin_node/lib_dev/block_producer.ml index c1d8b2d53be1..5039be27c8f6 100644 --- a/etherlink/bin_node/lib_dev/block_producer.ml +++ b/etherlink/bin_node/lib_dev/block_producer.ml @@ -280,9 +280,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Produce_block (with_delayed_transactions, timestamp, force) -> @@ -309,11 +311,11 @@ module Handlers = struct let on_launch _w () (parameters : Types.parameters) = Lwt_result_syntax.return parameters - let on_error (type a b) _w _st (_r : (a, b) Request.t) (_errs : b) : + let on_error (type a b) _w _st (_r : (a, b) Request.t) _ (_errs : b) : unit tzresult Lwt.t = Lwt_result_syntax.return_unit - let on_completion _ _ _ _ = Lwt.return_unit + let on_completion _ _ _ _ _ = Lwt.return_unit let on_no_request _ = Lwt.return_unit diff --git a/etherlink/bin_node/lib_dev/blueprints_publisher.ml b/etherlink/bin_node/lib_dev/blueprints_publisher.ml index b746d3fc27c5..6229e1e6127d 100644 --- a/etherlink/bin_node/lib_dev/blueprints_publisher.ml +++ b/etherlink/bin_node/lib_dev/blueprints_publisher.ml @@ -282,8 +282,11 @@ module Handlers = struct let on_request : type r request_error. - self -> (r, request_error) Request.t -> (r, request_error) result Lwt.t = - fun self request -> + self -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun self request _ -> let open Lwt_result_syntax in match request with | Publish {level; payload} -> @@ -309,15 +312,15 @@ module Handlers = struct unlock the transaction pool in case it was locked. 
*) Tx_pool.unlock_transactions ()) - let on_completion (type a err) _self (_r : (a, err) Request.t) (_res : a) _st - = + let on_completion (type a err) _self (_r : (a, err) Request.t) _ (_res : a) + _st = Lwt_syntax.return_unit let on_no_request _self = Lwt.return_unit let on_close _self = Lwt.return_unit - let on_error (type a b) _w _st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w _st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in diff --git a/etherlink/bin_node/lib_dev/evm_context.ml b/etherlink/bin_node/lib_dev/evm_context.ml index f0074b180c52..0c8433f4b66d 100644 --- a/etherlink/bin_node/lib_dev/evm_context.ml +++ b/etherlink/bin_node/lib_dev/evm_context.ml @@ -1589,8 +1589,11 @@ module Handlers = struct let on_request : type r request_error. - self -> (r, request_error) Request.t -> (r, request_error) result Lwt.t = - fun self request -> + self -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun self request _ -> let open Lwt_result_syntax in match request with | Apply_evm_events {finalized_level; events} -> @@ -1648,8 +1651,8 @@ module Handlers = struct ctxt blueprint_with_events - let on_completion (type a err) _self (_r : (a, err) Request.t) (_res : a) _st - = + let on_completion (type a err) _self (_r : (a, err) Request.t) _ (_res : a) + _st = Lwt_syntax.return_unit let on_no_request _self = Lwt.return_unit @@ -1671,7 +1674,7 @@ module Handlers = struct | Potential_observer_reorg _ -> Eq end - let on_error (type a b) _self _st (req : (a, b) Request.t) (errs : b) : + let on_error (type a b) _self _st (req : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in match (req, errs) with diff --git a/etherlink/bin_node/lib_dev/evm_events_follower.ml b/etherlink/bin_node/lib_dev/evm_events_follower.ml index 680d1f7400b9..3063a1a12636 100644 --- 
a/etherlink/bin_node/lib_dev/evm_events_follower.ml +++ b/etherlink/bin_node/lib_dev/evm_events_follower.ml @@ -255,9 +255,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun worker request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun worker request _ -> match request with | Request.New_rollup_node_block rollup_head -> protect @@ fun () -> new_rollup_block worker rollup_head @@ -280,9 +282,10 @@ module Handlers = struct worker -> Tezos_base.Worker_types.request_status -> (r, request_error) Request.t -> + Data_encoding.json option -> request_error -> unit tzresult Lwt.t = - fun _w _ req errs -> + fun _w _ req _ errs -> let open Lwt_result_syntax in match req with | Request.New_rollup_node_block _ -> @@ -300,7 +303,7 @@ module Handlers = struct in return_unit - let on_completion _ _ _ _ = Lwt.return_unit + let on_completion _ _ _ _ _ = Lwt.return_unit let on_no_request _ = Lwt.return_unit diff --git a/etherlink/bin_node/lib_dev/evm_websocket.ml b/etherlink/bin_node/lib_dev/evm_websocket.ml index bb3389a0eb2f..87f343c6ffc6 100644 --- a/etherlink/bin_node/lib_dev/evm_websocket.ml +++ b/etherlink/bin_node/lib_dev/evm_websocket.ml @@ -356,9 +356,11 @@ module Handlers = struct let on_request : type r request_error. 
- worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun worker request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun worker request _ -> match request with | Request.Frame fr -> protect @@ fun () -> on_frame worker fr |> Lwt_result.ok @@ -405,7 +407,7 @@ module Handlers = struct in return state - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -415,7 +417,7 @@ module Handlers = struct in match r with Request.Frame _ -> emit_and_continue errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = match Request.view r with | Request.View (Frame _) -> Event.(emit request_completed_debug) (Request.view r, st) diff --git a/etherlink/bin_node/lib_dev/signals_publisher.ml b/etherlink/bin_node/lib_dev/signals_publisher.ml index 6072d56c659b..6674c18fd546 100644 --- a/etherlink/bin_node/lib_dev/signals_publisher.ml +++ b/etherlink/bin_node/lib_dev/signals_publisher.ml @@ -179,9 +179,11 @@ module Handlers = struct let on_request : type r request_error. 
- worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> match request with | New_rollup_node_block {finalized_level} -> protect @@ fun () -> Worker.new_rollup_block w finalized_level @@ -191,11 +193,11 @@ module Handlers = struct let on_launch _w () (parameters : Types.parameters) = Types.of_parameters parameters - let on_error (type a b) _w _st (_r : (a, b) Request.t) (_errs : b) : + let on_error (type a b) _w _st (_r : (a, b) Request.t) _ (_errs : b) : unit tzresult Lwt.t = Lwt_result_syntax.return_unit - let on_completion _ _ _ _ = Lwt.return_unit + let on_completion _ _ _ _ _ = Lwt.return_unit let on_no_request _ = Lwt.return_unit diff --git a/etherlink/bin_node/lib_dev/tx_pool.ml b/etherlink/bin_node/lib_dev/tx_pool.ml index d2e922325248..3e8ac69c8c6f 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.ml +++ b/etherlink/bin_node/lib_dev/tx_pool.ml @@ -726,9 +726,11 @@ module Handlers = struct let on_request : type r request_error. 
- worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let open Lwt_result_syntax in let state = Worker.state w in match request with @@ -790,11 +792,11 @@ module Handlers = struct in Lwt_result_syntax.return state - let on_error (type a b) _w _st (_r : (a, b) Request.t) (_errs : b) : + let on_error (type a b) _w _st (_r : (a, b) Request.t) _ (_errs : b) : unit tzresult Lwt.t = Lwt_result_syntax.return_unit - let on_completion _ _ _ _ = Lwt.return_unit + let on_completion _ _ _ _ _ = Lwt.return_unit let on_no_request _ = Lwt.return_unit diff --git a/src/lib_injector/injector_functor.ml b/src/lib_injector/injector_functor.ml index 630a39fcc9aa..60826f893658 100644 --- a/src/lib_injector/injector_functor.ml +++ b/src/lib_injector/injector_functor.ml @@ -1280,8 +1280,9 @@ module Make (Parameters : PARAMETERS) = struct type r request_error. 
worker -> (r, request_error) Request.t -> + Data_encoding.json option -> (r, request_error) result Lwt.t = - fun w request -> + fun w request _ -> let open Lwt_result_syntax in let state = Worker.state w in match request with @@ -1326,7 +1327,7 @@ module Make (Parameters : PARAMETERS) = struct strategy tags - let on_error (type a b) w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let state = Worker.state w in @@ -1340,7 +1341,7 @@ module Make (Parameters : PARAMETERS) = struct | Request.Inject -> emit_and_return_errors errs | Request.Clear _ -> emit_and_return_errors errs - let on_completion w r _ st = + let on_completion w r _ _ st = let state = Worker.state w in Event.(emit2 request_completed_debug) state (Request.view r) st diff --git a/src/lib_shell/block_validator.ml b/src/lib_shell/block_validator.ml index aaf5bac63d77..59f49796f3b7 100644 --- a/src/lib_shell/block_validator.ml +++ b/src/lib_shell/block_validator.ml @@ -518,8 +518,11 @@ let metrics = Shell_metrics.Block_validator.init Name.base let on_request : type r request_error. - t -> (r, request_error) Request.t -> (r, request_error) result Lwt.t = - fun w r -> + t -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w r _ -> Prometheus.Counter.inc_one metrics.worker_counters.worker_request_count ; match r with | Request.Request_validation r -> @@ -562,7 +565,7 @@ let on_launch _ _ (limits, start_testchain, db, validation_process) : inapplicable_blocks_after_validation; } -let on_error (type a b) (_w : t) st (r : (a, b) Request.t) (errs : b) = +let on_error (type a b) (_w : t) st (r : (a, b) Request.t) _ (errs : b) = let open Lwt_syntax in Prometheus.Counter.inc_one metrics.worker_counters.worker_error_count ; match r with @@ -595,8 +598,13 @@ let check_and_quit_on_context_errors errors = let on_completion : type a b. 
- t -> (a, b) Request.t -> a -> Worker_types.request_status -> unit Lwt.t = - fun _w request v st -> + t -> + (a, b) Request.t -> + Data_encoding.json option -> + a -> + Worker_types.request_status -> + unit Lwt.t = + fun _w request _ v st -> let open Lwt_syntax in Prometheus.Counter.inc_one metrics.worker_counters.worker_completion_count ; match (request, v) with diff --git a/src/lib_shell/chain_validator.ml b/src/lib_shell/chain_validator.ml index 2975a65e4297..cc935f8087cb 100644 --- a/src/lib_shell/chain_validator.ml +++ b/src/lib_shell/chain_validator.ml @@ -603,7 +603,7 @@ let on_disconnection w peer_id = return_unit) let on_request (type a b) w start_testchain active_chains spawn_child - (req : (a, b) Request.t) : (a, b) result Lwt.t = + (req : (a, b) Request.t) _ : (a, b) result Lwt.t = let nv = Worker.state w in Prometheus.Counter.inc_one nv.parameters.metrics.worker_counters.worker_request_count ; @@ -670,7 +670,7 @@ let collect_proto ~metrics (chain_store, block) = Lwt.return_unit) (fun _ -> Lwt.return_unit) -let on_error (type a b) w st (request : (a, b) Request.t) (errs : b) : +let on_error (type a b) w st (request : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let nv = Worker.state w in @@ -705,7 +705,7 @@ let on_error (type a b) w st (request : (a, b) Request.t) (errs : b) : | Notify_head _ -> ( match errs with _ -> .) | Disconnection _ -> ( match errs with _ -> .) 
-let on_completion (type a b) w (req : (a, b) Request.t) (update : a) +let on_completion (type a b) w (req : (a, b) Request.t) _ (update : a) request_status = let open Lwt_syntax in let nv = Worker.state w in diff --git a/src/lib_shell/peer_validator.ml b/src/lib_shell/peer_validator.ml index eac665dbb06b..7c26f170237d 100644 --- a/src/lib_shell/peer_validator.ml +++ b/src/lib_shell/peer_validator.ml @@ -377,7 +377,7 @@ let on_no_request w = pv.peer_id ; Lwt.return_unit -let on_request (type a b) w (req : (a, b) Request.t) : (a, b) result Lwt.t = +let on_request (type a b) w (req : (a, b) Request.t) _ : (a, b) result Lwt.t = let open Lwt_syntax in let pv = Worker.state w in match req with @@ -400,7 +400,7 @@ let on_request (type a b) w (req : (a, b) Request.t) : (a, b) result Lwt.t = locator [@profiler.span_s {verbosity = Notice} ["New branch"]]) let on_completion (type a request_error) _w (r : (a, request_error) Request.t) _ - st = + _ st = (match r with | Request.New_head _ -> Prometheus.Counter.inc_one metrics.new_head_completed | Request.New_branch _ -> @@ -408,7 +408,7 @@ let on_completion (type a request_error) _w (r : (a, request_error) Request.t) _ Events.(emit request_completed) (Request.view r, st) -let on_error (type a b) w st (request : (a, b) Request.t) (err : b) : +let on_error (type a b) w st (request : (a, b) Request.t) _ (err : b) : unit tzresult Lwt.t = let open Lwt_syntax in let pv = Worker.state w in @@ -565,21 +565,21 @@ let on_launch _ name parameters : (_, launch_error) result Lwt.t = Lwt.return (Ok pv) let table = - let merge w (Worker.Any_request neu) old = + let merge w (Worker.Any_request (neu, metadata_neu)) old = let pv = Worker.state w in match neu with | Request.New_branch (locator, _) -> pv.last_advertised_head <- (locator.head_hash, locator.head_header) ; - Some (Worker.Any_request neu) + Some (Worker.Any_request (neu, metadata_neu)) | Request.New_head (hash, header) -> ( pv.last_advertised_head <- (hash, header) ; (* TODO penalize 
decreasing fitness *) match old with - | Some (Worker.Any_request (Request.New_branch _) as old) -> + | Some (Worker.Any_request (Request.New_branch _, _) as old) -> Some old (* ignore *) - | Some (Worker.Any_request (Request.New_head _)) -> - Some (Any_request neu) - | None -> Some (Any_request neu)) + | Some (Worker.Any_request (Request.New_head _, _)) -> + Some (Any_request (neu, metadata_neu)) + | None -> Some (Any_request (neu, metadata_neu))) in Worker.create_table (Dropbox {merge}) diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 4435c871f052..c4a14e8c2e7c 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -1430,8 +1430,9 @@ module Make type r request_error. worker -> (r, request_error) Request.t -> + Data_encoding.json option -> (r, request_error) result Lwt.t = - fun w request -> + fun w request _ -> let open Lwt_result_syntax in Prometheus.Counter.inc_one metrics.worker_counters.worker_request_count ; let pv = Worker.state w in @@ -1624,7 +1625,7 @@ module Make (Operation_hash.Set.to_seq fetching) ; return pv - let on_error (type a b) _w st (request : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (request : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = Prometheus.Counter.inc_one metrics.worker_counters.worker_error_count ; let open Lwt_syntax in @@ -1645,7 +1646,7 @@ module Make let* () = Events.(emit request_failed) (request_view, st, errs) in Lwt.return_error errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = Prometheus.Counter.inc_one metrics.worker_counters.worker_completion_count ; match Request.view r with | View (Inject _) | View (Ban _) | Request.View (Flush _) -> diff --git a/src/lib_smart_rollup_node/batcher.ml b/src/lib_smart_rollup_node/batcher.ml index e34982682334..a6c7206c5d41 100644 --- a/src/lib_smart_rollup_node/batcher.ml +++ b/src/lib_smart_rollup_node/batcher.ml @@ -298,9 +298,11 @@ module Handlers = struct let on_request : 
type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Register {order; messages; drop_duplicate} -> @@ -322,7 +324,7 @@ module Handlers = struct let state = init_batcher_state plugin node_ctxt in return state - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -338,7 +340,7 @@ module Handlers = struct | Request.Clear_queues -> emit_and_return_errors errs | Request.Remove_messages _ -> emit_and_return_errors errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = match Request.view r with | Request.View (Register _ | Produce_batches | Clear_queues | Remove_messages _) -> diff --git a/src/lib_smart_rollup_node/dal_injection_queue.ml b/src/lib_smart_rollup_node/dal_injection_queue.ml index d6eb7896228c..248c65f0e76c 100644 --- a/src/lib_smart_rollup_node/dal_injection_queue.ml +++ b/src/lib_smart_rollup_node/dal_injection_queue.ml @@ -483,9 +483,11 @@ module Handlers = struct let on_request : type r request_error. 
- worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Register {message} -> @@ -500,7 +502,7 @@ module Handlers = struct let on_launch _w () Types.{node_ctxt; dal_node_ctxt} = init_dal_worker_state node_ctxt dal_node_ctxt - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in match r with @@ -514,7 +516,7 @@ module Handlers = struct let*! () = Events.(emit request_failed) (Request.view r, st, errs) in return_unit - let on_completion _w r _ st = + let on_completion _w r _ _ st = match Request.view r with | Request.View (Register _ | Produce_dal_slots _ | Set_dal_slot_indices _) -> diff --git a/src/lib_smart_rollup_node/publisher.ml b/src/lib_smart_rollup_node/publisher.ml index b5ee32901af9..a34e0e935ba5 100644 --- a/src/lib_smart_rollup_node/publisher.ml +++ b/src/lib_smart_rollup_node/publisher.ml @@ -610,9 +610,11 @@ module Handlers = struct let on_request : type r request_error. 
- worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Publish -> protect @@ fun () -> on_publish_commitments state @@ -623,7 +625,7 @@ module Handlers = struct let on_launch _w () Types.{node_ctxt} = Lwt_result.return node_ctxt - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -638,7 +640,7 @@ module Handlers = struct | Request.Cement -> emit_and_return_errors errs | Request.Execute_outbox -> emit_and_return_errors errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = Commitment_event.Publisher.request_completed (Request.view r) st let on_no_request _ = Lwt.return_unit diff --git a/src/lib_smart_rollup_node/refutation_coordinator.ml b/src/lib_smart_rollup_node/refutation_coordinator.ml index 1aacbb3ca00c..ed8131404605 100644 --- a/src/lib_smart_rollup_node/refutation_coordinator.ml +++ b/src/lib_smart_rollup_node/refutation_coordinator.ml @@ -155,9 +155,11 @@ module Handlers = struct let on_request : type r request_error. 
- worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Process b -> protect @@ fun () -> on_process b state @@ -167,7 +169,7 @@ module Handlers = struct let on_launch _w () node_ctxt = Lwt_result.return {node_ctxt; pending_opponents = Pkh_table.create 5} - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -179,7 +181,7 @@ module Handlers = struct in match r with Request.Process _ -> emit_and_return_errors errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = Refutation_game_event.Coordinator.request_completed (Request.view r) st let on_no_request _ = Lwt.return_unit diff --git a/src/lib_smart_rollup_node/refutation_player.ml b/src/lib_smart_rollup_node/refutation_player.ml index d3474672b0dd..70a7860ccbe6 100644 --- a/src/lib_smart_rollup_node/refutation_player.ml +++ b/src/lib_smart_rollup_node/refutation_player.ml @@ -71,9 +71,11 @@ module Handlers = struct let on_request : type r request_error. 
- worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Play game -> protect @@ fun () -> on_play game state @@ -102,7 +104,7 @@ module Handlers = struct last_move_cache = None; } - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -116,7 +118,7 @@ module Handlers = struct | Request.Play _ -> emit_and_return_errors errs | Request.Play_opening _ -> emit_and_return_errors errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = Refutation_game_event.Player.request_completed (Request.view r) st let on_no_request _ = Lwt.return_unit -- GitLab