diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index fc32fd9b2b48783e7000115066cac71007882b5c..fa1ca12617efc1043157fb8a01f8a61a667d9fb9 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -2265,17 +2265,38 @@ let octez_p2p_services = ~deps:[octez_base |> open_ ~m:"TzPervasives"; octez_rpc] ~linkall:true +let octez_profiler_backend = + octez_lib + "octez-profiler-backend" + ~internal_name:"tezos_profiler_backend" + ~path:"src/lib_profiler_backend" + ~synopsis:"Backends for the Octez Profiler" + ~deps: + [ + octez_base |> open_ ~m:"TzPervasives" |> open_; + octez_base_unix |> open_; + octez_stdlib |> open_; + octez_stdlib_unix; + opentelemetry; + ambient_context_lwt; + opentelemetry_client_cohttp_lwt; + ] + let octez_workers = + let (PPX {preprocess; preprocessor_deps}) = ppx_profiler in octez_lib "tezos-workers" ~path:"src/lib_workers" ~synopsis:"Worker library" ~documentation: Dune.[[S "package"; S "octez-libs"]; [S "mld_files"; S "tezos_workers"]] + ~preprocess + ~preprocessor_deps ~deps: [ octez_base |> open_ ~m:"TzPervasives" |> open_; octez_stdlib_unix |> open_; + octez_profiler_backend; ] let _octez_workers_tests = @@ -3789,23 +3810,6 @@ let octez_requester_tests = qcheck_tezt; ] -let octez_profiler_backend = - octez_lib - "octez-profiler-backend" - ~internal_name:"tezos_profiler_backend" - ~path:"src/lib_profiler_backend" - ~synopsis:"Backends for the Octez Profiler" - ~deps: - [ - octez_base |> open_ ~m:"TzPervasives" |> open_; - octez_base_unix |> open_; - octez_stdlib |> open_; - octez_stdlib_unix; - opentelemetry; - ambient_context_lwt; - opentelemetry_client_cohttp_lwt; - ] - let octez_shell = let (PPX {preprocess; preprocessor_deps}) = ppx_profiler in octez_shell_lib diff --git a/opam/octez-libs.opam b/opam/octez-libs.opam index bd277c709bcdb47c04218f158cdf352edda19eca..4d27e91cf2142ae4d7f76a6ec081826a1fe42254 100644 --- a/opam/octez-libs.opam +++ b/opam/octez-libs.opam @@ -76,13 +76,13 @@ depends: [ "index" { 
>= "1.6.0" & < "1.7.0" } "semaphore-compat" { >= "1.0.1" } "checkseum" { != "0.5.0" } + "ambient-context-lwt" { = "0.1.0" } "ringo" { >= "1.1.0" } "octez-internal-libs" { = version } "conf-rust" "integers" "ctypes" { >= "0.18.0" } "tezos-sapling-parameters" { >= "1.1.0" } - "ambient-context-lwt" { = "0.1.0" } "octez-rustzcash-deps" { with-test & = version } "bigstring" {with-test} "bam-ppx" { with-test & >= "0.3" } diff --git a/src/lib_profiler_backend/opentelemetry_profiler.ml b/src/lib_profiler_backend/opentelemetry_profiler.ml index 186cb27a60d7d6574dc7102fa350f38063fd4c61..6d31761d4491a8675c747e7c04786a2a210a6107 100644 --- a/src/lib_profiler_backend/opentelemetry_profiler.ml +++ b/src/lib_profiler_backend/opentelemetry_profiler.ml @@ -51,6 +51,12 @@ let trace_operation op ?attrs = in trace ~trace_id:(op_hash_to_trace_id op_hash) ~attrs +let update_scope s f = + match s with + | Some s -> + Ambient_context.with_binding Opentelemetry.Scope.ambient_scope_key s f + | None -> f () + type (_, _) Profiler.kind += Opentelemetry_profiler : ('a, 'b) Profiler.kind type config = {service_name : string; verbosity : Profiler.verbosity} diff --git a/src/lib_profiler_backend/opentelemetry_profiler.mli b/src/lib_profiler_backend/opentelemetry_profiler.mli index 413326418392cc87625e707e70760ac150516b73..d0ea56b73b3a0d6e6defc89946b77d6309497774 100644 --- a/src/lib_profiler_backend/opentelemetry_profiler.mli +++ b/src/lib_profiler_backend/opentelemetry_profiler.mli @@ -54,6 +54,10 @@ val trace_operation : (unit -> 'a) -> 'a +(** [update_scope s f] calls [f]. If [s] is [Some scope], [scope] is bound as + the ambient Opentelemetry scope for that call; otherwise [f] runs as is. 
*) +val update_scope : Opentelemetry.Scope.t option -> (unit -> 'a) -> 'a + type config = {service_name : string; verbosity : Profiler.verbosity} (** Mocked driver, that serves only to register the opentelemetry as a valid diff --git a/src/lib_shell/peer_validator.ml b/src/lib_shell/peer_validator.ml index eac665dbb06b0f642442e2aa74d4079669c9e97a..1d331ce38cb6862bc7369f8f29d7e59af124ab4f 100644 --- a/src/lib_shell/peer_validator.ml +++ b/src/lib_shell/peer_validator.ml @@ -565,21 +565,21 @@ let on_launch _ name parameters : (_, launch_error) result Lwt.t = Lwt.return (Ok pv) let table = - let merge w (Worker.Any_request neu) old = + let merge w (Worker.Any_request (neu, neu_metadata)) old = let pv = Worker.state w in match neu with | Request.New_branch (locator, _) -> pv.last_advertised_head <- (locator.head_hash, locator.head_header) ; - Some (Worker.Any_request neu) + Some (Worker.Any_request (neu, neu_metadata)) | Request.New_head (hash, header) -> ( pv.last_advertised_head <- (hash, header) ; (* TODO penalize decreasing fitness *) match old with - | Some (Worker.Any_request (Request.New_branch _) as old) -> + | Some (Worker.Any_request (Request.New_branch _, _) as old) -> Some old (* ignore *) - | Some (Worker.Any_request (Request.New_head _)) -> - Some (Any_request neu) - | None -> Some (Any_request neu)) + | Some (Worker.Any_request (Request.New_head _, _)) -> + Some (Any_request (neu, neu_metadata)) + | None -> Some (Any_request (neu, neu_metadata))) in Worker.create_table (Dropbox {merge}) diff --git a/src/lib_workers/dune b/src/lib_workers/dune index e657f7efd9c8600e1ed0c7bcfee80e12ec07e14a..ca48aec9132397640baffa620ef922fa86b45740 100644 --- a/src/lib_workers/dune +++ b/src/lib_workers/dune @@ -7,7 +7,10 @@ (instrumentation (backend bisect_ppx)) (libraries octez-libs.base - octez-libs.stdlib-unix) + octez-libs.stdlib-unix + octez-libs.octez-profiler-backend) + (preprocess (pps octez-libs.ppx_profiler)) + (preprocessor_deps (env_var TEZOS_PPX_PROFILER)) 
(flags (:standard) -open Tezos_base.TzPervasives diff --git a/src/lib_workers/test/test_workers_unit.ml b/src/lib_workers/test/test_workers_unit.ml index 6850bfffc2bca975d8563e3431f7a0a24ad46b0d..e2fa9c1f4d24b191be31d7e74b81c051c9ea94d0 100644 --- a/src/lib_workers/test/test_workers_unit.ml +++ b/src/lib_workers/test/test_workers_unit.ml @@ -112,16 +112,18 @@ let create_bounded ?on_completion = let create_dropbox ?on_completion = let table = let open Worker in - let merge _w (Any_request neu) (old : _ option) = - let (Any_request r) = + let merge _w (Any_request (neu, neu_metadata)) (old : _ option) = + let (Any_request (r, metadata)) = match (neu, old) with - | RqA i1, Some (Any_request (RqA i2)) -> Any_request (RqA (i1 + i2)) - | (RqA _ as rqa), _ -> Any_request rqa - | _, Some (Any_request (RqA _ as rqa)) -> Any_request rqa - | RqB, _ -> Any_request neu - | RqErr _, _ -> Any_request neu + | RqA i1, Some (Any_request (RqA i2, i2_metadata)) -> + Any_request (RqA (i1 + i2), i2_metadata) + | (RqA _ as rqa), _ -> Any_request (rqa, neu_metadata) + | _, Some (Any_request ((RqA _ as rqa), metadata)) -> + Any_request (rqa, metadata) + | RqB, _ -> Any_request (neu, neu_metadata) + | RqErr _, _ -> Any_request (neu, neu_metadata) in - Some (Worker.Any_request r) + Some (Worker.Any_request (r, metadata)) in Worker.create_table (Dropbox {merge}) in diff --git a/src/lib_workers/worker.ml b/src/lib_workers/worker.ml index ecc05caff66e94031ebb9d5349178696397004dd..66078318671f62ef1e045e885cd72d4af4e853b1 100644 --- a/src/lib_workers/worker.ml +++ b/src/lib_workers/worker.ml @@ -51,6 +51,10 @@ module type T = sig type dropbox + type scope + + type metadata = {scope : scope option} + type 'a message_error = | Closed of error list option | Request_error of 'a @@ -66,7 +70,7 @@ module type T = sig } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata -> any_request (** Create a table of workers. 
*) val create_table : 'kind buffer_kind -> 'kind table @@ -242,6 +246,10 @@ struct let base_name = String.concat "-" Name.base + type scope = Opentelemetry.Scope.t + + type metadata = {scope : scope option} + type 'a message_error = | Closed of error list option | Request_error of 'a @@ -249,7 +257,9 @@ struct type message = | Message : - ('a, 'b) Request.t * ('a, 'b message_error) result Lwt.u option + ('a, 'b) Request.t + * metadata + * ('a, 'b message_error) result Lwt.u option -> message type 'a queue @@ -269,7 +279,7 @@ struct } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata -> any_request and _ buffer = | Queue_buffer : @@ -306,27 +316,32 @@ struct | Worker_types.Launching _ | Running _ | Closing _ -> None | Closed (_, _, errs) -> errs - let queue_item ?u r = (Time.System.now (), Message (r, u)) + let queue_item ~metadata ?u r = (Time.System.now (), Message (r, metadata, u)) - let drop_request w merge message_box request = + let drop_request w merge message_box request metadata = try match match Lwt_dropbox.peek message_box with - | None -> merge w (Any_request request) None - | Some (_, Message (old, _)) -> + | None -> merge w (Any_request (request, metadata)) None + | Some (_, Message (old, old_metadata, _)) -> Lwt.ignore_result (Lwt_dropbox.take message_box) ; - merge w (Any_request request) (Some (Any_request old)) + merge + w + (Any_request (request, metadata)) + (Some (Any_request (old, old_metadata))) with | None -> () - | Some (Any_request neu) -> - Lwt_dropbox.put message_box (Time.System.now (), Message (neu, None)) + | Some (Any_request (neu, neu_metadata)) -> + Lwt_dropbox.put + message_box + (Time.System.now (), Message (neu, neu_metadata, None)) with Lwt_dropbox.Closed -> () - let drop_request_and_wait w message_box request = + let drop_request_and_wait w message_box request metadata = let t, u = Lwt.wait () in Lwt.catch (fun () -> - Lwt_dropbox.put 
message_box (queue_item ~u request) ; + Lwt_dropbox.put message_box (queue_item ~metadata ~u request) ; t) (function | Lwt_dropbox.Closed -> @@ -361,24 +376,45 @@ struct val pending_requests_length : 'a t -> int end + let make_metadata () = + (* This pattern is equivalent to: + ``` + let scope = + if "ppx is enabled" then + Opentelemetry.Scope.get_ambient_scope () + else + None + ``` + *) + let scope = + (None + [@profiler.custom + {driver_ids = [Opentelemetry]} + (Opentelemetry.Scope.get_ambient_scope ())]) + in + {scope} + module Dropbox = struct let put_request (w : dropbox t) request = let (Dropbox {merge}) = w.table.buffer_kind in let (Dropbox_buffer message_box) = w.buffer in - drop_request w merge message_box request + let metadata = make_metadata () in + drop_request w merge message_box request metadata let put_request_and_wait (w : dropbox t) request = let (Dropbox_buffer message_box) = w.buffer in - drop_request_and_wait w message_box request + let metadata = make_metadata () in + drop_request_and_wait w message_box request metadata end module Queue = struct let push_request (type a) (w : a queue t) request = + let metadata = make_metadata () in match w.buffer with | Queue_buffer message_queue -> if Lwt_pipe.Unbounded.is_closed message_queue then Lwt.return_false else ( - Lwt_pipe.Unbounded.push message_queue (queue_item request) ; + Lwt_pipe.Unbounded.push message_queue (queue_item ~metadata request) ; (* because pushing on an unbounded pipe is immediate, we return within Lwt explicitly for compatibility with the other case *) Lwt.return_true) @@ -387,21 +423,25 @@ struct else let open Lwt_syntax in let* () = - Lwt_pipe.Bounded.push message_queue (queue_item request) + Lwt_pipe.Bounded.push message_queue (queue_item ~metadata request) in Lwt.return_true let push_request_now (w : infinite queue t) request = + let metadata = make_metadata () in let (Queue_buffer message_queue) = w.buffer in if Lwt_pipe.Unbounded.is_closed message_queue then () - else 
Lwt_pipe.Unbounded.push message_queue (queue_item request) + else Lwt_pipe.Unbounded.push message_queue (queue_item ~metadata request) let push_request_and_wait (type a) (w : a queue t) request = + let metadata = make_metadata () in match w.buffer with | Queue_buffer message_queue -> ( try let t, u = Lwt.wait () in - Lwt_pipe.Unbounded.push message_queue (queue_item ~u request) ; + Lwt_pipe.Unbounded.push + message_queue + (queue_item ~metadata ~u request) ; t with Lwt_pipe.Closed -> Lwt.return_error (Closed (extract_status_errors w))) @@ -409,7 +449,9 @@ struct let t, u = Lwt.wait () in Lwt.try_bind (fun () -> - Lwt_pipe.Bounded.push message_queue (queue_item ~u request)) + Lwt_pipe.Bounded.push + message_queue + (queue_item ~metadata ~u request)) (fun () -> t) (function | Lwt_pipe.Closed -> @@ -426,7 +468,9 @@ struct Lwt_pipe.Bounded.peek_all_now message_queue with Lwt_pipe.Closed -> [] in - List.map (function t, Message (req, _) -> (t, Request.view req)) peeked + List.map + (function t, Message (req, _, _) -> (t, Request.view req)) + peeked let pending_requests_length (type a) (w : a queue t) = let pipe_length (type a) (q : a buffer) = @@ -440,9 +484,9 @@ struct let close (type a) (w : a t) = let wakeup = function - | _, Message (_, Some u) -> + | _, Message (_, _, Some u) -> Lwt.wakeup_later u (Error (Closed (extract_status_errors w))) - | _, Message (_, None) -> () + | _, Message (_, _, None) -> () in let close_queue message_queue = let messages = Lwt_pipe.Bounded.pop_all_now message_queue in @@ -579,70 +623,74 @@ struct | Ok None -> let* () = Handlers.on_no_request w in loop () - | Ok (Some (pushed, Message (request, u))) -> ( - let current_request = Request.view request in - let treated = Time.System.now () in - w.current_request <- Some (pushed, treated, current_request) ; - let* r = - match u with - | None -> ( - let open Lwt_result_syntax in - let*! 
res = Handlers.on_request w request in - match res with - | Error err -> Lwt.return_error err - | Ok res -> - let completed = Time.System.now () in - w.current_request <- None ; - let status = Worker_types.{pushed; treated; completed} in - let*! () = Handlers.on_completion w request res status in - let*! () = - Worker_events.(emit request_no_errors) - (current_request, status) - in - return_unit) - | Some u -> ( - (* [res] is a result. But the side effect [wakeup] - needs to happen regardless of success (Ok) or failure - (Error). To that end, we treat it locally like a regular - promise (which happens to carry a [result]) within the Lwt - monad. *) - let* res = Handlers.on_request w request in - match res with - | Error err -> - Lwt.wakeup_later u (Error (Request_error err)) ; - Lwt.return (Error err) - | Ok res -> - Lwt.wakeup_later u (Ok res) ; - let completed = Time.System.now () in - let status = Worker_types.{pushed; treated; completed} in - w.current_request <- None ; - let* () = Handlers.on_completion w request res status in - let* () = - Worker_events.(emit request_no_errors) - (current_request, status) - in - return (Ok ())) - in - match r with - | Ok () -> loop () - | Error err -> ( - let* r = - match w.current_request with - | Some (pushed, treated, _request_view) -> - let completed = Time.System.now () in - w.current_request <- None ; - Handlers.on_error - w - Worker_types.{pushed; treated; completed} - request - err - | None -> assert false - in - match r with - | Ok () -> loop () - | Error errs -> - let* () = Worker_events.(emit crashed) errs in - close handlers w (Some errs))) + | Ok (Some (pushed, Message (request, {scope = _scope}, u))) -> ( + (let current_request = Request.view request in + let treated = Time.System.now () in + w.current_request <- Some (pushed, treated, current_request) ; + let* r = + match u with + | None -> ( + let open Lwt_result_syntax in + let*! 
res = Handlers.on_request w request in + match res with + | Error err -> Lwt.return_error err + | Ok res -> + let completed = Time.System.now () in + w.current_request <- None ; + let status = Worker_types.{pushed; treated; completed} in + let*! () = Handlers.on_completion w request res status in + let*! () = + Worker_events.(emit request_no_errors) + (current_request, status) + in + return_unit) + | Some u -> ( + (* [res] is a result. But the side effect [wakeup] + needs to happen regardless of success (Ok) or failure + (Error). To that end, we treat it locally like a regular + promise (which happens to carry a [result]) within the Lwt + monad. *) + let* res = Handlers.on_request w request in + match res with + | Error err -> + Lwt.wakeup_later u (Error (Request_error err)) ; + Lwt.return (Error err) + | Ok res -> + Lwt.wakeup_later u (Ok res) ; + let completed = Time.System.now () in + let status = Worker_types.{pushed; treated; completed} in + w.current_request <- None ; + let* () = Handlers.on_completion w request res status in + let* () = + Worker_events.(emit request_no_errors) + (current_request, status) + in + return (Ok ())) + in + match r with + | Ok () -> loop () + | Error err -> ( + let* r = + match w.current_request with + | Some (pushed, treated, _request_view) -> + let completed = Time.System.now () in + w.current_request <- None ; + Handlers.on_error + w + Worker_types.{pushed; treated; completed} + request + err + | None -> assert false + in + match r with + | Ok () -> loop () + | Error errs -> + let* () = Worker_events.(emit crashed) errs in + close handlers w (Some errs))) + [@profiler.custom_f + {driver_ids = [Opentelemetry]} + (Tezos_profiler_backend.Opentelemetry_profiler.update_scope + _scope)]) in let* r = protect_result ~canceler:w.canceler (fun () -> loop ()) in match r with diff --git a/src/lib_workers/worker.mli b/src/lib_workers/worker.mli index 9f28073b99bf3aa1aa405cb94af00d46b7847b4a..55efdfba8c250fd8f097ba6b9d9a4d359f510594 100644 
--- a/src/lib_workers/worker.mli +++ b/src/lib_workers/worker.mli @@ -57,6 +57,10 @@ module type T = sig type dropbox + type scope + + type metadata = {scope : scope option} + (** An error returned when waiting for a message pushed to the worker. [Closed errs] is returned if the worker is terminated or has crashed. If the worker is terminated, [errs] is an empty list. @@ -77,7 +81,7 @@ module type T = sig } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata -> any_request (** Create a table of workers. *) val create_table : 'kind buffer_kind -> 'kind table