From d83f6d18366e2f0691277b558e6cc9085a0b0f5f Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Tue, 7 Jan 2025 17:00:21 +0100 Subject: [PATCH 1/4] Manifest: lib_worker depends on the profiler backends --- manifest/product_octez.ml | 38 +++++++++++++++++++++----------------- opam/octez-libs.opam | 2 +- src/lib_workers/dune | 5 ++++- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index fc32fd9b2b48..fa1ca12617ef 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -2265,17 +2265,38 @@ let octez_p2p_services = ~deps:[octez_base |> open_ ~m:"TzPervasives"; octez_rpc] ~linkall:true +let octez_profiler_backend = + octez_lib + "octez-profiler-backend" + ~internal_name:"tezos_profiler_backend" + ~path:"src/lib_profiler_backend" + ~synopsis:"Backends for the Octez Profiler" + ~deps: + [ + octez_base |> open_ ~m:"TzPervasives" |> open_; + octez_base_unix |> open_; + octez_stdlib |> open_; + octez_stdlib_unix; + opentelemetry; + ambient_context_lwt; + opentelemetry_client_cohttp_lwt; + ] + let octez_workers = + let (PPX {preprocess; preprocessor_deps}) = ppx_profiler in octez_lib "tezos-workers" ~path:"src/lib_workers" ~synopsis:"Worker library" ~documentation: Dune.[[S "package"; S "octez-libs"]; [S "mld_files"; S "tezos_workers"]] + ~preprocess + ~preprocessor_deps ~deps: [ octez_base |> open_ ~m:"TzPervasives" |> open_; octez_stdlib_unix |> open_; + octez_profiler_backend; ] let _octez_workers_tests = @@ -3789,23 +3810,6 @@ let octez_requester_tests = qcheck_tezt; ] -let octez_profiler_backend = - octez_lib - "octez-profiler-backend" - ~internal_name:"tezos_profiler_backend" - ~path:"src/lib_profiler_backend" - ~synopsis:"Backends for the Octez Profiler" - ~deps: - [ - octez_base |> open_ ~m:"TzPervasives" |> open_; - octez_base_unix |> open_; - octez_stdlib |> open_; - octez_stdlib_unix; - opentelemetry; - ambient_context_lwt; - opentelemetry_client_cohttp_lwt; - ] - let 
octez_shell = let (PPX {preprocess; preprocessor_deps}) = ppx_profiler in octez_shell_lib diff --git a/opam/octez-libs.opam b/opam/octez-libs.opam index bd277c709bcd..4d27e91cf214 100644 --- a/opam/octez-libs.opam +++ b/opam/octez-libs.opam @@ -76,13 +76,13 @@ depends: [ "index" { >= "1.6.0" & < "1.7.0" } "semaphore-compat" { >= "1.0.1" } "checkseum" { != "0.5.0" } + "ambient-context-lwt" { = "0.1.0" } "ringo" { >= "1.1.0" } "octez-internal-libs" { = version } "conf-rust" "integers" "ctypes" { >= "0.18.0" } "tezos-sapling-parameters" { >= "1.1.0" } - "ambient-context-lwt" { = "0.1.0" } "octez-rustzcash-deps" { with-test & = version } "bigstring" {with-test} "bam-ppx" { with-test & >= "0.3" } diff --git a/src/lib_workers/dune b/src/lib_workers/dune index e657f7efd9c8..ca48aec91323 100644 --- a/src/lib_workers/dune +++ b/src/lib_workers/dune @@ -7,7 +7,10 @@ (instrumentation (backend bisect_ppx)) (libraries octez-libs.base - octez-libs.stdlib-unix) + octez-libs.stdlib-unix + octez-libs.octez-profiler-backend) + (preprocess (pps octez-libs.ppx_profiler)) + (preprocessor_deps (env_var TEZOS_PPX_PROFILER)) (flags (:standard) -open Tezos_base.TzPervasives -- GitLab From 2127bd441766c5659f5b61821c2657406d68da92 Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Mon, 6 Jan 2025 14:32:56 +0100 Subject: [PATCH 2/4] Profiler/Opentelemetry: add scope update functions --- src/lib_profiler_backend/opentelemetry_profiler.ml | 6 ++++++ src/lib_profiler_backend/opentelemetry_profiler.mli | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/src/lib_profiler_backend/opentelemetry_profiler.ml b/src/lib_profiler_backend/opentelemetry_profiler.ml index 186cb27a60d7..6d31761d4491 100644 --- a/src/lib_profiler_backend/opentelemetry_profiler.ml +++ b/src/lib_profiler_backend/opentelemetry_profiler.ml @@ -51,6 +51,12 @@ let trace_operation op ?attrs = in trace ~trace_id:(op_hash_to_trace_id op_hash) ~attrs +let update_scope s f = + match s with + | Some s -> + 
Ambient_context.with_binding Opentelemetry.Scope.ambient_scope_key s f + | None -> f () + type (_, _) Profiler.kind += Opentelemetry_profiler : ('a, 'b) Profiler.kind type config = {service_name : string; verbosity : Profiler.verbosity} diff --git a/src/lib_profiler_backend/opentelemetry_profiler.mli b/src/lib_profiler_backend/opentelemetry_profiler.mli index 413326418392..d0ea56b73b3a 100644 --- a/src/lib_profiler_backend/opentelemetry_profiler.mli +++ b/src/lib_profiler_backend/opentelemetry_profiler.mli @@ -54,6 +54,10 @@ val trace_operation : (unit -> 'a) -> 'a +(** [update_scope s f] calls [f] with [s] bound as the ambient Opentelemetry + scope when [s] is [Some _]; when [s] is [None], it simply calls [f ()]. *) +val update_scope : Opentelemetry.Scope.t option -> (unit -> 'a) -> 'a + type config = {service_name : string; verbosity : Profiler.verbosity} (** Mocked driver, that serves only to register the opentelemetry as a valid -- GitLab From e8d01c3d8b45b7e48df9e2ab6c56e897fc7e0c52 Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Tue, 7 Jan 2025 17:29:17 +0100 Subject: [PATCH 3/4] Worker: add OpenTelemetry scope to requests --- src/lib_shell/peer_validator.ml | 12 ++-- src/lib_workers/test/test_workers_unit.ml | 18 +++--- src/lib_workers/worker.ml | 74 ++++++++++++++++------- src/lib_workers/worker.mli | 6 +- 4 files changed, 72 insertions(+), 38 deletions(-) diff --git a/src/lib_shell/peer_validator.ml b/src/lib_shell/peer_validator.ml index eac665dbb06b..1d331ce38cb6 100644 --- a/src/lib_shell/peer_validator.ml +++ b/src/lib_shell/peer_validator.ml @@ -565,21 +565,21 @@ let on_launch _ name parameters : (_, launch_error) result Lwt.t = Lwt.return (Ok pv) let table = - let merge w (Worker.Any_request neu) old = + let merge w (Worker.Any_request (neu, neu_metadata)) old = let pv = Worker.state w in match neu with | Request.New_branch (locator, _) -> pv.last_advertised_head <- (locator.head_hash, locator.head_header) ; - Some (Worker.Any_request neu) + Some (Worker.Any_request (neu, neu_metadata)) |
Request.New_head (hash, header) -> ( pv.last_advertised_head <- (hash, header) ; (* TODO penalize decreasing fitness *) match old with - | Some (Worker.Any_request (Request.New_branch _) as old) -> + | Some (Worker.Any_request (Request.New_branch _, _) as old) -> Some old (* ignore *) - | Some (Worker.Any_request (Request.New_head _)) -> - Some (Any_request neu) - | None -> Some (Any_request neu)) + | Some (Worker.Any_request (Request.New_head _, _)) -> + Some (Any_request (neu, neu_metadata)) + | None -> Some (Any_request (neu, neu_metadata))) in Worker.create_table (Dropbox {merge}) diff --git a/src/lib_workers/test/test_workers_unit.ml b/src/lib_workers/test/test_workers_unit.ml index 6850bfffc2bc..e2fa9c1f4d24 100644 --- a/src/lib_workers/test/test_workers_unit.ml +++ b/src/lib_workers/test/test_workers_unit.ml @@ -112,16 +112,18 @@ let create_bounded ?on_completion = let create_dropbox ?on_completion = let table = let open Worker in - let merge _w (Any_request neu) (old : _ option) = - let (Any_request r) = + let merge _w (Any_request (neu, neu_metadata)) (old : _ option) = + let (Any_request (r, metadata)) = match (neu, old) with - | RqA i1, Some (Any_request (RqA i2)) -> Any_request (RqA (i1 + i2)) - | (RqA _ as rqa), _ -> Any_request rqa - | _, Some (Any_request (RqA _ as rqa)) -> Any_request rqa - | RqB, _ -> Any_request neu - | RqErr _, _ -> Any_request neu + | RqA i1, Some (Any_request (RqA i2, i2_metadata)) -> + Any_request (RqA (i1 + i2), i2_metadata) + | (RqA _ as rqa), _ -> Any_request (rqa, neu_metadata) + | _, Some (Any_request ((RqA _ as rqa), metadata)) -> + Any_request (rqa, metadata) + | RqB, _ -> Any_request (neu, neu_metadata) + | RqErr _, _ -> Any_request (neu, neu_metadata) in - Some (Worker.Any_request r) + Some (Worker.Any_request (r, metadata)) in Worker.create_table (Dropbox {merge}) in diff --git a/src/lib_workers/worker.ml b/src/lib_workers/worker.ml index ecc05caff66e..fc51a6757c60 100644 --- a/src/lib_workers/worker.ml +++ 
b/src/lib_workers/worker.ml @@ -51,6 +51,10 @@ module type T = sig type dropbox + type scope + + type metadata = {scope : scope option} + type 'a message_error = | Closed of error list option | Request_error of 'a @@ -66,7 +70,7 @@ module type T = sig } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata -> any_request (** Create a table of workers. *) val create_table : 'kind buffer_kind -> 'kind table @@ -242,6 +246,10 @@ struct let base_name = String.concat "-" Name.base + type scope = Opentelemetry.Scope.t + + type metadata = {scope : scope option} + type 'a message_error = | Closed of error list option | Request_error of 'a @@ -249,7 +257,9 @@ struct type message = | Message : - ('a, 'b) Request.t * ('a, 'b message_error) result Lwt.u option + ('a, 'b) Request.t + * metadata + * ('a, 'b message_error) result Lwt.u option -> message type 'a queue @@ -269,7 +279,7 @@ struct } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata -> any_request and _ buffer = | Queue_buffer : @@ -306,27 +316,32 @@ struct | Worker_types.Launching _ | Running _ | Closing _ -> None | Closed (_, _, errs) -> errs - let queue_item ?u r = (Time.System.now (), Message (r, u)) + let queue_item ~metadata ?u r = (Time.System.now (), Message (r, metadata, u)) - let drop_request w merge message_box request = + let drop_request w merge message_box request metadata = try match match Lwt_dropbox.peek message_box with - | None -> merge w (Any_request request) None - | Some (_, Message (old, _)) -> + | None -> merge w (Any_request (request, metadata)) None + | Some (_, Message (old, old_metadata, _)) -> Lwt.ignore_result (Lwt_dropbox.take message_box) ; - merge w (Any_request request) (Some (Any_request old)) + merge + w + (Any_request (request, metadata)) + (Some (Any_request (old, old_metadata))) with | None -> () - | Some 
(Any_request neu) -> - Lwt_dropbox.put message_box (Time.System.now (), Message (neu, None)) + | Some (Any_request (neu, neu_metadata)) -> + Lwt_dropbox.put + message_box + (Time.System.now (), Message (neu, neu_metadata, None)) with Lwt_dropbox.Closed -> () - let drop_request_and_wait w message_box request = + let drop_request_and_wait w message_box request metadata = let t, u = Lwt.wait () in Lwt.catch (fun () -> - Lwt_dropbox.put message_box (queue_item ~u request) ; + Lwt_dropbox.put message_box (queue_item ~metadata ~u request) ; t) (function | Lwt_dropbox.Closed -> @@ -365,11 +380,11 @@ struct let put_request (w : dropbox t) request = let (Dropbox {merge}) = w.table.buffer_kind in let (Dropbox_buffer message_box) = w.buffer in - drop_request w merge message_box request + drop_request w merge message_box request {scope = None} let put_request_and_wait (w : dropbox t) request = let (Dropbox_buffer message_box) = w.buffer in - drop_request_and_wait w message_box request + drop_request_and_wait w message_box request {scope = None} end module Queue = struct @@ -378,7 +393,9 @@ struct | Queue_buffer message_queue -> if Lwt_pipe.Unbounded.is_closed message_queue then Lwt.return_false else ( - Lwt_pipe.Unbounded.push message_queue (queue_item request) ; + Lwt_pipe.Unbounded.push + message_queue + (queue_item ~metadata:{scope = None} request) ; (* because pushing on an unbounded pipe is immediate, we return within Lwt explicitly for compatibility with the other case *) Lwt.return_true) @@ -387,21 +404,28 @@ struct else let open Lwt_syntax in let* () = - Lwt_pipe.Bounded.push message_queue (queue_item request) + Lwt_pipe.Bounded.push + message_queue + (queue_item ~metadata:{scope = None} request) in Lwt.return_true let push_request_now (w : infinite queue t) request = let (Queue_buffer message_queue) = w.buffer in if Lwt_pipe.Unbounded.is_closed message_queue then () - else Lwt_pipe.Unbounded.push message_queue (queue_item request) + else + Lwt_pipe.Unbounded.push + 
message_queue + (queue_item ~metadata:{scope = None} request) let push_request_and_wait (type a) (w : a queue t) request = match w.buffer with | Queue_buffer message_queue -> ( try let t, u = Lwt.wait () in - Lwt_pipe.Unbounded.push message_queue (queue_item ~u request) ; + Lwt_pipe.Unbounded.push + message_queue + (queue_item ~metadata:{scope = None} ~u request) ; t with Lwt_pipe.Closed -> Lwt.return_error (Closed (extract_status_errors w))) @@ -409,7 +433,9 @@ struct let t, u = Lwt.wait () in Lwt.try_bind (fun () -> - Lwt_pipe.Bounded.push message_queue (queue_item ~u request)) + Lwt_pipe.Bounded.push + message_queue + (queue_item ~metadata:{scope = None} ~u request)) (fun () -> t) (function | Lwt_pipe.Closed -> @@ -426,7 +452,9 @@ struct Lwt_pipe.Bounded.peek_all_now message_queue with Lwt_pipe.Closed -> [] in - List.map (function t, Message (req, _) -> (t, Request.view req)) peeked + List.map + (function t, Message (req, _, _) -> (t, Request.view req)) + peeked let pending_requests_length (type a) (w : a queue t) = let pipe_length (type a) (q : a buffer) = @@ -440,9 +468,9 @@ struct let close (type a) (w : a t) = let wakeup = function - | _, Message (_, Some u) -> + | _, Message (_, _, Some u) -> Lwt.wakeup_later u (Error (Closed (extract_status_errors w))) - | _, Message (_, None) -> () + | _, Message (_, _, None) -> () in let close_queue message_queue = let messages = Lwt_pipe.Bounded.pop_all_now message_queue in @@ -579,7 +607,7 @@ struct | Ok None -> let* () = Handlers.on_no_request w in loop () - | Ok (Some (pushed, Message (request, u))) -> ( + | Ok (Some (pushed, Message (request, {scope = _scope}, u))) -> ( let current_request = Request.view request in let treated = Time.System.now () in w.current_request <- Some (pushed, treated, current_request) ; diff --git a/src/lib_workers/worker.mli b/src/lib_workers/worker.mli index 9f28073b99bf..55efdfba8c25 100644 --- a/src/lib_workers/worker.mli +++ b/src/lib_workers/worker.mli @@ -57,6 +57,10 @@ module type T 
= sig type dropbox + type scope + + type metadata = {scope : scope option} + (** An error returned when waiting for a message pushed to the worker. [Closed errs] is returned if the worker is terminated or has crashed. If the worker is terminated, [errs] is an empty list. @@ -77,7 +81,7 @@ module type T = sig } -> dropbox buffer_kind - and any_request = Any_request : _ Request.t -> any_request + and any_request = Any_request : _ Request.t * metadata -> any_request (** Create a table of workers. *) val create_table : 'kind buffer_kind -> 'kind table -- GitLab From 471e054903b21fcf904bd280902ebfeaa7ae8d14 Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Wed, 8 Jan 2025 11:16:35 +0100 Subject: [PATCH 4/4] Worker: propagate Opentelemetry scope when pushing request --- src/lib_workers/worker.ml | 174 +++++++++++++++++++++----------------- 1 file changed, 97 insertions(+), 77 deletions(-) diff --git a/src/lib_workers/worker.ml b/src/lib_workers/worker.ml index fc51a6757c60..66078318671f 100644 --- a/src/lib_workers/worker.ml +++ b/src/lib_workers/worker.ml @@ -376,26 +376,45 @@ struct val pending_requests_length : 'a t -> int end + let make_metadata () = + (* This pattern is equivalent to: + ``` + let scope = + if "ppx is enabled" then + Opentelemetry.Scope.get_ambient_scope () + else + None + ``` + *) + let scope = + (None + [@profiler.custom + {driver_ids = [Opentelemetry]} + (Opentelemetry.Scope.get_ambient_scope ())]) + in + {scope} + module Dropbox = struct let put_request (w : dropbox t) request = let (Dropbox {merge}) = w.table.buffer_kind in let (Dropbox_buffer message_box) = w.buffer in - drop_request w merge message_box request {scope = None} + let metadata = make_metadata () in + drop_request w merge message_box request metadata let put_request_and_wait (w : dropbox t) request = let (Dropbox_buffer message_box) = w.buffer in - drop_request_and_wait w message_box request {scope = None} + let metadata = make_metadata () in + drop_request_and_wait w 
message_box request metadata end module Queue = struct let push_request (type a) (w : a queue t) request = + let metadata = make_metadata () in match w.buffer with | Queue_buffer message_queue -> if Lwt_pipe.Unbounded.is_closed message_queue then Lwt.return_false else ( - Lwt_pipe.Unbounded.push - message_queue - (queue_item ~metadata:{scope = None} request) ; + Lwt_pipe.Unbounded.push message_queue (queue_item ~metadata request) ; (* because pushing on an unbounded pipe is immediate, we return within Lwt explicitly for compatibility with the other case *) Lwt.return_true) @@ -404,28 +423,25 @@ struct else let open Lwt_syntax in let* () = - Lwt_pipe.Bounded.push - message_queue - (queue_item ~metadata:{scope = None} request) + Lwt_pipe.Bounded.push message_queue (queue_item ~metadata request) in Lwt.return_true let push_request_now (w : infinite queue t) request = + let metadata = make_metadata () in let (Queue_buffer message_queue) = w.buffer in if Lwt_pipe.Unbounded.is_closed message_queue then () - else - Lwt_pipe.Unbounded.push - message_queue - (queue_item ~metadata:{scope = None} request) + else Lwt_pipe.Unbounded.push message_queue (queue_item ~metadata request) let push_request_and_wait (type a) (w : a queue t) request = + let metadata = make_metadata () in match w.buffer with | Queue_buffer message_queue -> ( try let t, u = Lwt.wait () in Lwt_pipe.Unbounded.push message_queue - (queue_item ~metadata:{scope = None} ~u request) ; + (queue_item ~metadata ~u request) ; t with Lwt_pipe.Closed -> Lwt.return_error (Closed (extract_status_errors w))) @@ -435,7 +451,7 @@ struct (fun () -> Lwt_pipe.Bounded.push message_queue - (queue_item ~metadata:{scope = None} ~u request)) + (queue_item ~metadata ~u request)) (fun () -> t) (function | Lwt_pipe.Closed -> @@ -608,69 +624,73 @@ struct let* () = Handlers.on_no_request w in loop () | Ok (Some (pushed, Message (request, {scope = _scope}, u))) -> ( - let current_request = Request.view request in - let treated = 
Time.System.now () in - w.current_request <- Some (pushed, treated, current_request) ; - let* r = - match u with - | None -> ( - let open Lwt_result_syntax in - let*! res = Handlers.on_request w request in - match res with - | Error err -> Lwt.return_error err - | Ok res -> - let completed = Time.System.now () in - w.current_request <- None ; - let status = Worker_types.{pushed; treated; completed} in - let*! () = Handlers.on_completion w request res status in - let*! () = - Worker_events.(emit request_no_errors) - (current_request, status) - in - return_unit) - | Some u -> ( - (* [res] is a result. But the side effect [wakeup] - needs to happen regardless of success (Ok) or failure - (Error). To that end, we treat it locally like a regular - promise (which happens to carry a [result]) within the Lwt - monad. *) - let* res = Handlers.on_request w request in - match res with - | Error err -> - Lwt.wakeup_later u (Error (Request_error err)) ; - Lwt.return (Error err) - | Ok res -> - Lwt.wakeup_later u (Ok res) ; - let completed = Time.System.now () in - let status = Worker_types.{pushed; treated; completed} in - w.current_request <- None ; - let* () = Handlers.on_completion w request res status in - let* () = - Worker_events.(emit request_no_errors) - (current_request, status) - in - return (Ok ())) - in - match r with - | Ok () -> loop () - | Error err -> ( - let* r = - match w.current_request with - | Some (pushed, treated, _request_view) -> - let completed = Time.System.now () in - w.current_request <- None ; - Handlers.on_error - w - Worker_types.{pushed; treated; completed} - request - err - | None -> assert false - in - match r with - | Ok () -> loop () - | Error errs -> - let* () = Worker_events.(emit crashed) errs in - close handlers w (Some errs))) + (let current_request = Request.view request in + let treated = Time.System.now () in + w.current_request <- Some (pushed, treated, current_request) ; + let* r = + match u with + | None -> ( + let open 
Lwt_result_syntax in + let*! res = Handlers.on_request w request in + match res with + | Error err -> Lwt.return_error err + | Ok res -> + let completed = Time.System.now () in + w.current_request <- None ; + let status = Worker_types.{pushed; treated; completed} in + let*! () = Handlers.on_completion w request res status in + let*! () = + Worker_events.(emit request_no_errors) + (current_request, status) + in + return_unit) + | Some u -> ( + (* [res] is a result. But the side effect [wakeup] + needs to happen regardless of success (Ok) or failure + (Error). To that end, we treat it locally like a regular + promise (which happens to carry a [result]) within the Lwt + monad. *) + let* res = Handlers.on_request w request in + match res with + | Error err -> + Lwt.wakeup_later u (Error (Request_error err)) ; + Lwt.return (Error err) + | Ok res -> + Lwt.wakeup_later u (Ok res) ; + let completed = Time.System.now () in + let status = Worker_types.{pushed; treated; completed} in + w.current_request <- None ; + let* () = Handlers.on_completion w request res status in + let* () = + Worker_events.(emit request_no_errors) + (current_request, status) + in + return (Ok ())) + in + match r with + | Ok () -> loop () + | Error err -> ( + let* r = + match w.current_request with + | Some (pushed, treated, _request_view) -> + let completed = Time.System.now () in + w.current_request <- None ; + Handlers.on_error + w + Worker_types.{pushed; treated; completed} + request + err + | None -> assert false + in + match r with + | Ok () -> loop () + | Error errs -> + let* () = Worker_events.(emit crashed) errs in + close handlers w (Some errs))) + [@profiler.custom_f + {driver_ids = [Opentelemetry]} + (Tezos_profiler_backend.Opentelemetry_profiler.update_scope + _scope)]) in let* r = protect_result ~canceler:w.canceler (fun () -> loop ()) in match r with -- GitLab