diff --git a/etherlink/bin_node/lib_dev/block_producer.ml b/etherlink/bin_node/lib_dev/block_producer.ml
index 3534650f565901c52b03b659ce1cbfa454b97265..bc26514213c4e86d498a760bedc53c1cceece3c3 100644
--- a/etherlink/bin_node/lib_dev/block_producer.ml
+++ b/etherlink/bin_node/lib_dev/block_producer.ml
@@ -83,6 +83,9 @@ module Request = struct
       (bool * Time.Protocol.t * bool)
       -> ([`Block_produced of int | `No_block], tztrace) t
 
+  let name : type a b. (a, b) t -> string = function
+    | Produce_block _ -> "Produce_block"
+
   type view = View : _ t -> view
 
   let view (req : _ t) = View req
@@ -110,7 +113,7 @@ module Request = struct
   let pp _ppf (View _) = ()
 end
 
-module Worker = Worker.MakeSingle (Name) (Request) (Types)
+module Worker = Octez_telemetry.Worker.MakeSingle (Name) (Request) (Types)
 
 type worker = Worker.infinite Worker.queue Worker.t
 
diff --git a/etherlink/bin_node/lib_dev/blueprints_publisher.ml b/etherlink/bin_node/lib_dev/blueprints_publisher.ml
index 1ad76e90e8cdde0479a46e40cec8df45d0342bfd..4555b1947872f423107165e633f66ad2f5d032f0 100644
--- a/etherlink/bin_node/lib_dev/blueprints_publisher.ml
+++ b/etherlink/bin_node/lib_dev/blueprints_publisher.ml
@@ -61,7 +61,11 @@ module Name = struct
 end
 
 module Worker = struct
-  include Worker.MakeSingle (Name) (Blueprints_publisher_types.Request) (Types)
+  include
+    Octez_telemetry.Worker.MakeSingle
+      (Name)
+      (Blueprints_publisher_types.Request)
+      (Types)
 
   let rollup_node_endpoint worker = (state worker).rollup_node_endpoint
 
diff --git a/etherlink/bin_node/lib_dev/blueprints_publisher_types.ml b/etherlink/bin_node/lib_dev/blueprints_publisher_types.ml
index 4832c7a39d919744c85b10f8c6a68d2616fbdcc7..cdfc2f9aed2f6527fa0d4262ce5955c25dc23a8d 100644
--- a/etherlink/bin_node/lib_dev/blueprints_publisher_types.ml
+++ b/etherlink/bin_node/lib_dev/blueprints_publisher_types.ml
@@ -17,6 +17,11 @@ module Request = struct
   | Publish : {level : Z.t; payload : payload} -> (unit, error trace) t
   | New_rollup_node_block : int32 -> (unit, error trace) t
 
+  let name (type a b) (t : (a, b) t) =
+    match t with
+    | Publish _ -> "Publish"
+    | New_rollup_node_block _ -> "New_rollup_node_block"
+
   type view = View : _ t -> view
 
   let view req = View req
diff --git a/etherlink/bin_node/lib_dev/blueprints_publisher_types.mli b/etherlink/bin_node/lib_dev/blueprints_publisher_types.mli
index 5ce715784fe589c44f6aa31891cacff8e7866211..25c8093f808ff373908a9a1be01212e9e02ea41f 100644
--- a/etherlink/bin_node/lib_dev/blueprints_publisher_types.mli
+++ b/etherlink/bin_node/lib_dev/blueprints_publisher_types.mli
@@ -25,6 +25,10 @@ module Request : sig
       (** Request to publish a blueprint. *)
   | New_rollup_node_block : int32 -> (unit, error trace) t
 
+  (** [name req] is the name of the constructor of [req]. It is used to name
+      the telemetry spans of [req]. *)
+  val name : ('a, 'b) t -> string
+
   type view = View : _ t -> view
 
   (** [inbox_payload payload] returns the inbox payload associated
diff --git a/etherlink/bin_node/lib_dev/evm_context.ml b/etherlink/bin_node/lib_dev/evm_context.ml
index e0ed7627f3a8254ed5ba996fe7ca3e63f1c07ad4..8f0471d32f2de7af7504e0a88dc63037088d40a6 100644
--- a/etherlink/bin_node/lib_dev/evm_context.ml
+++ b/etherlink/bin_node/lib_dev/evm_context.ml
@@ -1832,7 +1832,7 @@ module State = struct
     return_none
 end
 
-module Worker = Worker.MakeSingle (Name) (Request) (Types)
+module Worker = Octez_telemetry.Worker.MakeSingle (Name) (Request) (Types)
 
 type worker = Worker.infinite Worker.queue Worker.t
 
diff --git a/etherlink/bin_node/lib_dev/evm_context_types.ml b/etherlink/bin_node/lib_dev/evm_context_types.ml
index dfe96a2fefd1bd033060087e1ffa443409a66698..f6ff943e3e2e47ea82cecada08429b7dd66ea37a 100644
--- a/etherlink/bin_node/lib_dev/evm_context_types.ml
+++ b/etherlink/bin_node/lib_dev/evm_context_types.ml
@@ -37,6 +37,16 @@ module Request = struct
     }
       -> (Ethereum_types.quantity option, tztrace) t
 
