diff --git a/etherlink/bin_node/lib_dev/block_producer.ml b/etherlink/bin_node/lib_dev/block_producer.ml index c1d8b2d53be1d62e81a761bdf365efebef165a45..5039be27c8f6d2161f9f1b05a17c2ec596ad3bd2 100644 --- a/etherlink/bin_node/lib_dev/block_producer.ml +++ b/etherlink/bin_node/lib_dev/block_producer.ml @@ -280,9 +280,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Produce_block (with_delayed_transactions, timestamp, force) -> @@ -309,11 +311,11 @@ module Handlers = struct let on_launch _w () (parameters : Types.parameters) = Lwt_result_syntax.return parameters - let on_error (type a b) _w _st (_r : (a, b) Request.t) (_errs : b) : + let on_error (type a b) _w _st (_r : (a, b) Request.t) _ (_errs : b) : unit tzresult Lwt.t = Lwt_result_syntax.return_unit - let on_completion _ _ _ _ = Lwt.return_unit + let on_completion _ _ _ _ _ = Lwt.return_unit let on_no_request _ = Lwt.return_unit diff --git a/etherlink/bin_node/lib_dev/blueprints_publisher.ml b/etherlink/bin_node/lib_dev/blueprints_publisher.ml index b746d3fc27c57611c41845c5efa8a6960fc86149..6229e1e6127d8fed2af8a88fac611f5817807fe1 100644 --- a/etherlink/bin_node/lib_dev/blueprints_publisher.ml +++ b/etherlink/bin_node/lib_dev/blueprints_publisher.ml @@ -282,8 +282,11 @@ module Handlers = struct let on_request : type r request_error. - self -> (r, request_error) Request.t -> (r, request_error) result Lwt.t = - fun self request -> + self -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun self request _ -> let open Lwt_result_syntax in match request with | Publish {level; payload} -> @@ -309,15 +312,15 @@ module Handlers = struct unlock the transaction pool in case it was locked. *) Tx_pool.unlock_transactions ()) - let on_completion (type a err) _self (_r : (a, err) Request.t) (_res : a) _st - = + let on_completion (type a err) _self (_r : (a, err) Request.t) _ (_res : a) + _st = Lwt_syntax.return_unit let on_no_request _self = Lwt.return_unit let on_close _self = Lwt.return_unit - let on_error (type a b) _w _st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w _st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in diff --git a/etherlink/bin_node/lib_dev/evm_context.ml b/etherlink/bin_node/lib_dev/evm_context.ml index f0074b180c529d356c172e423a04ad99652d0191..0c8433f4b66dcfe381d0269902be8f0584317a14 100644 --- a/etherlink/bin_node/lib_dev/evm_context.ml +++ b/etherlink/bin_node/lib_dev/evm_context.ml @@ -1589,8 +1589,11 @@ module Handlers = struct let on_request : type r request_error. - self -> (r, request_error) Request.t -> (r, request_error) result Lwt.t = - fun self request -> + self -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun self request _ -> let open Lwt_result_syntax in match request with | Apply_evm_events {finalized_level; events} -> @@ -1648,8 +1651,8 @@ module Handlers = struct ctxt blueprint_with_events - let on_completion (type a err) _self (_r : (a, err) Request.t) (_res : a) _st - = + let on_completion (type a err) _self (_r : (a, err) Request.t) _ (_res : a) + _st = Lwt_syntax.return_unit let on_no_request _self = Lwt.return_unit @@ -1671,7 +1674,7 @@ module Handlers = struct | Potential_observer_reorg _ -> Eq end - let on_error (type a b) _self _st (req : (a, b) Request.t) (errs : b) : + let on_error (type a b) _self _st (req : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in match (req, errs) with diff --git a/etherlink/bin_node/lib_dev/evm_events_follower.ml b/etherlink/bin_node/lib_dev/evm_events_follower.ml index 680d1f7400b9d42c72a7cfff182933e4483dafb1..3063a1a126365641d601340aa4d294b5c307cbae 100644 --- a/etherlink/bin_node/lib_dev/evm_events_follower.ml +++ b/etherlink/bin_node/lib_dev/evm_events_follower.ml @@ -255,9 +255,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun worker request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun worker request _ -> match request with | Request.New_rollup_node_block rollup_head -> protect @@ fun () -> new_rollup_block worker rollup_head @@ -280,9 +282,10 @@ module Handlers = struct worker -> Tezos_base.Worker_types.request_status -> (r, request_error) Request.t -> + Data_encoding.json option -> request_error -> unit tzresult Lwt.t = - fun _w _ req errs -> + fun _w _ req _ errs -> let open Lwt_result_syntax in match req with | Request.New_rollup_node_block _ -> @@ -300,7 +303,7 @@ module Handlers = struct in return_unit - let on_completion _ _ _ _ = Lwt.return_unit + let on_completion _ _ _ _ _ = Lwt.return_unit let on_no_request _ = Lwt.return_unit diff --git a/etherlink/bin_node/lib_dev/evm_websocket.ml b/etherlink/bin_node/lib_dev/evm_websocket.ml index bb3389a0eb2f2b8d7ccb55186691f34cfd8f193b..87f343c6ffc61b3be3106ed563e719e1e6602aa2 100644 --- a/etherlink/bin_node/lib_dev/evm_websocket.ml +++ b/etherlink/bin_node/lib_dev/evm_websocket.ml @@ -356,9 +356,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun worker request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun worker request _ -> match request with | Request.Frame fr -> protect @@ fun () -> on_frame worker fr |> Lwt_result.ok @@ -405,7 +407,7 @@ module Handlers = struct in return state - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -415,7 +417,7 @@ module Handlers = struct in match r with Request.Frame _ -> emit_and_continue errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = match Request.view r with | Request.View (Frame _) -> Event.(emit request_completed_debug) (Request.view r, st) diff --git a/etherlink/bin_node/lib_dev/signals_publisher.ml b/etherlink/bin_node/lib_dev/signals_publisher.ml index 6072d56c659bac54b8a01aa3cc06f4f5133d30e6..6674c18fd546ecc866f2eab08f9d443ea4060b99 100644 --- a/etherlink/bin_node/lib_dev/signals_publisher.ml +++ b/etherlink/bin_node/lib_dev/signals_publisher.ml @@ -179,9 +179,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> match request with | New_rollup_node_block {finalized_level} -> protect @@ fun () -> Worker.new_rollup_block w finalized_level @@ -191,11 +193,11 @@ module Handlers = struct let on_launch _w () (parameters : Types.parameters) = Types.of_parameters parameters - let on_error (type a b) _w _st (_r : (a, b) Request.t) (_errs : b) : + let on_error (type a b) _w _st (_r : (a, b) Request.t) _ (_errs : b) : unit tzresult Lwt.t = Lwt_result_syntax.return_unit - let on_completion _ _ _ _ = Lwt.return_unit + let on_completion _ _ _ _ _ = Lwt.return_unit let on_no_request _ = Lwt.return_unit diff --git a/etherlink/bin_node/lib_dev/tx_pool.ml b/etherlink/bin_node/lib_dev/tx_pool.ml index d2e922325248863e920a32d051604e74d49d0391..3e8ac69c8c6f0063709691e12b191005395a30dd 100644 --- a/etherlink/bin_node/lib_dev/tx_pool.ml +++ b/etherlink/bin_node/lib_dev/tx_pool.ml @@ -726,9 +726,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let open Lwt_result_syntax in let state = Worker.state w in match request with @@ -790,11 +792,11 @@ module Handlers = struct in Lwt_result_syntax.return state - let on_error (type a b) _w _st (_r : (a, b) Request.t) (_errs : b) : + let on_error (type a b) _w _st (_r : (a, b) Request.t) _ (_errs : b) : unit tzresult Lwt.t = Lwt_result_syntax.return_unit - let on_completion _ _ _ _ = Lwt.return_unit + let on_completion _ _ _ _ _ = Lwt.return_unit let on_no_request _ = Lwt.return_unit diff --git a/src/lib_injector/injector_functor.ml b/src/lib_injector/injector_functor.ml index 630a39fcc9aa36ce221003d7eed13fd97cd81649..60826f893658eb79042e29ee38589cb8993f4c8d 100644 --- a/src/lib_injector/injector_functor.ml +++ b/src/lib_injector/injector_functor.ml @@ -1280,8 +1280,9 @@ module Make (Parameters : PARAMETERS) = struct type r request_error. worker -> (r, request_error) Request.t -> + Data_encoding.json option -> (r, request_error) result Lwt.t = - fun w request -> + fun w request _ -> let open Lwt_result_syntax in let state = Worker.state w in match request with @@ -1326,7 +1327,7 @@ module Make (Parameters : PARAMETERS) = struct strategy tags - let on_error (type a b) w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let state = Worker.state w in @@ -1340,7 +1341,7 @@ module Make (Parameters : PARAMETERS) = struct | Request.Inject -> emit_and_return_errors errs | Request.Clear _ -> emit_and_return_errors errs - let on_completion w r _ st = + let on_completion w r _ _ st = let state = Worker.state w in Event.(emit2 request_completed_debug) state (Request.view r) st diff --git a/src/lib_shell/block_validator.ml b/src/lib_shell/block_validator.ml index aaf5bac63d77b3c7b7c5251b66db384eb0912343..59f49796f3b704e02e137d7fcdd67d7c263081da 100644 --- a/src/lib_shell/block_validator.ml +++ b/src/lib_shell/block_validator.ml @@ -518,8 +518,11 @@ let metrics = Shell_metrics.Block_validator.init Name.base let on_request : type r request_error. - t -> (r, request_error) Request.t -> (r, request_error) result Lwt.t = - fun w r -> + t -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w r _ -> Prometheus.Counter.inc_one metrics.worker_counters.worker_request_count ; match r with | Request.Request_validation r -> @@ -562,7 +565,7 @@ let on_launch _ _ (limits, start_testchain, db, validation_process) : inapplicable_blocks_after_validation; } -let on_error (type a b) (_w : t) st (r : (a, b) Request.t) (errs : b) = +let on_error (type a b) (_w : t) st (r : (a, b) Request.t) _ (errs : b) = let open Lwt_syntax in Prometheus.Counter.inc_one metrics.worker_counters.worker_error_count ; match r with @@ -595,8 +598,13 @@ let check_and_quit_on_context_errors errors = let on_completion : type a b. - t -> (a, b) Request.t -> a -> Worker_types.request_status -> unit Lwt.t = - fun _w request v st -> + t -> + (a, b) Request.t -> + Data_encoding.json option -> + a -> + Worker_types.request_status -> + unit Lwt.t = + fun _w request _ v st -> let open Lwt_syntax in Prometheus.Counter.inc_one metrics.worker_counters.worker_completion_count ; match (request, v) with diff --git a/src/lib_shell/chain_validator.ml b/src/lib_shell/chain_validator.ml index 2975a65e429795578166b606eab9e1683bb079e8..cc935f8087cb801b76370bdebaffd0b8efd11e52 100644 --- a/src/lib_shell/chain_validator.ml +++ b/src/lib_shell/chain_validator.ml @@ -603,7 +603,7 @@ let on_disconnection w peer_id = return_unit) let on_request (type a b) w start_testchain active_chains spawn_child - (req : (a, b) Request.t) : (a, b) result Lwt.t = + (req : (a, b) Request.t) _ : (a, b) result Lwt.t = let nv = Worker.state w in Prometheus.Counter.inc_one nv.parameters.metrics.worker_counters.worker_request_count ; @@ -670,7 +670,7 @@ let collect_proto ~metrics (chain_store, block) = Lwt.return_unit) (fun _ -> Lwt.return_unit) -let on_error (type a b) w st (request : (a, b) Request.t) (errs : b) : +let on_error (type a b) w st (request : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let nv = Worker.state w in @@ -705,7 +705,7 @@ let on_error (type a b) w st (request : (a, b) Request.t) (errs : b) : | Notify_head _ -> ( match errs with _ -> .) | Disconnection _ -> ( match errs with _ -> .) -let on_completion (type a b) w (req : (a, b) Request.t) (update : a) +let on_completion (type a b) w (req : (a, b) Request.t) _ (update : a) request_status = let open Lwt_syntax in let nv = Worker.state w in diff --git a/src/lib_shell/peer_validator.ml b/src/lib_shell/peer_validator.ml index eac665dbb06b0f642442e2aa74d4079669c9e97a..7c26f170237d3b144269f2acb29a83da714026f0 100644 --- a/src/lib_shell/peer_validator.ml +++ b/src/lib_shell/peer_validator.ml @@ -377,7 +377,7 @@ let on_no_request w = pv.peer_id ; Lwt.return_unit -let on_request (type a b) w (req : (a, b) Request.t) : (a, b) result Lwt.t = +let on_request (type a b) w (req : (a, b) Request.t) _ : (a, b) result Lwt.t = let open Lwt_syntax in let pv = Worker.state w in match req with @@ -400,7 +400,7 @@ let on_request (type a b) w (req : (a, b) Request.t) : (a, b) result Lwt.t = locator [@profiler.span_s {verbosity = Notice} ["New branch"]]) let on_completion (type a request_error) _w (r : (a, request_error) Request.t) _ - st = + _ st = (match r with | Request.New_head _ -> Prometheus.Counter.inc_one metrics.new_head_completed | Request.New_branch _ -> @@ -408,7 +408,7 @@ let on_completion (type a request_error) _w (r : (a, request_error) Request.t) _ Events.(emit request_completed) (Request.view r, st) -let on_error (type a b) w st (request : (a, b) Request.t) (err : b) : +let on_error (type a b) w st (request : (a, b) Request.t) _ (err : b) : unit tzresult Lwt.t = let open Lwt_syntax in let pv = Worker.state w in @@ -565,21 +565,21 @@ let on_launch _ name parameters : (_, launch_error) result Lwt.t = Lwt.return (Ok pv) let table = - let merge w (Worker.Any_request neu) old = + let merge w (Worker.Any_request (neu, metadata_neu)) old = let pv = Worker.state w in match neu with | Request.New_branch (locator, _) -> pv.last_advertised_head <- (locator.head_hash, locator.head_header) ; - Some (Worker.Any_request neu) + Some (Worker.Any_request (neu, metadata_neu)) | Request.New_head (hash, header) -> ( pv.last_advertised_head <- (hash, header) ; (* TODO penalize decreasing fitness *) match old with - | Some (Worker.Any_request (Request.New_branch _) as old) -> + | Some (Worker.Any_request (Request.New_branch _, _) as old) -> Some old (* ignore *) - | Some (Worker.Any_request (Request.New_head _)) -> - Some (Any_request neu) - | None -> Some (Any_request neu)) + | Some (Worker.Any_request (Request.New_head _, _)) -> + Some (Any_request (neu, metadata_neu)) + | None -> Some (Any_request (neu, metadata_neu))) in Worker.create_table (Dropbox {merge}) diff --git a/src/lib_shell/prevalidator.ml b/src/lib_shell/prevalidator.ml index 4435c871f052faff62eb4f9e209614b9a0d24087..c4a14e8c2e7cb5c641a452a91b7bfc20c663a70d 100644 --- a/src/lib_shell/prevalidator.ml +++ b/src/lib_shell/prevalidator.ml @@ -1430,8 +1430,9 @@ module Make type r request_error. worker -> (r, request_error) Request.t -> + Data_encoding.json option -> (r, request_error) result Lwt.t = - fun w request -> + fun w request _ -> let open Lwt_result_syntax in Prometheus.Counter.inc_one metrics.worker_counters.worker_request_count ; let pv = Worker.state w in @@ -1624,7 +1625,7 @@ module Make (Operation_hash.Set.to_seq fetching) ; return pv - let on_error (type a b) _w st (request : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (request : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = Prometheus.Counter.inc_one metrics.worker_counters.worker_error_count ; let open Lwt_syntax in @@ -1645,7 +1646,7 @@ module Make let* () = Events.(emit request_failed) (request_view, st, errs) in Lwt.return_error errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = Prometheus.Counter.inc_one metrics.worker_counters.worker_completion_count ; match Request.view r with | View (Inject _) | View (Ban _) | Request.View (Flush _) -> diff --git a/src/lib_smart_rollup_node/batcher.ml b/src/lib_smart_rollup_node/batcher.ml index e34982682334e50e8693349533a625c2c5a9431a..a6c7206c5d410bc71c826ba791bb2c1816e8a883 100644 --- a/src/lib_smart_rollup_node/batcher.ml +++ b/src/lib_smart_rollup_node/batcher.ml @@ -298,9 +298,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Register {order; messages; drop_duplicate} -> @@ -322,7 +324,7 @@ module Handlers = struct let state = init_batcher_state plugin node_ctxt in return state - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -338,7 +340,7 @@ module Handlers = struct | Request.Clear_queues -> emit_and_return_errors errs | Request.Remove_messages _ -> emit_and_return_errors errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = match Request.view r with | Request.View (Register _ | Produce_batches | Clear_queues | Remove_messages _) -> diff --git a/src/lib_smart_rollup_node/dal_injection_queue.ml b/src/lib_smart_rollup_node/dal_injection_queue.ml index d6eb7896228ccb4bdd356010bddbb79edcc80bd6..248c65f0e76c2680522b68266e99120d7bfada80 100644 --- a/src/lib_smart_rollup_node/dal_injection_queue.ml +++ b/src/lib_smart_rollup_node/dal_injection_queue.ml @@ -483,9 +483,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Register {message} -> @@ -500,7 +502,7 @@ module Handlers = struct let on_launch _w () Types.{node_ctxt; dal_node_ctxt} = init_dal_worker_state node_ctxt dal_node_ctxt - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in match r with @@ -514,7 +516,7 @@ module Handlers = struct let*! () = Events.(emit request_failed) (Request.view r, st, errs) in return_unit - let on_completion _w r _ st = + let on_completion _w r _ _ st = match Request.view r with | Request.View (Register _ | Produce_dal_slots _ | Set_dal_slot_indices _) -> diff --git a/src/lib_smart_rollup_node/publisher.ml b/src/lib_smart_rollup_node/publisher.ml index b5ee32901af92b8a5bfc1bd8a8c4e60e4dde2ebc..a34e0e935ba5c84bf2dd072ba80d26b672da8082 100644 --- a/src/lib_smart_rollup_node/publisher.ml +++ b/src/lib_smart_rollup_node/publisher.ml @@ -610,9 +610,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Publish -> protect @@ fun () -> on_publish_commitments state @@ -623,7 +625,7 @@ module Handlers = struct let on_launch _w () Types.{node_ctxt} = Lwt_result.return node_ctxt - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -638,7 +640,7 @@ module Handlers = struct | Request.Cement -> emit_and_return_errors errs | Request.Execute_outbox -> emit_and_return_errors errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = Commitment_event.Publisher.request_completed (Request.view r) st let on_no_request _ = Lwt.return_unit diff --git a/src/lib_smart_rollup_node/refutation_coordinator.ml b/src/lib_smart_rollup_node/refutation_coordinator.ml index 1aacbb3ca00c4d873d998924c4b1c47b03f02339..ed813140460527b5433dbc673b11540f9dc410c3 100644 --- a/src/lib_smart_rollup_node/refutation_coordinator.ml +++ b/src/lib_smart_rollup_node/refutation_coordinator.ml @@ -155,9 +155,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Process b -> protect @@ fun () -> on_process b state @@ -167,7 +169,7 @@ module Handlers = struct let on_launch _w () node_ctxt = Lwt_result.return {node_ctxt; pending_opponents = Pkh_table.create 5} - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -179,7 +181,7 @@ module Handlers = struct in match r with Request.Process _ -> emit_and_return_errors errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = Refutation_game_event.Coordinator.request_completed (Request.view r) st let on_no_request _ = Lwt.return_unit diff --git a/src/lib_smart_rollup_node/refutation_player.ml b/src/lib_smart_rollup_node/refutation_player.ml index d3474672b0ddf2ada62ae4af24541717b74d082a..70a7860ccbe61b0020ef098e735a924c970ef703 100644 --- a/src/lib_smart_rollup_node/refutation_player.ml +++ b/src/lib_smart_rollup_node/refutation_player.ml @@ -71,9 +71,11 @@ module Handlers = struct let on_request : type r request_error. - worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun w request -> + worker -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun w request _ -> let state = Worker.state w in match request with | Request.Play game -> protect @@ fun () -> on_play game state @@ -102,7 +104,7 @@ module Handlers = struct last_move_cache = None; } - let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) _w st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let request_view = Request.view r in @@ -116,7 +118,7 @@ module Handlers = struct | Request.Play _ -> emit_and_return_errors errs | Request.Play_opening _ -> emit_and_return_errors errs - let on_completion _w r _ st = + let on_completion _w r _ _ st = Refutation_game_event.Player.request_completed (Request.view r) st let on_no_request _ = Lwt.return_unit diff --git a/src/lib_workers/test/test_workers_unit.ml b/src/lib_workers/test/test_workers_unit.ml index 6850bfffc2bca975d8563e3431f7a0a24ad46b0d..360f2dcf143451cb16900f779529dc12aba711d5 100644 --- a/src/lib_workers/test/test_workers_unit.ml +++ b/src/lib_workers/test/test_workers_unit.ml @@ -45,9 +45,11 @@ let create_handlers (type a) ?on_completion () = let on_request : type r request_error. - self -> (r, request_error) Request.t -> (r, request_error) result Lwt.t - = - fun _w request -> + self -> + (r, request_error) Request.t -> + Data_encoding.json option -> + (r, request_error) result Lwt.t = + fun _w request _ -> let open Lwt_result_syntax in match request with | Request.RqA _i -> return_unit @@ -62,7 +64,7 @@ let create_handlers (type a) ?on_completion () = let open Lwt_result_syntax in return (ref []) - let on_error (type a b) w _st (r : (a, b) Request.t) (errs : b) : + let on_error (type a b) w _st (r : (a, b) Request.t) _ (errs : b) : unit tzresult Lwt.t = let open Lwt_result_syntax in let history = Worker.state w in @@ -76,7 +78,7 @@ let create_handlers (type a) ?on_completion () = history := "RqErr" :: !history ; return_unit) - let on_completion w r _ _st = + let on_completion w r _ _ _st = let open Lwt_syntax in let history = Worker.state w in let () = @@ -112,16 +114,18 @@ let create_bounded ?on_completion = let create_dropbox ?on_completion = let table = let open Worker in - let merge _w (Any_request neu) (old : _ option) = - let (Any_request r) = + let merge _w (Any_request (neu, neu_metadata)) (old : _ option) = + let (Any_request (r, metadata)) = match (neu, old) with - | RqA i1, Some (Any_request (RqA i2)) -> Any_request (RqA (i1 + i2)) - | (RqA _ as rqa), _ -> Any_request rqa - | _, Some (Any_request (RqA _ as rqa)) -> Any_request rqa - | RqB, _ -> Any_request neu - | RqErr _, _ -> Any_request neu + | RqA i1, Some (Any_request (RqA i2, _)) -> + Any_request (RqA (i1 + i2), neu_metadata) + | (RqA _ as rqa), _ -> Any_request (rqa, neu_metadata) + | _, Some (Any_request ((RqA _ as rqa), metadata)) -> + Any_request (rqa, metadata) + | RqB, _ -> Any_request (neu, neu_metadata) + | RqErr _, _ -> Any_request (neu, neu_metadata) in - Some (Worker.Any_request r) + Some (Worker.Any_request (r, metadata)) in Worker.create_table (Dropbox {merge}) in diff --git a/src/lib_workers/worker.ml b/src/lib_workers/worker.ml index ecc05caff66e94031ebb9d5349178696397004dd..7e3c3b6b27eb68c4d771c23e285e3322dc500911 100644 --- a/src/lib_workers/worker.ml +++ b/src/lib_workers/worker.ml @@ -56,6 +56,8 @@ module type T = sig | Request_error of 'a | Any of exn + type metadata = Data_encoding.json + (** Supported kinds of internal buffers. *) type _ buffer_kind = | Queue : infinite queue buffer_kind @@ -66,7 +68,7 @@ module type T = sig } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata option -> any_request (** Create a table of workers. *) val create_table : 'kind buffer_kind -> 'kind table @@ -92,6 +94,7 @@ module type T = sig val on_request : self -> ('a, 'request_error) Request.t -> + metadata option -> ('a, 'request_error) result Lwt.t (** Called when no request has been made before the timeout, if @@ -111,6 +114,7 @@ module type T = sig self -> Worker_types.request_status -> ('a, 'request_error) Request.t -> + metadata option -> 'request_error -> unit tzresult Lwt.t @@ -119,6 +123,7 @@ module type T = sig val on_completion : self -> ('a, 'request_error) Request.t -> + metadata option -> 'a -> Worker_types.request_status -> unit Lwt.t @@ -143,9 +148,11 @@ module type T = sig module type BOX = sig type t - val put_request : t -> ('a, 'request_error) Request.t -> unit + val put_request : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> unit val put_request_and_wait : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t @@ -155,11 +162,13 @@ module type T = sig type 'a t val push_request_and_wait : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t - val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t + val push_request : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t val pending_requests : 'a t -> (Time.System.t * Request.view) list @@ -175,7 +184,10 @@ module type T = sig (** Adds a message to the queue immediately. *) val push_request_now : - infinite queue t -> ('a, 'request_error) Request.t -> unit + ?metadata:metadata -> + infinite queue t -> + ('a, 'request_error) Request.t -> + unit end (** Exports the canceler to allow cancellation of other tasks when this @@ -247,9 +259,13 @@ struct | Request_error of 'a | Any of exn + type metadata = Data_encoding.json + type message = | Message : - ('a, 'b) Request.t * ('a, 'b message_error) result Lwt.u option + ('a, 'b) Request.t + * metadata option + * ('a, 'b message_error) result Lwt.u option -> message type 'a queue @@ -269,7 +285,7 @@ struct } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata option -> any_request and _ buffer = | Queue_buffer : @@ -306,27 +322,32 @@ struct | Worker_types.Launching _ | Running _ | Closing _ -> None | Closed (_, _, errs) -> errs - let queue_item ?u r = (Time.System.now (), Message (r, u)) + let queue_item ?u metadata r = (Time.System.now (), Message (r, metadata, u)) - let drop_request w merge message_box request = + let drop_request ?metadata w merge message_box request = try match match Lwt_dropbox.peek message_box with - | None -> merge w (Any_request request) None - | Some (_, Message (old, _)) -> + | None -> merge w (Any_request (request, metadata)) None + | Some (_, Message (old, metadata_old, _)) -> Lwt.ignore_result (Lwt_dropbox.take message_box) ; - merge w (Any_request request) (Some (Any_request old)) + merge + w + (Any_request (request, metadata)) + (Some (Any_request (old, metadata_old))) with | None -> () - | Some (Any_request neu) -> - Lwt_dropbox.put message_box (Time.System.now (), Message (neu, None)) + | Some (Any_request (neu, metadata)) -> + Lwt_dropbox.put + message_box + (Time.System.now (), Message (neu, metadata, None)) with Lwt_dropbox.Closed -> () - let drop_request_and_wait w message_box request = + let drop_request_and_wait ?metadata w message_box request = let t, u = Lwt.wait () in Lwt.catch (fun () -> - Lwt_dropbox.put message_box (queue_item ~u request) ; + Lwt_dropbox.put message_box (queue_item metadata ~u request) ; t) (function | Lwt_dropbox.Closed -> @@ -338,9 +359,11 @@ struct module type BOX = sig type t - val put_request : t -> ('a, 'request_error) Request.t -> unit + val put_request : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> unit val put_request_and_wait : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t @@ -350,11 +373,13 @@ struct type 'a t val push_request_and_wait : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t - val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t + val push_request : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t val pending_requests : 'a t -> (Time.System.t * Request.view) list @@ -362,23 +387,23 @@ struct end module Dropbox = struct - let put_request (w : dropbox t) request = + let put_request ?metadata (w : dropbox t) request = let (Dropbox {merge}) = w.table.buffer_kind in let (Dropbox_buffer message_box) = w.buffer in - drop_request w merge message_box request + drop_request ?metadata w merge message_box request - let put_request_and_wait (w : dropbox t) request = + let put_request_and_wait ?metadata (w : dropbox t) request = let (Dropbox_buffer message_box) = w.buffer in - drop_request_and_wait w message_box request + drop_request_and_wait ?metadata w message_box request end module Queue = struct - let push_request (type a) (w : a queue t) request = + let push_request ?metadata (type a) (w : a queue t) request = match w.buffer with | Queue_buffer message_queue -> if Lwt_pipe.Unbounded.is_closed message_queue then Lwt.return_false else ( - Lwt_pipe.Unbounded.push message_queue (queue_item request) ; + Lwt_pipe.Unbounded.push message_queue (queue_item metadata request) ; (* because pushing on an unbounded pipe is immediate, we return within Lwt explicitly for compatibility with the other case *) Lwt.return_true) @@ -387,21 +412,23 @@ struct else let open Lwt_syntax in let* () = - Lwt_pipe.Bounded.push message_queue (queue_item request) + Lwt_pipe.Bounded.push message_queue (queue_item metadata request) in Lwt.return_true - let push_request_now (w : infinite queue t) request = + let push_request_now ?metadata (w : infinite queue t) request = let (Queue_buffer message_queue) = w.buffer in if Lwt_pipe.Unbounded.is_closed message_queue then () - else Lwt_pipe.Unbounded.push message_queue (queue_item request) + else Lwt_pipe.Unbounded.push message_queue (queue_item metadata request) - let push_request_and_wait (type a) (w : a queue t) request = + let push_request_and_wait ?metadata (type a) (w : a queue t) request = match w.buffer with | Queue_buffer message_queue -> ( try let t, u = Lwt.wait () in - Lwt_pipe.Unbounded.push message_queue (queue_item ~u request) ; + Lwt_pipe.Unbounded.push + message_queue + (queue_item ~u metadata request) ; t with Lwt_pipe.Closed -> Lwt.return_error (Closed (extract_status_errors w))) @@ -409,7 +436,9 @@ struct let t, u = Lwt.wait () in Lwt.try_bind (fun () -> - Lwt_pipe.Bounded.push message_queue (queue_item ~u request)) + Lwt_pipe.Bounded.push + message_queue + (queue_item ~u metadata request)) (fun () -> t) (function | Lwt_pipe.Closed -> @@ -426,7 +455,9 @@ struct Lwt_pipe.Bounded.peek_all_now message_queue with Lwt_pipe.Closed -> [] in - List.map (function t, Message (req, _) -> (t, Request.view req)) peeked + List.map + (function t, Message (req, _, _) -> (t, Request.view req)) + peeked let pending_requests_length (type a) (w : a queue t) = let pipe_length (type a) (q : a buffer) = @@ -440,9 +471,9 @@ struct let close (type a) (w : a t) = let wakeup = function - | _, Message (_, Some u) -> + | _, Message (_, _, Some u) -> Lwt.wakeup_later u (Error (Closed (extract_status_errors w))) - | _, Message (_, None) -> () + | _, Message (_, _, None) -> () in let close_queue message_queue = let messages = Lwt_pipe.Bounded.pop_all_now message_queue in @@ -514,6 +545,7 @@ struct val on_request : self -> ('a, 'request_error) Request.t -> + metadata option -> ('a, 'request_error) result Lwt.t val on_no_request : self -> unit Lwt.t @@ -524,12 +556,14 @@ struct self -> Worker_types.request_status -> ('a, 'request_error) Request.t -> + metadata option -> 'request_error -> unit tzresult Lwt.t val on_completion : self -> ('a, 'request_error) Request.t -> + metadata option -> 'a -> Worker_types.request_status -> unit Lwt.t @@ -579,7 +613,7 @@ struct | Ok None -> let* () = Handlers.on_no_request w in loop () - | Ok (Some (pushed, Message (request, u))) -> ( + | Ok (Some (pushed, Message (request, metadata, u))) -> ( let current_request = Request.view request in let treated = Time.System.now () in w.current_request <- Some (pushed, treated, current_request) ; @@ -587,14 +621,16 @@ struct match u with | None -> ( let open Lwt_result_syntax in - let*! res = Handlers.on_request w request in + let*! res = Handlers.on_request w request metadata in match res with | Error err -> Lwt.return_error err | Ok res -> let completed = Time.System.now () in w.current_request <- None ; let status = Worker_types.{pushed; treated; completed} in - let*! () = Handlers.on_completion w request res status in + let*! () = + Handlers.on_completion w request metadata res status + in let*! () = Worker_events.(emit request_no_errors) (current_request, status) @@ -606,7 +642,7 @@ struct (Error). To that end, we treat it locally like a regular promise (which happens to carry a [result]) within the Lwt monad. *) - let* res = Handlers.on_request w request in + let* res = Handlers.on_request w request metadata in match res with | Error err -> Lwt.wakeup_later u (Error (Request_error err)) ; @@ -616,7 +652,9 @@ struct let completed = Time.System.now () in let status = Worker_types.{pushed; treated; completed} in w.current_request <- None ; - let* () = Handlers.on_completion w request res status in + let* () = + Handlers.on_completion w request metadata res status + in let* () = Worker_events.(emit request_no_errors) (current_request, status) @@ -635,6 +673,7 @@ struct w Worker_types.{pushed; treated; completed} request + metadata err | None -> assert false in diff --git a/src/lib_workers/worker.mli b/src/lib_workers/worker.mli index 9f28073b99bf3aa1aa405cb94af00d46b7847b4a..6dafd21d37bdea26b1d3cc770f2fc48a944abeb1 100644 --- a/src/lib_workers/worker.mli +++ b/src/lib_workers/worker.mli @@ -67,6 +67,8 @@ module type T = sig | Request_error of 'a | Any of exn + type metadata = Data_encoding.json + (** Supported kinds of internal buffers. *) type _ buffer_kind = | Queue : infinite queue buffer_kind @@ -77,7 +79,7 @@ module type T = sig } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata option -> any_request (** Create a table of workers. *) val create_table : 'kind buffer_kind -> 'kind table @@ -104,6 +106,7 @@ module type T = sig val on_request : self -> ('a, 'request_error) Request.t -> + metadata option -> ('a, 'request_error) result Lwt.t (** Called when no request has been made before the timeout, if @@ -121,6 +124,7 @@ module type T = sig self -> Worker_types.request_status -> ('a, 'request_error) Request.t -> + metadata option -> 'request_error -> unit tzresult Lwt.t @@ -129,6 +133,7 @@ module type T = sig val on_completion : self -> ('a, 'request_error) Request.t -> + metadata option -> 'a -> Worker_types.request_status -> unit Lwt.t @@ -158,12 +163,14 @@ module type T = sig (** [put_request worker request] sends the [request] to the [worker]. If the [worker] dropbox is closed, then it is a no-op. *) - val put_request : t -> ('a, 'request_error) Request.t -> unit + val put_request : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> unit (** [put_request_and_wait worker request] sends the [request] to the [worker] and waits for its completion. If the worker dropbox is closed, then it returns [Error Closed]. *) val put_request_and_wait : + ?metadata:metadata -> t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t @@ -178,6 +185,7 @@ module type T = sig then it returns [Error Closed]. If the buffer is a bounded queue and the underlying queue is full, the call is blocking. *) val push_request_and_wait : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> ('a, 'request_error message_error) result Lwt.t @@ -186,7 +194,8 @@ module type T = sig promise returned is [true] if the request was pushed successfuly or [false] if the worker queue is closed. If the buffer is a bounded queue and the underlying queue is full, the call is blocking. *) - val push_request : 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t + val push_request : + ?metadata:metadata -> 'q t -> ('a, 'request_error) Request.t -> bool Lwt.t val pending_requests : 'a t -> (Time.System.t * Request.view) list @@ -202,7 +211,10 @@ module type T = sig (** Adds a message to the queue immediately. *) val push_request_now : - infinite queue t -> ('a, 'request_error) Request.t -> unit + ?metadata:metadata -> + infinite queue t -> + ('a, 'request_error) Request.t -> + unit end (** Exports the canceler to allow cancellation of other tasks when this