+  let name (type a b) (t : (a, b) t) =
+    match t with
+    | Apply_evm_events _ -> "Apply_evm_events"
+    | Apply_blueprint _ -> "Apply_blueprint"
+    | Last_known_L1_level -> "Last_known_L1_level"
+    | Delayed_inbox_hashes -> "Delayed_inbox_hashes"
+    | Patch_state _ -> "Patch_state"
+    | Wasm_pvm_version -> "Wasm_pvm_version"
+    | Potential_observer_reorg _ -> "Potential_observer_reorg"
+
   type view = View : _ t -> view
 
   let view req = View req
diff --git a/etherlink/bin_node/lib_dev/evm_events_follower.ml b/etherlink/bin_node/lib_dev/evm_events_follower.ml
index 2c362f6d2a1a45ff6290437a56edf926937e0620..4426f907bb8138f10a3728ac0dbd392147a6bca6 100644
--- a/etherlink/bin_node/lib_dev/evm_events_follower.ml
+++ b/etherlink/bin_node/lib_dev/evm_events_follower.ml
@@ -38,7 +38,8 @@
 end
 
 module Worker =
-  Worker.MakeSingle (Name) (Evm_events_follower_types.Request) (Types)
+  Octez_telemetry.Worker.MakeSingle (Name) (Evm_events_follower_types.Request)
+    (Types)
 
 type worker = Worker.infinite Worker.queue Worker.t
 
diff --git a/etherlink/bin_node/lib_dev/evm_events_follower_types.ml b/etherlink/bin_node/lib_dev/evm_events_follower_types.ml
index d1be3970c4b086de2ea4246dc3d236ac3bd86062..31eb6a6fb6e2c29decd74553723289cf8fb1a900 100644
--- a/etherlink/bin_node/lib_dev/evm_events_follower_types.ml
+++ b/etherlink/bin_node/lib_dev/evm_events_follower_types.ml
@@ -10,6 +10,11 @@ module Request = struct
   | New_rollup_node_block : Int32.t -> (unit, error trace) t
   | Apply_evm_events : Int32.t -> (unit, error trace) t
 
+  let name (type a b) (t : (a, b) t) =
+    match t with
+    | New_rollup_node_block _ -> "New_rollup_node_block"
+    | Apply_evm_events _ -> "Apply_evm_events"
+
   type view = View : _ t -> view
 
   let view (req : _ t) = View req
diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml
index 54986554c12796038b134ee2bcb68f6f0fb41602..c7f567c644e83662fe9e7828e2c1b5f4b266d753 100644
--- a/etherlink/bin_node/lib_dev/tx_queue.ml
+++ b/etherlink/bin_node/lib_dev/tx_queue.ml
@@ -6,8 +6,6 @@
 (*                                                                           *)
 (*****************************************************************************)
 
-open Tezos_workers
-
 type parameters = {config : Configuration.tx_queue; keep_alive : bool}
 
 type queue_variant = [`Accepted | `Refused]
@@ -408,6 +406,20 @@ module Request = struct
     }
       -> (unit, tztrace) t
 
+  let name (type a b) (t : (a, b) t) =
+    match t with
+    | Inject _ -> "Inject"
+    | Find _ -> "Find"
+    | Nonce _ -> "Nonce"
+    | Tick _ -> "Tick"
+    | Clear -> "Clear"
+    | Lock_transactions -> "Lock_transactions"
+    | Unlock_transactions -> "Unlock_transactions"
+    | Is_locked -> "Is_locked"
+    | Content -> "Content"
+    | Pop_transactions _ -> "Pop_transactions"
+    | Confirm_transactions _ -> "Confirm_transactions"
+
   type view = View : _ t -> view
 
   let view req = View req
@@ -536,7 +548,7 @@ module Request = struct
     | Confirm_transactions _ -> fprintf fmt "Confirming transactions"
 end
 
-module Worker = Worker.MakeSingle (Name) (Request) (Types)
+module Worker = Octez_telemetry.Worker.MakeSingle (Name) (Request) (Types)
 
 type worker = Worker.infinite Worker.queue Worker.t
 
diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml
index 63ae2dda8ea40d57834f011a8a2d08345ef71027..29f9904d20e3a1e2ad7a4c6ba32edd6e1cfc9b53 100644
--- a/manifest/product_octez.ml
+++ b/manifest/product_octez.ml
@@ -4529,6 +4529,7 @@ let octez_telemetry =
         resto;
         octez_rpc_http;
         octez_rpc_http_server;
+        octez_workers;
       ]
 
 let _bip39_generator =
diff --git a/src/lib_telemetry/dune b/src/lib_telemetry/dune
index 7168f14767b93d7315c44268d7848a48b0b344a4..6d1ea33ab678923a65ae9b7d1bd08e0e054bf1af 100644
--- a/src/lib_telemetry/dune
+++ b/src/lib_telemetry/dune
@@ -11,7 +11,8 @@
   opentelemetry-lwt
   octez-libs.resto
   octez-libs.rpc-http
-  octez-libs.rpc-http-server)
+  octez-libs.rpc-http-server
+  octez-libs.tezos-workers)
  (flags
   (:standard)
   -open Tezos_base.TzPervasives
diff --git a/src/lib_telemetry/worker.ml b/src/lib_telemetry/worker.ml
new file mode 100644
index 0000000000000000000000000000000000000000..e5a8b091ad1d8157da3650fbe99532a369476472
--- /dev/null
+++ b/src/lib_telemetry/worker.ml
@@ -0,0 +1,163 @@
+(*****************************************************************************)
+(*                                                                           *)
+(* SPDX-License-Identifier: MIT                                              *)
+(* Copyright (c) 2025 Nomadic Labs                                           *)
+(*                                                                           *)
+(*****************************************************************************)
+
+module type REQUEST = sig
+  include Tezos_base.Worker_intf.REQUEST
+
+  val name : (_, _) t -> string
+end
+
+(** [Hook_request (Request)] pairs each request of [Request] with the
+    OpenTelemetry scope it was pushed from, so that the span covering its
+    handling can be attached to that scope. *)
+module Hook_request (Request : Tezos_base.Worker_intf.REQUEST) : sig
+  include Tezos_base.Worker_intf.REQUEST
+
+  val make :
+    parent_scope:Opentelemetry.Scope.t -> ('a, 'b) Request.t -> ('a, 'b) t
+
+  val parent_scope : (_, _) t -> Opentelemetry.Scope.t
+
+  val request : ('a, 'b) t -> ('a, 'b) Request.t
+end = struct
+  type ('a, 'b) t = {
+    parent_scope : Opentelemetry.Scope.t;
+    request : ('a, 'b) Request.t;
+  }
+
+  type view = View : _ t -> view
+
+  let view (req : _ t) = View req
+
+  (* A view is encoded as the view of the wrapped request. The scope is not
+     serialized, hence a view cannot be decoded back. *)
+  let encoding =
+    let open Data_encoding in
+    conv
+      (fun (View {request; _}) -> Request.view request)
+      (fun _ -> assert false)
+      Request.encoding
+
+  let pp ppf (View {request; _}) = Request.pp ppf (Request.view request)
+
+  let parent_scope {parent_scope; _} = parent_scope
+
+  let request {request; _} = request
+
+  let make ~parent_scope request = {parent_scope; request}
+end
+
+module MakeSingle
+    (Name : Tezos_base.Worker_intf.NAME)
+    (Request : REQUEST)
+    (Types : Tezos_base.Worker_intf.TYPES) =
+struct
+  module Raw_name = struct
+    include Name
+
+    (* We are calling [MakeSingle] twice. If we were to use the same [Name]
+       module each time, these two functor calls would generate the exact same
+       events, leading to a runtime error. *)
+    let base = base @ ["raw"]
+  end
+
+  module Request_with_hook = Hook_request (Request)
+
+  (* Worker over the raw requests. Callers implement its [HANDLERS]
+     signature, but the worker started by [launch] is [Worker] below. *)
+  module Raw_worker =
+    Tezos_workers.Worker.MakeSingle (Raw_name) (Request) (Types)
+
+  module Worker =
+    Tezos_workers.Worker.MakeSingle (Name) (Request_with_hook) (Types)
+
+  let service_name = String.concat "." Name.base
+
+  (** [Hook_handlers (Handlers)] turns handlers of raw requests into handlers
+      of instrumented requests. Each request is handled in a ["/handler"]
+      span opened in the scope it was pushed from, and its outcome is
+      recorded as the status of that scope. *)
+  module Hook_handlers (Handlers : Raw_worker.HANDLERS) :
+    Worker.HANDLERS
+      with type launch_error = Handlers.launch_error
+       and type self = Handlers.self = struct
+    include Handlers
+
+    let on_request self request =
+      let raw_request = Request_with_hook.request request in
+      let request_name = Request.name raw_request in
+      Opentelemetry_lwt.Trace.with_
+        ~scope:(Request_with_hook.parent_scope request)
+        ~service_name
+        ~attrs:[("worker.request_name", `String request_name)]
+        (request_name ^ "/handler")
+      @@ fun scope ->
+      Lwt.map
+        (fun res ->
+          (* The parent span may already be closed when the request was
+             pushed without waiting for its result, so a failure is also
+             recorded on the handler span, which is still open here. *)
+          (match res with
+          | Ok _ -> ()
+          | Error _ ->
+              Opentelemetry.Scope.set_status
+                scope
+                {
+                  message = "Request handler returned an error";
+                  code = Status_code_error;
+                }) ;
+          res)
+        (on_request self raw_request)
+
+    let on_error self status request error =
+      Opentelemetry.Scope.set_status
+        (Request_with_hook.parent_scope request)
+        {message = "Request returned an error"; code = Status_code_error} ;
+      on_error self status (Request_with_hook.request request) error
+
+    let on_completion self request result status =
+      Opentelemetry.Scope.set_status
+        (Request_with_hook.parent_scope request)
+        {message = "Request completed successfully"; code = Status_code_ok} ;
+      on_completion self (Request_with_hook.request request) result status
+  end
+
+  (* Pushing a request opens a ["/call"] span. Its scope is attached to the
+     request and becomes the parent scope of the handler span. *)
+  module Instrumented_queue = struct
+    let with_ request =
+      let request_name = Request.name request in
+      Opentelemetry_lwt.Trace.with_
+        ~service_name
+        ~attrs:[("worker.request_name", `String request_name)]
+        (request_name ^ "/call")
+
+    let push_request_and_wait worker request =
+      with_ request @@ fun scope ->
+      Worker.Queue.push_request_and_wait
+        worker
+        (Request_with_hook.make ~parent_scope:scope request)
+
+    let push_request worker request =
+      with_ request @@ fun scope ->
+      Worker.Queue.push_request
+        worker
+        (Request_with_hook.make ~parent_scope:scope request)
+  end
+
+  include Worker
+  module Queue = Instrumented_queue
+
+  let launch (type kind launch_error) table i parameters
+      (module Handlers : Raw_worker.HANDLERS
+        with type launch_error = launch_error
+         and type self = kind Worker.t) =
+    (* The raw handlers are hooked so that the started worker, which
+       processes instrumented requests, traces each of them. *)
+    let module Hooked_handlers = Hook_handlers (Handlers) in
+    Worker.launch table i parameters (module Hooked_handlers)
+end
diff --git a/src/lib_telemetry/worker.mli b/src/lib_telemetry/worker.mli
new file mode 100644
index 0000000000000000000000000000000000000000..243459e030870aec2ec319d15ef7377ac7f80bf4
--- /dev/null
+++ b/src/lib_telemetry/worker.mli
@@ -0,0 +1,55 @@
+(*****************************************************************************)
+(*                                                                           *)
+(* SPDX-License-Identifier: MIT                                              *)
+(* Copyright (c) 2025 Nomadic Labs                                           *)
+(*                                                                           *)
+(*****************************************************************************)
+
+module type REQUEST = sig
+  include Tezos_base.Worker_intf.REQUEST
+
+  (** [name t] is the name of request [t]. It is used to build the names of
+      the spans associated with [t]. *)
+  val name : (_, _) t -> string
+end
+
+(** Drop-in replacement of {!Tezos_workers.Worker.MakeSingle}, where the
+    processing of each request is wrapped in an OpenTelemetry span. *)
+module MakeSingle : functor
+  (Name : Tezos_base.Worker_intf.NAME)
+  (Raw_request : REQUEST)
+  (Types : Tezos_base.Worker_intf.TYPES)
+  -> sig
+  (** Worker over the raw (non-instrumented) requests. Its [HANDLERS]
+      signature is the one expected by {!launch}. *)
+  module Raw_worker :
+    Tezos_workers.Worker.T
+      with type Name.t = Name.t
+      with type ('a, 'b) Request.t = ('a, 'b) Raw_request.t
+       and module Types = Types
+
+  include
+    Tezos_workers.Worker.T with module Name = Name and module Types = Types
+
+  (** Instrumented queue, tracing the handling of each request from the moment
+      it is pushed to the worker's pending queue. *)
+  module Queue : sig
+    val push_request_and_wait :
+      'a queue t ->
+      ('b, 'c) Raw_request.t ->
+      ('b, 'c message_error) result Lwt.t
+
+    val push_request : 'a queue t -> ('b, 'c) Raw_request.t -> bool Lwt.t
+  end
+
+  (** [launch] starts an instrumented worker based on handlers implemented for
+      the “raw” (non-instrumented) requests. *)
+  val launch :
+    'kind table ->
+    Name.t ->
+    Types.parameters ->
+    (module Raw_worker.HANDLERS
+       with type launch_error = 'launch_error
+        and type self = 'kind t) ->
+    ('kind t, 'launch_error) result Lwt.t
+end