From 084b4d89570066d04814ed576ee3fd19f7d55427 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 21 Jul 2025 21:07:20 +0100 Subject: [PATCH 1/2] Tezt_cloud: Refactor metrics --- tezt/tests/cloud/dal.ml | 613 +--------------------------------- tezt/tests/cloud/metrics.ml | 614 +++++++++++++++++++++++++++++++++++ tezt/tests/cloud/metrics.mli | 53 +++ 3 files changed, 685 insertions(+), 595 deletions(-) create mode 100644 tezt/tests/cloud/metrics.ml create mode 100644 tezt/tests/cloud/metrics.mli diff --git a/tezt/tests/cloud/dal.ml b/tezt/tests/cloud/dal.ml index c6417d7c08f1..6317467f1e60 100644 --- a/tezt/tests/cloud/dal.ml +++ b/tezt/tests/cloud/dal.ml @@ -72,54 +72,6 @@ type bootstrap = { client : Client.t; } -type public_key_hash = PKH of string - -type commitment_info = {commitment : string; publisher_pkh : string} - -type per_level_info = { - level : int; - published_commitments : (int, commitment_info) Hashtbl.t; - attestations : (public_key_hash, Dal_node_helpers.dal_status) Hashtbl.t; - attested_commitments : Z.t; - etherlink_operator_balance_sum : Tez.t; -} - -type metrics = { - level_first_commitment_published : int option; - level_first_commitment_attested : int option; - total_published_commitments : int; - total_published_commitments_per_slot : (int, int) Hashtbl.t; - (* A hash table mapping slot indices to their total number of - published commitments. *) - expected_published_commitments : int; - total_attested_commitments : int; - total_attested_commitments_per_slot : (int, int) Hashtbl.t; - (* A hash table mapping slot indices to their total number of - attested commitments. 
*) - ratio_published_commitments : float; - ratio_attested_commitments : float; - ratio_published_commitments_last_level : float; - ratio_attested_commitments_per_baker : - (public_key_hash, Baker_helpers.per_baker_dal_summary) Hashtbl.t; - etherlink_operator_balance_sum : Tez.t; -} - -let default_metrics = - { - level_first_commitment_published = None; - level_first_commitment_attested = None; - total_published_commitments = 0; - total_published_commitments_per_slot = Hashtbl.create 32; - expected_published_commitments = 0; - total_attested_commitments = 0; - total_attested_commitments_per_slot = Hashtbl.create 32; - ratio_published_commitments = 0.; - ratio_attested_commitments = 0.; - ratio_published_commitments_last_level = 0.; - ratio_attested_commitments_per_baker = Hashtbl.create 0; - etherlink_operator_balance_sum = Tez.zero; - } - type t = { configuration : configuration; cloud : Cloud.t; @@ -137,8 +89,8 @@ type t = { echo_rollup : Echo_rollup.operator option; time_between_blocks : int; parameters : Dal_common.Parameters.t; - infos : (int, per_level_info) Hashtbl.t; - metrics : (int, metrics) Hashtbl.t; + infos : (int, Metrics.per_level_info) Hashtbl.t; + metrics : (int, Metrics.t) Hashtbl.t; disconnection_state : Disconnect.t option; first_level : int; teztale : Teztale.t option; @@ -147,549 +99,12 @@ type t = { otel : string option; } -let aliases = - Hashtbl.create - 50 (* mapping from baker addresses to their Tzkt aliases (if known)*) - -let merge_aliases = - Option.iter (fun new_aliases -> - Hashtbl.iter - (fun key alias -> Hashtbl.replace aliases key alias) - new_aliases) - -let pp_slot_metrics fmt xs = - let open Format in - fprintf - fmt - "[ %a ]" - (pp_print_list - (fun fmt (x, y) -> fprintf fmt "(%d -> %d)" x y) - ~pp_sep:(fun fmt () -> fprintf fmt "; ")) - (List.of_seq xs - |> List.filter (fun (_, n) -> n > 0) - (* Sorting the list per slot index increasing order. 
*) - |> List.sort (fun (idx1, _) (idx2, _) -> Int.compare idx1 idx2)) - -let pp_metrics t - { - level_first_commitment_published; - level_first_commitment_attested; - total_published_commitments; - total_published_commitments_per_slot; - expected_published_commitments; - total_attested_commitments; - total_attested_commitments_per_slot; - ratio_published_commitments; - ratio_attested_commitments; - ratio_published_commitments_last_level; - ratio_attested_commitments_per_baker; - etherlink_operator_balance_sum; - } = - let pp_ratio fmt (num, div) = - if div = 0 then Format.fprintf fmt "Not a number: %d/0" num - else Format.fprintf fmt "%.2f" (float_of_int num *. 100. /. float_of_int div) - in - (match level_first_commitment_published with - | None -> () - | Some level_first_commitment_published -> - Log.info - "First commitment published level: %d" - level_first_commitment_published) ; - (match level_first_commitment_attested with - | None -> () - | Some level_first_commitment_attested -> - Log.info - "First commitment attested level: %d" - level_first_commitment_attested) ; - Log.info "Total published commitments: %d" total_published_commitments ; - Log.info "Expected published commitments: %d" expected_published_commitments ; - Log.info "Total attested commitments: %d" total_attested_commitments ; - Log.info "Ratio published commitments: %f" ratio_published_commitments ; - Log.info "Ratio attested commitments: %f" ratio_attested_commitments ; - Log.info - "Ratio published commitments last level: %f" - ratio_published_commitments_last_level ; - List.iter - (fun Baker_helpers.{accounts; stake; baker; _} -> - let baker_name = Agnostic_baker.name baker in - List.iter - (fun account -> - let pkh = account.Baker_helpers.delegate.public_key_hash in - match - Hashtbl.find_opt ratio_attested_commitments_per_baker (PKH pkh) - with - | None -> Log.info "We lack information about %s" pkh - | Some {attestable_slots; attested_slots; _} -> - let alias = - Hashtbl.find_opt 
aliases account.delegate.public_key_hash - |> Option.value ~default:account.delegate.public_key_hash - in - Log.info - "%s: Ratio for %s (with stake %d): %a" - baker_name - alias - stake - pp_ratio - (attested_slots, attestable_slots)) - accounts) - t.bakers ; - Log.info - "Sum of balances of the Etherlink operator: %s tez" - (Tez.to_string etherlink_operator_balance_sum) ; - Log.info - "DAL slots: total published commitments per slot ( -> \ - ).@.%a" - pp_slot_metrics - (Hashtbl.to_seq total_published_commitments_per_slot) ; - Log.info - "DAL slots: total attested commitments per slot ( -> ).@.%a" - pp_slot_metrics - (Hashtbl.to_seq total_attested_commitments_per_slot) - -let push_metrics t - { - level_first_commitment_published = _; - level_first_commitment_attested = _; - total_published_commitments; - total_published_commitments_per_slot; - expected_published_commitments; - total_attested_commitments; - total_attested_commitments_per_slot; - ratio_published_commitments; - ratio_attested_commitments; - ratio_published_commitments_last_level; - ratio_attested_commitments_per_baker; - etherlink_operator_balance_sum; - } = - let get_labels public_key_hash = - let alias = - Hashtbl.find_opt aliases public_key_hash - |> Option.map (fun alias -> [("alias", alias)]) - |> Option.value ~default:[] - in - let version = - Hashtbl.find_opt t.versions public_key_hash - |> Option.map (fun version -> [("version", version)]) - |> Option.value ~default:[] - in - [("attester", public_key_hash)] @ alias @ version - in - let push_attested ~labels value = - Cloud.push_metric - t.cloud - ~help:"Number of attested commitments per baker" - ~typ:`Gauge - ~labels - ~name:"tezt_dal_commitments_attested" - (float_of_int value) - in - let push_attestable ~labels value = - Cloud.push_metric - t.cloud - ~help: - "Number of attestable commitments per baker (ie published when the \ - baker is in the DAL committee at attestation level)" - ~typ:`Gauge - ~labels - 
~name:"tezt_dal_commitments_attestable" - (float_of_int value) - in - let push_dal_attestation_sent ~labels value = - Cloud.push_metric - t.cloud - ~help: - "Did the baker sent a DAL attestation when they had the opportunity to" - ~typ:`Gauge - ~labels - ~name:"tezt_dal_attestation_sent" - (if value then 1. else 0.) - in - let push_metric_out_attestation_sent ~labels () = - Cloud.push_metric - t.cloud - ~help:"The baker sent an attestation while out of the DAL committee" - ~typ:`Gauge - ~labels - ~name:"tezt_attestation_sent_when_out_of_dal_committee" - 1. - in - Hashtbl.iter - (fun (PKH public_key_hash) - Baker_helpers. - { - attested_slots; - attestable_slots; - in_committee; - attestation_with_dal; - } -> - if in_committee then ( - let labels = get_labels public_key_hash in - push_attested ~labels attested_slots ; - push_attestable ~labels attestable_slots ; - push_dal_attestation_sent ~labels attestation_with_dal) - else - let labels = get_labels public_key_hash in - push_metric_out_attestation_sent ~labels ()) - ratio_attested_commitments_per_baker ; - Hashtbl.iter - (fun slot_index value -> - let labels = [("slot_index", string_of_int slot_index)] in - Cloud.push_metric - t.cloud - ~help:"Total published commitments per slot" - ~typ:`Counter - ~labels - ~name:"tezt_total_published_commitments_per_slot" - (float value)) - total_published_commitments_per_slot ; - Hashtbl.iter - (fun slot_index value -> - let labels = [("slot_index", string_of_int slot_index)] in - Cloud.push_metric - t.cloud - ~help:"Total attested commitments per slot" - ~typ:`Counter - ~labels - ~name:"tezt_total_attested_commitments_per_slot" - (float value)) - total_attested_commitments_per_slot ; - Cloud.push_metric - t.cloud - ~help:"Ratio between the number of published and expected commitments" - ~typ:`Gauge - ~name:"tezt_dal_commitments_ratio" - ~labels:[("kind", "published")] - ratio_published_commitments ; - Cloud.push_metric - t.cloud - ~help:"Ratio between the number of attested 
and expected commitments" - ~typ:`Gauge - ~name:"tezt_dal_commitments_ratio" - ~labels:[("kind", "attested")] - ratio_attested_commitments ; - Cloud.push_metric - t.cloud - ~help: - "Ratio between the number of attested and expected commitments per level" - ~typ:`Gauge - ~name:"tezt_dal_commitments_ratio" - ~labels:[("kind", "published_last_level")] - ratio_published_commitments_last_level ; - Cloud.push_metric - t.cloud - ~help:"Number of commitments expected to be published" - ~typ:`Counter - ~name:"tezt_dal_commitments_total" - ~labels:[("kind", "expected")] - (float_of_int expected_published_commitments) ; - Cloud.push_metric - t.cloud - ~help:"Number of published commitments " - ~typ:`Counter - ~name:"tezt_dal_commitments_total" - ~labels:[("kind", "published")] - (float_of_int total_published_commitments) ; - Cloud.push_metric - t.cloud - ~help:"Number of attested commitments" - ~typ:`Counter - ~name:"tezt_dal_commitments_total" - ~labels:[("kind", "attested")] - (float_of_int total_attested_commitments) ; - Cloud.push_metric - t.cloud - ~help:"Sum of the balances of the etherlink operator" - ~typ:`Gauge - ~name:"tezt_etherlink_operator_balance_total" - (Tez.to_float etherlink_operator_balance_sum) - -let published_level_of_attested_level t level = - level - t.parameters.attestation_lag - -let update_level_first_commitment_published _t per_level_info metrics = - match metrics.level_first_commitment_published with - | None -> - if Hashtbl.length per_level_info.published_commitments > 0 then - Some per_level_info.level - else None - | Some l -> Some l - -let update_level_first_commitment_attested t per_level_info metrics = - match metrics.level_first_commitment_attested with - | None -> - if - Z.popcount per_level_info.attested_commitments > 0 - && per_level_info.level >= t.first_level + t.parameters.attestation_lag - then Some per_level_info.level - else None - | Some l -> Some l - -let update_total_published_commitments _t per_level_info metrics = - 
metrics.total_published_commitments - + Hashtbl.length per_level_info.published_commitments - -let update_expected_published_commitments t metrics = - match metrics.level_first_commitment_published with - | None -> 0 - | Some _ -> - (* -1 since we are looking at level n operation submitted at the previous - level. *) - let producers = - min - (List.length t.configuration.dal_node_producers) - t.parameters.number_of_slots - in - metrics.expected_published_commitments + producers - -let update_total_attested_commitments _t per_level_info metrics = - metrics.total_attested_commitments - + Z.popcount per_level_info.attested_commitments - -let update_ratio_published_commitments _t _per_level_info metrics = - if metrics.expected_published_commitments = 0 then 0. - else - float_of_int metrics.total_published_commitments - *. 100. - /. float_of_int metrics.expected_published_commitments - -let update_ratio_published_commitments_last_level t per_level_info metrics = - match metrics.level_first_commitment_published with - | None -> 0. - | Some _ -> - let producers = - min - (List.length t.configuration.dal_node_producers) - t.parameters.number_of_slots - in - if producers = 0 then 100. - else - float_of_int (Hashtbl.length per_level_info.published_commitments) - *. 100. /. float_of_int producers - -let update_ratio_attested_commitments t per_level_info metrics = - let published_level = - published_level_of_attested_level t per_level_info.level - in - if published_level <= t.first_level then ( - Log.warn - "Unable to retrieve information for published level %d because it \ - precedes the earliest available level (%d)." 
- published_level - t.first_level ; - metrics.ratio_attested_commitments) - else - match Hashtbl.find_opt t.infos published_level with - | None -> - Log.warn - "Unexpected error: The level %d is missing in the infos table" - published_level ; - metrics.ratio_attested_commitments - | Some old_per_level_info -> - let n = Hashtbl.length old_per_level_info.published_commitments in - if n = 0 then metrics.ratio_attested_commitments - else - float (Z.popcount per_level_info.attested_commitments) - *. 100. /. float n - -let update_published_and_attested_commitments_per_slot t per_level_info - total_published_commitments_per_slot total_attested_commitments_per_slot = - let published_level = - published_level_of_attested_level t per_level_info.level - in - if published_level <= t.first_level then ( - Log.warn - "Unable to retrieve information for published level %d because it \ - precedes the earliest available level (%d)." - published_level - t.first_level ; - (total_published_commitments_per_slot, total_attested_commitments_per_slot)) - else - match Hashtbl.find_opt t.infos published_level with - | None -> - Log.warn - "Unexpected error: The level %d is missing in the infos table" - published_level ; - ( total_published_commitments_per_slot, - total_attested_commitments_per_slot ) - | Some old_per_level_info -> - let published_commitments = old_per_level_info.published_commitments in - for slot_index = 0 to pred t.parameters.number_of_slots do - let is_published = Hashtbl.mem published_commitments slot_index in - let total_published_commitments = - Option.value - ~default:0 - (Hashtbl.find_opt total_published_commitments_per_slot slot_index) - in - let new_total_published_commitments = - if is_published then succ total_published_commitments - else total_published_commitments - in - Hashtbl.replace - total_published_commitments_per_slot - slot_index - new_total_published_commitments ; - (* per_level_info.attested_commitments is a binary - sequence of length 
parameters.number_of_slots - (e.g. '00111111001110100010101011100101'). - For each index i: - - 1 indicates the slot has been attested - - 0 indicates the slot has not been attested. *) - let is_attested = - Z.testbit per_level_info.attested_commitments slot_index - in - let total_attested_commitments = - Option.value - ~default:0 - (Hashtbl.find_opt total_attested_commitments_per_slot slot_index) - in - let new_total_attested_commitments = - if is_attested then succ total_attested_commitments - else total_attested_commitments - in - Hashtbl.replace - total_attested_commitments_per_slot - slot_index - new_total_attested_commitments - done ; - ( total_published_commitments_per_slot, - total_attested_commitments_per_slot ) - -let update_ratio_attested_commitments_per_baker t per_level_info = - let default () = Hashtbl.create 0 in - let published_level = - published_level_of_attested_level t per_level_info.level - in - if published_level <= t.first_level then ( - Log.warn - "Unable to retrieve information for published level %d because it \ - precedes the earliest available level (%d)." - published_level - t.first_level ; - default ()) - else - match Hashtbl.find_opt t.infos published_level with - | None -> - Log.warn - "Unexpected error: The level %d is missing in the infos table" - published_level ; - default () - | Some published_level_info -> - (* Retrieves the number of published commitments *) - let attestable_slots = - Hashtbl.length published_level_info.published_commitments - in - let table = Hashtbl.(create (length per_level_info.attestations)) in - Hashtbl.to_seq per_level_info.attestations - |> Seq.map (fun (public_key_hash, status) -> - ( public_key_hash, - match status with - (* The baker is in the DAL committee and sent an attestation_with_dal. *) - | Dal_node_helpers.With_DAL attestation_bitset -> - Baker_helpers. 
- { - attestable_slots; - attested_slots = Z.popcount attestation_bitset; - in_committee = true; - attestation_with_dal = true; - } - (* The baker is out of the DAL committee and sent an attestation_with_dal. *) - | Out_of_committee -> - { - attestable_slots; - attested_slots = 0; - in_committee = false; - attestation_with_dal = false; - } - (* The baker is in the DAL committee but sent either an attestation without DAL, or no attestations. *) - | Without_DAL | Expected_to_DAL_attest -> - { - attestable_slots; - attested_slots = 0; - in_committee = true; - attestation_with_dal = false; - } )) - |> Hashtbl.add_seq table ; - table - -let get_metrics t infos_per_level metrics = - let level_first_commitment_published = - update_level_first_commitment_published t infos_per_level metrics - in - let level_first_commitment_attested = - update_level_first_commitment_attested t infos_per_level metrics - in - (* Metrics below depends on the new value for the metrics above. *) - let metrics = - { - metrics with - level_first_commitment_attested; - level_first_commitment_published; - } - in - let total_published_commitments = - update_total_published_commitments t infos_per_level metrics - in - let expected_published_commitments = - update_expected_published_commitments t metrics - in - let ratio_published_commitments_last_level = - update_ratio_published_commitments_last_level t infos_per_level metrics - in - let total_attested_commitments = - update_total_attested_commitments t infos_per_level metrics - in - (* Metrics below depends on the new value for the metrics above. 
*) - let metrics = - { - metrics with - level_first_commitment_attested; - level_first_commitment_published; - total_published_commitments; - expected_published_commitments; - total_attested_commitments; - ratio_published_commitments_last_level; - } - in - let ratio_published_commitments = - update_ratio_published_commitments t infos_per_level metrics - in - let ratio_attested_commitments = - update_ratio_attested_commitments t infos_per_level metrics - in - let ratio_attested_commitments_per_baker = - update_ratio_attested_commitments_per_baker t infos_per_level - in - let total_published_commitments_per_slot, total_attested_commitments_per_slot - = - update_published_and_attested_commitments_per_slot - t - infos_per_level - metrics.total_published_commitments_per_slot - metrics.total_attested_commitments_per_slot - in - { - level_first_commitment_published; - level_first_commitment_attested; - total_published_commitments; - total_published_commitments_per_slot; - expected_published_commitments; - total_attested_commitments; - total_attested_commitments_per_slot; - ratio_published_commitments; - ratio_attested_commitments; - ratio_published_commitments_last_level; - ratio_attested_commitments_per_baker; - etherlink_operator_balance_sum = - infos_per_level.etherlink_operator_balance_sum; - } - module Monitoring_app = struct (* time interval in hours at which to submit report *) let report_interval = 6 let pp_delegate fmt delegate_pkh = - match Hashtbl.find_opt aliases delegate_pkh with + match Hashtbl.find_opt Metrics.aliases delegate_pkh with | None -> Format.fprintf fmt "%s" delegate_pkh | Some alias -> Format.fprintf fmt "%s : %-26s" (String.sub delegate_pkh 0 7) alias @@ -975,7 +390,7 @@ module Monitoring_app = struct } type baker_infos = { - address : public_key_hash; + address : Metrics.public_key_hash; attest_infos : baker_attestation_numbers; stake_fraction : float; } @@ -1701,6 +1116,7 @@ end let get_infos_per_level t ~level ~metadata = let open 
Dal_node_helpers in + let open Metrics in let client = t.bootstrap.client in let endpoint = t.some_node_rpc_endpoint in let etherlink_operators = @@ -2589,7 +2005,7 @@ let init ~(configuration : configuration) etherlink_configuration cloud | Scatter _ | Map _ -> Network.get_level some_node_rpc_endpoint) | _ -> Network.get_level some_node_rpc_endpoint in - Hashtbl.replace metrics first_level default_metrics ; + Hashtbl.replace metrics first_level Metrics.default ; let disconnection_state = Option.map Disconnect.init configuration.disconnect in @@ -2604,7 +2020,7 @@ let init ~(configuration : configuration) etherlink_configuration cloud Network.aliases ~accounts configuration.network in let* versions = Network.versions configuration.network in - merge_aliases init_aliases ; + Metrics.merge_aliases init_aliases ; let versions = Option.value ~default:(Hashtbl.create 0) versions in let otel = Cloud.open_telemetry_endpoint cloud in (* Adds monitoring for all agents for octez-dal-node and octez-node @@ -2663,7 +2079,7 @@ let update_bakers_infos t = Network.aliases ~accounts t.configuration.network in let* versions = Network.versions t.configuration.network in - merge_aliases new_aliases ; + Metrics.merge_aliases new_aliases ; let versions = Option.value ~default:t.versions versions in t.versions <- versions ; Lwt.return_unit @@ -2678,10 +2094,17 @@ let on_new_level t level ~metadata = toplog "Level %d's info processed" level ; Hashtbl.replace t.infos level infos_per_level ; let metrics = - get_metrics t infos_per_level (Hashtbl.find t.metrics (level - 1)) + Metrics.get + ~first_level:t.first_level + ~attestation_lag:t.parameters.attestation_lag + ~dal_node_producers:t.configuration.dal_node_producers + ~number_of_slots:t.parameters.number_of_slots + ~infos:t.infos + infos_per_level + (Hashtbl.find t.metrics (level - 1)) in - pp_metrics t metrics ; - push_metrics t metrics ; + Metrics.pp ~bakers:t.bakers metrics ; + Metrics.push ~versions:t.versions ~cloud:t.cloud metrics 
; Hashtbl.replace t.metrics level metrics ; let* t = match t.disconnection_state with diff --git a/tezt/tests/cloud/metrics.ml b/tezt/tests/cloud/metrics.ml new file mode 100644 index 000000000000..008e1ea5df8e --- /dev/null +++ b/tezt/tests/cloud/metrics.ml @@ -0,0 +1,614 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Nomadic Labs, *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +type public_key_hash = PKH of string + +type commitment_info = {commitment : string; publisher_pkh : string} + +type per_level_info = { + level : int; + published_commitments : (int, commitment_info) Hashtbl.t; + attestations : (public_key_hash, Dal_node_helpers.dal_status) Hashtbl.t; + attested_commitments : Z.t; + etherlink_operator_balance_sum : Tez.t; +} + +type t = { + level_first_commitment_published : int option; + level_first_commitment_attested : int option; + total_published_commitments : int; + total_published_commitments_per_slot : (int, int) Hashtbl.t; + (* A hash table mapping slot indices to their total number of + published commitments. *) + expected_published_commitments : int; + total_attested_commitments : int; + total_attested_commitments_per_slot : (int, int) Hashtbl.t; + (* A hash table mapping slot indices to their total number of + attested commitments. 
*) + ratio_published_commitments : float; + ratio_attested_commitments : float; + ratio_published_commitments_last_level : float; + ratio_attested_commitments_per_baker : + (public_key_hash, Baker_helpers.per_baker_dal_summary) Hashtbl.t; + etherlink_operator_balance_sum : Tez.t; +} + +let default = + { + level_first_commitment_published = None; + level_first_commitment_attested = None; + total_published_commitments = 0; + total_published_commitments_per_slot = Hashtbl.create 32; + expected_published_commitments = 0; + total_attested_commitments = 0; + total_attested_commitments_per_slot = Hashtbl.create 32; + ratio_published_commitments = 0.; + ratio_attested_commitments = 0.; + ratio_published_commitments_last_level = 0.; + ratio_attested_commitments_per_baker = Hashtbl.create 0; + etherlink_operator_balance_sum = Tez.zero; + } + +let aliases = + Hashtbl.create + 50 (* mapping from baker addresses to their Tzkt aliases (if known)*) + +let merge_aliases = + Option.iter (fun new_aliases -> + Hashtbl.iter + (fun key alias -> Hashtbl.replace aliases key alias) + new_aliases) + +let pp_slot_metrics fmt xs = + let open Format in + fprintf + fmt + "[ %a ]" + (pp_print_list + (fun fmt (x, y) -> fprintf fmt "(%d -> %d)" x y) + ~pp_sep:(fun fmt () -> fprintf fmt "; ")) + (List.of_seq xs + |> List.filter (fun (_, n) -> n > 0) + (* Sorting the list per slot index increasing order. 
*) + |> List.sort (fun (idx1, _) (idx2, _) -> Int.compare idx1 idx2)) + +let pp ~bakers + { + level_first_commitment_published; + level_first_commitment_attested; + total_published_commitments; + total_published_commitments_per_slot; + expected_published_commitments; + total_attested_commitments; + total_attested_commitments_per_slot; + ratio_published_commitments; + ratio_attested_commitments; + ratio_published_commitments_last_level; + ratio_attested_commitments_per_baker; + etherlink_operator_balance_sum; + } = + let pp_ratio fmt (num, div) = + if div = 0 then Format.fprintf fmt "Not a number: %d/0" num + else Format.fprintf fmt "%.2f" (float_of_int num *. 100. /. float_of_int div) + in + (match level_first_commitment_published with + | None -> () + | Some level_first_commitment_published -> + Log.info + "First commitment published level: %d" + level_first_commitment_published) ; + (match level_first_commitment_attested with + | None -> () + | Some level_first_commitment_attested -> + Log.info + "First commitment attested level: %d" + level_first_commitment_attested) ; + Log.info "Total published commitments: %d" total_published_commitments ; + Log.info "Expected published commitments: %d" expected_published_commitments ; + Log.info "Total attested commitments: %d" total_attested_commitments ; + Log.info "Ratio published commitments: %f" ratio_published_commitments ; + Log.info "Ratio attested commitments: %f" ratio_attested_commitments ; + Log.info + "Ratio published commitments last level: %f" + ratio_published_commitments_last_level ; + List.iter + (fun Baker_helpers.{accounts; stake; baker; _} -> + let baker_name = Agnostic_baker.name baker in + List.iter + (fun account -> + let pkh = account.Baker_helpers.delegate.public_key_hash in + match + Hashtbl.find_opt ratio_attested_commitments_per_baker (PKH pkh) + with + | None -> Log.info "We lack information about %s" pkh + | Some {attestable_slots; attested_slots; _} -> + let alias = + Hashtbl.find_opt aliases 
account.delegate.public_key_hash + |> Option.value ~default:account.delegate.public_key_hash + in + Log.info + "%s: Ratio for %s (with stake %d): %a" + baker_name + alias + stake + pp_ratio + (attested_slots, attestable_slots)) + accounts) + bakers ; + Log.info + "Sum of balances of the Etherlink operator: %s tez" + (Tez.to_string etherlink_operator_balance_sum) ; + Log.info + "DAL slots: total published commitments per slot ( -> \ + ).@.%a" + pp_slot_metrics + (Hashtbl.to_seq total_published_commitments_per_slot) ; + Log.info + "DAL slots: total attested commitments per slot ( -> ).@.%a" + pp_slot_metrics + (Hashtbl.to_seq total_attested_commitments_per_slot) + +let push ~versions ~cloud + { + level_first_commitment_published = _; + level_first_commitment_attested = _; + total_published_commitments; + total_published_commitments_per_slot; + expected_published_commitments; + total_attested_commitments; + total_attested_commitments_per_slot; + ratio_published_commitments; + ratio_attested_commitments; + ratio_published_commitments_last_level; + ratio_attested_commitments_per_baker; + etherlink_operator_balance_sum; + } = + let get_labels public_key_hash = + let alias = + Hashtbl.find_opt aliases public_key_hash + |> Option.map (fun alias -> [("alias", alias)]) + |> Option.value ~default:[] + in + let version = + Hashtbl.find_opt versions public_key_hash + |> Option.map (fun version -> [("version", version)]) + |> Option.value ~default:[] + in + [("attester", public_key_hash)] @ alias @ version + in + let push_attested ~labels value = + Cloud.push_metric + cloud + ~help:"Number of attested commitments per baker" + ~typ:`Gauge + ~labels + ~name:"tezt_dal_commitments_attested" + (float_of_int value) + in + let push_attestable ~labels value = + Cloud.push_metric + cloud + ~help: + "Number of attestable commitments per baker (ie published when the \ + baker is in the DAL committee at attestation level)" + ~typ:`Gauge + ~labels + ~name:"tezt_dal_commitments_attestable" + 
(float_of_int value) + in + let push_dal_attestation_sent ~labels value = + Cloud.push_metric + cloud + ~help: + "Did the baker sent a DAL attestation when they had the opportunity to" + ~typ:`Gauge + ~labels + ~name:"tezt_dal_attestation_sent" + (if value then 1. else 0.) + in + let push_metric_out_attestation_sent ~labels () = + Cloud.push_metric + cloud + ~help:"The baker sent an attestation while out of the DAL committee" + ~typ:`Gauge + ~labels + ~name:"tezt_attestation_sent_when_out_of_dal_committee" + 1. + in + Hashtbl.iter + (fun (PKH public_key_hash) + Baker_helpers. + { + attested_slots; + attestable_slots; + in_committee; + attestation_with_dal; + } -> + if in_committee then ( + let labels = get_labels public_key_hash in + push_attested ~labels attested_slots ; + push_attestable ~labels attestable_slots ; + push_dal_attestation_sent ~labels attestation_with_dal) + else + let labels = get_labels public_key_hash in + push_metric_out_attestation_sent ~labels ()) + ratio_attested_commitments_per_baker ; + Hashtbl.iter + (fun slot_index value -> + let labels = [("slot_index", string_of_int slot_index)] in + Cloud.push_metric + cloud + ~help:"Total published commitments per slot" + ~typ:`Counter + ~labels + ~name:"tezt_total_published_commitments_per_slot" + (float value)) + total_published_commitments_per_slot ; + Hashtbl.iter + (fun slot_index value -> + let labels = [("slot_index", string_of_int slot_index)] in + Cloud.push_metric + cloud + ~help:"Total attested commitments per slot" + ~typ:`Counter + ~labels + ~name:"tezt_total_attested_commitments_per_slot" + (float value)) + total_attested_commitments_per_slot ; + Cloud.push_metric + cloud + ~help:"Ratio between the number of published and expected commitments" + ~typ:`Gauge + ~name:"tezt_dal_commitments_ratio" + ~labels:[("kind", "published")] + ratio_published_commitments ; + Cloud.push_metric + cloud + ~help:"Ratio between the number of attested and expected commitments" + ~typ:`Gauge + 
~name:"tezt_dal_commitments_ratio" + ~labels:[("kind", "attested")] + ratio_attested_commitments ; + Cloud.push_metric + cloud + ~help: + "Ratio between the number of attested and expected commitments per level" + ~typ:`Gauge + ~name:"tezt_dal_commitments_ratio" + ~labels:[("kind", "published_last_level")] + ratio_published_commitments_last_level ; + Cloud.push_metric + cloud + ~help:"Number of commitments expected to be published" + ~typ:`Counter + ~name:"tezt_dal_commitments_total" + ~labels:[("kind", "expected")] + (float_of_int expected_published_commitments) ; + Cloud.push_metric + cloud + ~help:"Number of published commitments " + ~typ:`Counter + ~name:"tezt_dal_commitments_total" + ~labels:[("kind", "published")] + (float_of_int total_published_commitments) ; + Cloud.push_metric + cloud + ~help:"Number of attested commitments" + ~typ:`Counter + ~name:"tezt_dal_commitments_total" + ~labels:[("kind", "attested")] + (float_of_int total_attested_commitments) ; + Cloud.push_metric + cloud + ~help:"Sum of the balances of the etherlink operator" + ~typ:`Gauge + ~name:"tezt_etherlink_operator_balance_total" + (Tez.to_float etherlink_operator_balance_sum) + +let published_level_of_attested_level ~attestation_lag level = + level - attestation_lag + +let update_level_first_commitment_published per_level_info metrics = + match metrics.level_first_commitment_published with + | None -> + if Hashtbl.length per_level_info.published_commitments > 0 then + Some per_level_info.level + else None + | Some l -> Some l + +let update_level_first_commitment_attested ~first_level ~attestation_lag + per_level_info metrics = + match metrics.level_first_commitment_attested with + | None -> + if + Z.popcount per_level_info.attested_commitments > 0 + && per_level_info.level >= first_level + attestation_lag + then Some per_level_info.level + else None + | Some l -> Some l + +let update_total_published_commitments per_level_info metrics = + metrics.total_published_commitments + + 
Hashtbl.length per_level_info.published_commitments + +let update_expected_published_commitments ~dal_node_producers ~number_of_slots + metrics = + match metrics.level_first_commitment_published with + | None -> 0 + | Some _ -> + (* -1 since we are looking at level n operation submitted at the previous + level. *) + let producers = min (List.length dal_node_producers) number_of_slots in + metrics.expected_published_commitments + producers + +let update_total_attested_commitments per_level_info metrics = + metrics.total_attested_commitments + + Z.popcount per_level_info.attested_commitments + +let update_ratio_published_commitments metrics = + if metrics.expected_published_commitments = 0 then 0. + else + float_of_int metrics.total_published_commitments + *. 100. + /. float_of_int metrics.expected_published_commitments + +let update_ratio_published_commitments_last_level ~dal_node_producers + ~number_of_slots per_level_info metrics = + match metrics.level_first_commitment_published with + | None -> 0. + | Some _ -> + let producers = min (List.length dal_node_producers) number_of_slots in + if producers = 0 then 100. + else + float_of_int (Hashtbl.length per_level_info.published_commitments) + *. 100. /. float_of_int producers + +let update_ratio_attested_commitments ~first_level ~infos ~attestation_lag + per_level_info metrics = + let published_level = + published_level_of_attested_level ~attestation_lag per_level_info.level + in + if published_level <= first_level then ( + Log.warn + "Unable to retrieve information for published level %d because it \ + precedes the earliest available level (%d)." 
+ published_level + first_level ; + metrics.ratio_attested_commitments) + else + match Hashtbl.find_opt infos published_level with + | None -> + Log.warn + "Unexpected error: The level %d is missing in the infos table" + published_level ; + metrics.ratio_attested_commitments + | Some old_per_level_info -> + let n = Hashtbl.length old_per_level_info.published_commitments in + if n = 0 then metrics.ratio_attested_commitments + else + float (Z.popcount per_level_info.attested_commitments) + *. 100. /. float n + +let update_published_and_attested_commitments_per_slot ~first_level ~infos + ~number_of_slots ~attestation_lag per_level_info + total_published_commitments_per_slot total_attested_commitments_per_slot = + let published_level = + published_level_of_attested_level ~attestation_lag per_level_info.level + in + if published_level <= first_level then ( + Log.warn + "Unable to retrieve information for published level %d because it \ + precedes the earliest available level (%d)." + published_level + first_level ; + (total_published_commitments_per_slot, total_attested_commitments_per_slot)) + else + match Hashtbl.find_opt infos published_level with + | None -> + Log.warn + "Unexpected error: The level %d is missing in the infos table" + published_level ; + ( total_published_commitments_per_slot, + total_attested_commitments_per_slot ) + | Some old_per_level_info -> + let published_commitments = old_per_level_info.published_commitments in + for slot_index = 0 to pred number_of_slots do + let is_published = Hashtbl.mem published_commitments slot_index in + let total_published_commitments = + Option.value + ~default:0 + (Hashtbl.find_opt total_published_commitments_per_slot slot_index) + in + let new_total_published_commitments = + if is_published then succ total_published_commitments + else total_published_commitments + in + Hashtbl.replace + total_published_commitments_per_slot + slot_index + new_total_published_commitments ; + (* per_level_info.attested_commitments is 
a binary + sequence of length parameters.number_of_slots + (e.g. '00111111001110100010101011100101'). + For each index i: + - 1 indicates the slot has been attested + - 0 indicates the slot has not been attested. *) + let is_attested = + Z.testbit per_level_info.attested_commitments slot_index + in + let total_attested_commitments = + Option.value + ~default:0 + (Hashtbl.find_opt total_attested_commitments_per_slot slot_index) + in + let new_total_attested_commitments = + if is_attested then succ total_attested_commitments + else total_attested_commitments + in + Hashtbl.replace + total_attested_commitments_per_slot + slot_index + new_total_attested_commitments + done ; + ( total_published_commitments_per_slot, + total_attested_commitments_per_slot ) + +let update_ratio_attested_commitments_per_baker ~first_level ~infos + ~attestation_lag per_level_info = + let default () = Hashtbl.create 0 in + let published_level = + published_level_of_attested_level ~attestation_lag per_level_info.level + in + if published_level <= first_level then ( + Log.warn + "Unable to retrieve information for published level %d because it \ + precedes the earliest available level (%d)." + published_level + first_level ; + default ()) + else + match Hashtbl.find_opt infos published_level with + | None -> + Log.warn + "Unexpected error: The level %d is missing in the infos table" + published_level ; + default () + | Some published_level_info -> + (* Retrieves the number of published commitments *) + let attestable_slots = + Hashtbl.length published_level_info.published_commitments + in + let table = Hashtbl.(create (length per_level_info.attestations)) in + Hashtbl.to_seq per_level_info.attestations + |> Seq.map (fun (public_key_hash, status) -> + ( public_key_hash, + match status with + (* The baker is in the DAL committee and sent an attestation_with_dal. *) + | Dal_node_helpers.With_DAL attestation_bitset -> + Baker_helpers. 
+ { + attestable_slots; + attested_slots = Z.popcount attestation_bitset; + in_committee = true; + attestation_with_dal = true; + } + (* The baker is out of the DAL committee and sent an attestation_with_dal. *) + | Out_of_committee -> + { + attestable_slots; + attested_slots = 0; + in_committee = false; + attestation_with_dal = false; + } + (* The baker is in the DAL committee but sent either an attestation without DAL, or no attestations. *) + | Without_DAL | Expected_to_DAL_attest -> + { + attestable_slots; + attested_slots = 0; + in_committee = true; + attestation_with_dal = false; + } )) + |> Hashtbl.add_seq table ; + table + +let get ~first_level ~attestation_lag ~dal_node_producers ~number_of_slots + ~infos infos_per_level metrics = + let level_first_commitment_published = + update_level_first_commitment_published infos_per_level metrics + in + let level_first_commitment_attested = + update_level_first_commitment_attested + ~first_level + ~attestation_lag + infos_per_level + metrics + in + (* Metrics below depends on the new value for the metrics above. *) + let metrics = + { + metrics with + level_first_commitment_attested; + level_first_commitment_published; + } + in + let total_published_commitments = + update_total_published_commitments infos_per_level metrics + in + let expected_published_commitments = + update_expected_published_commitments + ~dal_node_producers + ~number_of_slots + metrics + in + let ratio_published_commitments_last_level = + update_ratio_published_commitments_last_level + ~dal_node_producers + ~number_of_slots + infos_per_level + metrics + in + let total_attested_commitments = + update_total_attested_commitments infos_per_level metrics + in + (* Metrics below depends on the new value for the metrics above. 
*) + let metrics = + { + metrics with + level_first_commitment_attested; + level_first_commitment_published; + total_published_commitments; + expected_published_commitments; + total_attested_commitments; + ratio_published_commitments_last_level; + } + in + let ratio_published_commitments = + update_ratio_published_commitments metrics + in + let ratio_attested_commitments = + update_ratio_attested_commitments + ~first_level + ~infos + ~attestation_lag + infos_per_level + metrics + in + let ratio_attested_commitments_per_baker = + update_ratio_attested_commitments_per_baker + ~first_level + ~infos + ~attestation_lag + infos_per_level + in + let total_published_commitments_per_slot, total_attested_commitments_per_slot + = + update_published_and_attested_commitments_per_slot + ~first_level + ~infos + ~number_of_slots + ~attestation_lag + infos_per_level + metrics.total_published_commitments_per_slot + metrics.total_attested_commitments_per_slot + in + { + level_first_commitment_published; + level_first_commitment_attested; + total_published_commitments; + total_published_commitments_per_slot; + expected_published_commitments; + total_attested_commitments; + total_attested_commitments_per_slot; + ratio_published_commitments; + ratio_attested_commitments; + ratio_published_commitments_last_level; + ratio_attested_commitments_per_baker; + etherlink_operator_balance_sum = + infos_per_level.etherlink_operator_balance_sum; + } diff --git a/tezt/tests/cloud/metrics.mli b/tezt/tests/cloud/metrics.mli new file mode 100644 index 000000000000..a705cf83efa7 --- /dev/null +++ b/tezt/tests/cloud/metrics.mli @@ -0,0 +1,53 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Nomadic Labs, *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +(** Tezt‑cloud metrics helpers. 
*) + +(** A baker’s public key hash used as a map key. *) +type public_key_hash = PKH of string + +(** Information about a single published DAL commitment. *) +type commitment_info = {commitment : string; publisher_pkh : string} + +type per_level_info = { + level : int; + published_commitments : (int, commitment_info) Hashtbl.t; + attestations : (public_key_hash, Dal_node_helpers.dal_status) Hashtbl.t; + attested_commitments : Z.t; + etherlink_operator_balance_sum : Tez.t; +} + +type t + +(** Default starting value for metrics. *) +val default : t + +(** [aliases] maps delegate PKHs to their human‑readable aliases. *) +val aliases : (string, string) Hashtbl.t + +(** [merge_aliases aliases_map] adds new aliases from [aliases_map] into {!aliases}. *) +val merge_aliases : (string, string) Hashtbl.t option -> unit + +(** Pretty-printing function for metrics. *) +val pp : bakers:Baker_helpers.baker list -> t -> unit + +(** [push ~versions ~cloud metrics] pushes all metrics into Cloud’s Prometheus + registry, attaching optional [versions] labels for each baker PKH. *) +val push : versions:(string, string) Hashtbl.t -> cloud:Cloud.t -> t -> unit + +(** [get ~first_level ~attestation_lag ~dal_node_producers ~number_of_slots + ~infos infos_per_level metrics] updates the [metrics] statistics. 
*) +val get : + first_level:int -> + attestation_lag:int -> + dal_node_producers:'a list -> + number_of_slots:int -> + infos:(int, per_level_info) Hashtbl.t -> + per_level_info -> + t -> + t -- GitLab From bce76ebb5a3325d1b0501c27246badfc0fc9486d Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 21 Jul 2025 21:51:30 +0100 Subject: [PATCH 2/2] Tezt_cloud: Refactor monitoring_app --- tezt/tests/cloud/dal.ml | 1037 +-------------------------- tezt/tests/cloud/monitoring_app.ml | 1012 ++++++++++++++++++++++++++ tezt/tests/cloud/monitoring_app.mli | 46 ++ 3 files changed, 1076 insertions(+), 1019 deletions(-) create mode 100644 tezt/tests/cloud/monitoring_app.ml create mode 100644 tezt/tests/cloud/monitoring_app.mli diff --git a/tezt/tests/cloud/dal.ml b/tezt/tests/cloud/dal.ml index 6317467f1e60..8d93c9d215cd 100644 --- a/tezt/tests/cloud/dal.ml +++ b/tezt/tests/cloud/dal.ml @@ -99,1021 +99,6 @@ type t = { otel : string option; } -module Monitoring_app = struct - (* time interval in hours at which to submit report *) - let report_interval = 6 - - let pp_delegate fmt delegate_pkh = - match Hashtbl.find_opt Metrics.aliases delegate_pkh with - | None -> Format.fprintf fmt "%s" delegate_pkh - | Some alias -> - Format.fprintf fmt "%s : %-26s" (String.sub delegate_pkh 0 7) alias - - (** [network_to_image_url network] return an image for each monitored network. 
*) - let network_to_image_url : Network.t -> string = function - | `Rionet -> - "https://gitlab.com/tezos/tezos/-/raw/master/tezt/lib_cloud/assets/rionet.png" - | `Mainnet -> - "https://gitlab.com/tezos/tezos/-/raw/master/tezt/lib_cloud/assets/mainnet.png" - | `Ghostnet -> - "https://gitlab.com/tezos/tezos/-/raw/master/tezt/lib_cloud/assets/ghostnet.png" - | `Nextnet _ | `Seoulnet -> - "https://gitlab.com/tezos/tezos/-/raw/master/tezt/lib_cloud/assets/seoulnet.png" - | `Sandbox | `Weeklynet _ -> "no_image_yet" - - module Format_app = struct - (* Helper for Slack App message format block-kit - See: https://api.slack.com/reference/block-kit/ - *) - - let image ~url ~alt = - let open Ezjsonm in - `O - [ - ("type", string "image"); - ("image_url", string url); - ("alt_text", string alt); - ] - - (* [section content ?accessory] creates Slack App message blocks - with an optional accessory. - - The function joins content strings with newlines, formats them - using mrkdwn, and returns properly structured JSON objects for - Slack's Block Kit. 
*) - let section content ?accessory () = - let open Ezjsonm in - if List.is_empty content then [] - else - let text = String.concat "\n" content in - [ - `O - (("type", string "section") - :: ("text", `O [("type", string "mrkdwn"); ("text", string text)]) - :: Option.fold - ~none:[] - ~some:(fun data -> [("accessory", data)]) - accessory); - ] - - let make_payload ~slack_channel_id ?ts content = - let open Ezjsonm in - `O - (("channel", string slack_channel_id) - :: ("blocks", `A content) - :: Option.fold ~none:[] ~some:(fun ts -> [("thread_ts", string ts)]) ts - ) - end - - let post_message ?ts ~slack_channel_id ~slack_bot_token data = - let data = Format_app.make_payload ?ts ~slack_channel_id data in - let slack_endpoint = - Endpoint.make ~scheme:"https" ~host:"slack.com" ~port:443 () - in - let rpc = - RPC_core.make - ~data:(Data data) - POST - ["api"; "chat.postMessage"] - (fun _json -> ()) - in - let* response = - RPC_core.call_raw - ~extra_headers: - [("Authorization", Format.sprintf "Bearer %s" slack_bot_token)] - slack_endpoint - rpc - in - let thread_id = - let open JSON in - parse ~origin:"DAL.Monitoring_app.chat_postMessage" response.body - |-> "ts" |> as_string - in - return thread_id - - module Tasks = struct - let endpoint_from_prometheus_query ~query = - let fail ~uri_part = - Test.fail - "DAL.Monitoring_app.Tasks.endpoint_from_prometheus_query: expecting \ - a prometheus %s" - uri_part - in - let uri = - match Prometheus.get_query_endpoint ~query with - | None -> fail ~uri_part:"endpoint" - | Some endpoint -> endpoint - in - let scheme, host, port = - match Uri.(scheme uri, host uri, port uri) with - | Some scheme, Some host, Some port -> (scheme, host, port) - | None, _, _ -> fail ~uri_part:"scheme" - | _, None, _ -> fail ~uri_part:"host" - | _, _, None -> fail ~uri_part:"port" - in - let query_string = - (* Fixme: warn about `k` being dropped in the second case. We - need to keep only list of size 1 because of the way RPC_core - is implemented. 
Should probably be fixed soon or later. *) - List.filter_map - (function k, [v] -> Some (k, v) | _k, _ -> None) - (Uri.query uri) - in - let path = String.split_on_char '/' (Uri.path uri) in - let endpoint = Endpoint.make ~host ~scheme ~port () in - (`endpoint endpoint, `query query_string, `path path) - - let fetch ~origin ~query ~decoder = - let open RPC_core in - let `endpoint endpoint, `query query_string, `path path = - endpoint_from_prometheus_query ~query - in - let rpc = make ~query_string GET path decoder in - Lwt.catch - (fun () -> - let* response = call_raw endpoint rpc in - Lwt.return (decode_raw ~origin rpc response.body)) - (fun exn -> - Log.warn - "Unexpected error while fetching prometheus query (origin : %s): \ - '%s'" - origin - (Printexc.to_string exn) ; - Lwt.return_none) - - let decoder_prometheus_float json = - let open JSON in - let status = json |-> "status" |> as_string in - if not (String.equal status "success") then (* fixme: warning *) - None - else - let opt = - json |-> "data" |-> "result" |> as_list |> Fun.flip List.nth_opt 0 - in - match opt with - | None -> None - | Some x -> - x |-> "value" |> as_list |> Fun.flip List.nth_opt 1 - |> Option.map as_float - - let view_ratio_attested_over_published - (`attested attested, `published published) = - let open Format in - match (attested, published) with - | Some 0., Some 0. -> None - | None, None -> None - | Some attested, Some 0. -> - let s = sprintf "`unk` (`%d`/`0`)" (Int.of_float attested) in - Some s - | Some attested, None -> - let s = sprintf "`unk` (`%d`/`?`)" (Int.of_float attested) in - Some s - | None, Some published -> - let s = sprintf "`unk` (`?`/`%d`)" (Int.of_float published) in - Some s - | Some attested, Some published -> - let ratio = attested /. published *. 100. 
in - let s = - sprintf - "`%s` (`%d/%d`)" - (sprintf "%.2f%%" ratio) - (Int.of_float attested) - (Int.of_float published) - in - Some s - - let fetch_slot_info ~slot_index = - let query s = - Format.sprintf - "increase(tezt_total_%s_commitments_per_slot{slot_index=\"%d\"}[%dh])" - s - slot_index - report_interval - in - let decoder = decoder_prometheus_float in - let* attested = - fetch - ~origin:"fetch_slot_info.attested" - ~decoder - ~query:(query "attested") - in - let* published = - fetch - ~origin:"fetch_slot_info.published" - ~decoder - ~query:(query "published") - in - Lwt.return - (`slot_index slot_index, `attested attested, `published published) - - let view_slot_info slot_info = - let slots_info = - List.filter_map - (fun (`slot_index slot_index, attested, published) -> - view_ratio_attested_over_published (attested, published) - |> Option.map - (Format.sprintf ":black_small_square: `%02d` : %s" slot_index)) - slot_info - in - if List.is_empty slots_info then [] - else - "• Percentage of attested over published DAL commitments per slot:" - :: slots_info - - let fetch_dal_commitments_total_info () = - let query s = - Format.sprintf - {|increase(tezt_dal_commitments_total{kind="%s"}[%dh])|} - s - report_interval - in - let decoder = decoder_prometheus_float in - let* attested = - fetch - ~origin:"fetch_dal_commitments_total.attested" - ~decoder - ~query:(query "attested") - in - let* published = - fetch - ~origin:"fetch_dal_commitments_total.published" - ~decoder - ~query:(query "published") - in - let ratio = - view_ratio_attested_over_published - (`attested attested, `published published) - in - let ratio_view = - (Format.sprintf - "• Percentage of attested over published DAL commitments: %s") - (Option.value ~default:"unk" ratio) - in - let slot_size = 126_944 (* TODO: do not hard-code this *) in - let bandwidth = - Option.map - (fun x -> - Format.sprintf - "%.2f" - (x *. float_of_int slot_size - /. 
float_of_int (1024 * report_interval * 3600))) - attested - in - let bandwidth_view = - Format.sprintf - "• Bandwidth: %s KiB/s" - (Option.value ~default:"unk" bandwidth) - in - Lwt.return (ratio_view, bandwidth_view) - - let pp_stake fmt stake_ratio = - Format.fprintf fmt "%.2f%% stake" (stake_ratio *. 100.) - - type baker_attestation_numbers = { - (* The rate of attested/attestable slots for this baker. - An attestatble slot is a published slot for which the baker is in - the DAL committee at attestation level. - *) - slot_attestation_rate : float option; - (* The rate of attestation_with_dal when the baker is in the DAL committee. - This ratio is especially useful for small bakers which are rarely in - the DAL committee, hence have very few opportunities to attest slots - when publication is quite rare. - *) - dal_content_rate : float option; - (* This boolean is true if the baker sent at least one attestation while - out of the DAL committee. This is useful to detect bakers who attest - but have a broken DAL setup preventing them to send attestations while - in DAL committee. - *) - attests_while_out_of_dal_committee : bool; - } - - type baker_infos = { - address : Metrics.public_key_hash; - attest_infos : baker_attestation_numbers; - stake_fraction : float; - } - - let pp_baker_light fmt {address = PKH delegate_pkh; stake_fraction; _} = - Format.fprintf - fmt - "%a (%a)" - pp_delegate - delegate_pkh - pp_stake - stake_fraction - - let pp_baker_dal_status fmt baker = - Format.fprintf - fmt - "%a (%s)" - pp_baker_light - baker - (match baker.attest_infos.dal_content_rate with - | None -> "Never sent attestations while in DAL committee" - | Some 0. -> "OFF" - | Some 1. -> "ON" - | Some x -> Format.sprintf "ACTIVE %.0f%% of the time" (x *. 100.)) - - let pp_bakers_all_infos fmt baker = - Format.fprintf - fmt - "%s - %a" - (Option.fold - ~none:"Never was in committee when slots were produced" - ~some:(fun value -> - let percentage = value *. 100. 
in - Format.sprintf "%.2f%%" percentage) - baker.attest_infos.slot_attestation_rate) - pp_baker_dal_status - baker - - let fetch_baker_info ~tz1 ~origin = - let query = - Format.sprintf - "sum_over_time(tezt_dal_commitments_attested{attester=\"%s\"}[%dh])" - tz1 - report_interval - in - let* attested = fetch ~decoder:decoder_prometheus_float ~query ~origin in - let query = - Format.sprintf - "sum_over_time(tezt_dal_commitments_attestable{attester=\"%s\"}[%dh])" - tz1 - report_interval - in - let* attestable = - fetch ~decoder:decoder_prometheus_float ~query ~origin - in - let query = - Format.sprintf - "avg_over_time(tezt_dal_attestation_sent{attester=\"%s\"}[%dh])" - tz1 - report_interval - in - let* dal_content_rate = - fetch ~decoder:decoder_prometheus_float ~query ~origin - in - let query = - Format.sprintf - "sum_over_time(tezt_attestation_sent_when_out_of_dal_committee{attester=\"%s\"}[6h])" - tz1 - in - let* out_attestations = - fetch ~decoder:decoder_prometheus_float ~query ~origin - in - let attests_while_out_of_dal_committee = - Option.is_some out_attestations - in - let slot_attestation_rate = - match (attested, attestable) with - | None, _ | _, None | _, Some 0. -> None - | Some attested, Some attestable -> Some (attested /. 
attestable) - in - return - { - slot_attestation_rate; - dal_content_rate; - attests_while_out_of_dal_committee; - } - - let get_current_cycle endpoint = - let* {cycle; _} = - RPC_core.call endpoint (RPC.get_chain_block_helper_current_level ()) - in - Lwt.return cycle - - let get_bakers_with_staking_power endpoint cycle = - RPC_core.call endpoint (RPC.get_stake_distribution ~cycle ()) - - type classified_bakers = { - mute_bakers : baker_infos list; - muted_by_dal : baker_infos list; - dal_zero : baker_infos list; - dal_on : baker_infos list; - no_shards : baker_infos list; - dal_off : baker_infos list; - } - - let fetch_bakers_info endpoint = - let* cycle = get_current_cycle endpoint in - let* bakers = get_bakers_with_staking_power endpoint cycle in - let total_baking_power = - List.fold_left - (fun acc RPC.{baking_power; _} -> acc + baking_power) - 0 - bakers - in - let* bakers_info = - Lwt_list.filter_map_p - (fun RPC.{delegate; baking_power} -> - let* attest_infos = - fetch_baker_info - ~origin:(Format.sprintf "fetch_baker_info.%s" delegate) - ~tz1:delegate - in - let stake_fraction = - float_of_int baking_power /. float_of_int total_baking_power - in - Lwt.return_some - {address = PKH delegate; attest_infos; stake_fraction}) - bakers - in - let rec classify_bakers already_classified = function - | [] -> already_classified - | ({attest_infos; _} as baker) :: tl -> ( - match attest_infos.dal_content_rate with - | None -> - if attest_infos.attests_while_out_of_dal_committee then - classify_bakers - { - already_classified with - muted_by_dal = baker :: already_classified.muted_by_dal; - } - tl - else - classify_bakers - { - already_classified with - mute_bakers = baker :: already_classified.mute_bakers; - } - tl - | Some 0. 
-> - classify_bakers - { - already_classified with - dal_off = baker :: already_classified.dal_off; - } - tl - | _ -> ( - match attest_infos.slot_attestation_rate with - | None -> - classify_bakers - { - already_classified with - no_shards = baker :: already_classified.no_shards; - } - tl - | Some 0. -> - classify_bakers - { - already_classified with - dal_zero = baker :: already_classified.dal_zero; - } - tl - | _ -> - classify_bakers - { - already_classified with - dal_on = baker :: already_classified.dal_on; - } - tl)) - in - let {mute_bakers; muted_by_dal; dal_zero; dal_on; no_shards; dal_off} = - classify_bakers - { - mute_bakers = []; - muted_by_dal = []; - dal_zero = []; - dal_on = []; - no_shards = []; - dal_off = []; - } - bakers_info - in - let ( >> ) cmp1 cmp2 x y = - match cmp1 x y with 0 -> cmp2 x y | cmp -> cmp - in - let stake_descending x y = - Float.compare y.stake_fraction x.stake_fraction - in - let attestation_rate_ascending x y = - Option.compare - Float.compare - x.attest_infos.slot_attestation_rate - y.attest_infos.slot_attestation_rate - in - let dal_mention_perf_ascending x y = - Option.compare - Float.compare - x.attest_infos.dal_content_rate - y.attest_infos.dal_content_rate - in - let mute_bakers = List.sort stake_descending mute_bakers in - let muted_by_dal = List.sort stake_descending muted_by_dal in - let dal_zero = - List.sort (stake_descending >> dal_mention_perf_ascending) dal_zero - in - let no_shards = - List.sort (stake_descending >> dal_mention_perf_ascending) no_shards - in - let dal_on = - List.sort - (attestation_rate_ascending >> dal_mention_perf_ascending - >> stake_descending) - dal_on - in - let dal_off = List.sort stake_descending dal_off in - (* `group_by n l` outputs the list of lists with the same elements as `l` - but with `n` elements per list (except the last one). 
- For instance - `group_by 4 [a_1, ..., a_10] = [[a_1, ..., a_4], [a_5, ..., a_8], [a_9, a_10]]` - *) - let group_by n = - let rec bis local_acc main_acc k = function - | [] -> List.rev (List.rev local_acc :: main_acc) - | l when k = 0 -> bis [] (List.rev local_acc :: main_acc) n l - | hd :: tl -> bis (hd :: local_acc) main_acc (k - 1) tl - in - bis [] [] n - in - let agglomerate_infos bakers = - let nb, stake = - List.fold_left - (fun (nb, stake_acc) {stake_fraction; _} -> - (nb + 1, stake_acc +. stake_fraction)) - (0, 0.) - bakers - in - Format.sprintf - "They are %d representing %.2f%% of the stake" - nb - (stake *. 100.) - in - let encapsulate_in_code_block strings = ("```" :: strings) @ ["```"] in - let display catch_phrase printer bakers = - if bakers = [] then [] - else - [catch_phrase; agglomerate_infos bakers] - :: List.map - encapsulate_in_code_block - (group_by 20 (List.map (Format.asprintf "%a" printer) bakers)) - in - let display_muted = - display - ":alert: *Those bakers never sent attestations.*" - pp_baker_light - mute_bakers - in - let display_muted_by_DAL = - display - ":alert: *Those bakers never sent attestation while in DAL \ - committee, however they sent attestations when they are out of it. \ - This is quite unexpected. They probably have an issue.*" - pp_baker_light - muted_by_dal - in - let display_zero = - display - ":triangular_flag_on_post: *Those bakers have a broken DAL node. 
\ - They send attestations with a DAL content, but do not attest any \ - slots.*" - pp_baker_dal_status - dal_zero - in - let display_on = - display - ":white_check_mark: *Those bakers sent attestations with a DAL \ - content.*" - pp_bakers_all_infos - dal_on - in - let display_off = - display - ":x: *Those bakers never turned their DAL on.*" - pp_baker_light - dal_off - in - let display_no_shards = - display - ":microscope: *Those bakers seems to have a working DAL node, but \ - never were in committee when slots were produced, hence we cannot \ - say much about how well it works.*" - pp_baker_light - no_shards - in - Lwt.return - ((["Details of bakers performance:"] :: display_muted_by_DAL) - @ display_zero @ display_on @ display_no_shards @ display_off - @ display_muted) - - let fetch_slots_info () = - let* data = - (* fixme: should use protocol parameterized number of slots *) - Lwt_list.map_p - (fun slot_index -> fetch_slot_info ~slot_index) - (List.init 32 Fun.id) - in - Lwt.return (view_slot_info data) - - let action ~slack_bot_token ~slack_channel_id ~configuration endpoint () = - let* endpoint in - let network = configuration.network in - let title_info = - Format.sprintf - "*DAL report* for the *%s* network over the last %d hours." 
- (String.capitalize_ascii (Network.to_string network)) - report_interval - in - let* ratio_dal_commitments_total_info, bandwidth_info = - fetch_dal_commitments_total_info () - in - let* slots_info = fetch_slots_info () in - let network_overview_info = - bandwidth_info :: ratio_dal_commitments_total_info :: slots_info - in - let data = - let open Format_app in - section [title_info] () - @ section ["*Network overview*"] () - @ section - network_overview_info - ~accessory: - (Format_app.image - ~url:(network_to_image_url network) - ~alt:(Network.to_string network)) - () - in - let* thread_id = post_message ~slack_channel_id ~slack_bot_token data in - let* bakers_info = fetch_bakers_info endpoint in - Lwt_list.iter_s - (fun to_post -> - let data = - let open Format_app in - section to_post () - in - let* _ts = - post_message ~ts:thread_id ~slack_channel_id ~slack_bot_token data - in - Lwt.return_unit) - bakers_info - - (* Relies on UTC (Universal Time Coordinated). - Paris operates on Central European Time (CET), which is UTC+1 - during standard time (winter months). During daylight saving - time (summer months), Paris switches to Central European Summer - Time (CEST), which is UTC+2. - *) - let register_chronos_task cloud ~configuration endpoint = - match Cloud.notifier cloud with - | Notifier_null -> () - | Notifier_slack {slack_bot_token; slack_channel_id; _} -> - let task = - let action () = - action - ~slack_bot_token - ~slack_channel_id - ~configuration - endpoint - () - in - let tm = Format.sprintf "0 0-23/%d * * *" report_interval in - Chronos.task ~name:"network-overview" ~tm ~action () - in - Cloud.register_chronos_task cloud task - end - - module Alert = struct - let report_lost_dal_rewards ~slack_channel_id ~slack_bot_token ~network - ~level ~cycle ~lost_dal_rewards = - let data = - let header = - Format.sprintf - "*[lost-dal-rewards]* On network `%s`, delegates have lost DAL \ - rewards at cycle `%d`, level `%d`. 
\ - " - (Network.to_string network) - cycle - level - (Network.to_string network) - level - in - let content = - List.map - (fun (`delegate delegate, `change change) -> - Format.asprintf - ":black_small_square: %a has missed ~%.1f tez DAL attestation \ - rewards" - pp_delegate - delegate - (float_of_int change /. 1_000_000.)) - lost_dal_rewards - in - Format_app.section (header :: content) () - in - let* _ts = post_message ~slack_channel_id ~slack_bot_token data in - Lwt.return_unit - - let check_for_lost_dal_rewards t ~metadata = - match Cloud.notifier t.cloud with - | Notifier_null -> unit - | Notifier_slack {slack_channel_id; slack_bot_token; _} -> - let cycle = JSON.(metadata |-> "level_info" |-> "cycle" |> as_int) in - let level = JSON.(metadata |-> "level_info" |-> "level" |> as_int) in - let balance_updates = - JSON.(metadata |-> "balance_updates" |> as_list) - in - let lost_dal_rewards = - List.filter_map - (fun balance_update -> - let category = - JSON.(balance_update |-> "category" |> as_string_opt) - in - match category with - | None -> None - | Some category -> - if String.equal category "lost DAL attesting rewards" then - let delegate = - JSON.(balance_update |-> "delegate" |> as_string) - in - let change = - JSON.(balance_update |-> "change" |> as_int) - in - Some (`delegate delegate, `change change) - else None) - balance_updates - in - if List.is_empty lost_dal_rewards then unit - else - let network = t.configuration.network in - report_lost_dal_rewards - ~slack_channel_id - ~slack_bot_token - ~network - ~level - ~cycle - ~lost_dal_rewards - - let check_for_lost_dal_rewards t ~metadata = - Lwt.catch - (fun () -> check_for_lost_dal_rewards t ~metadata) - (fun exn -> - Log.warn - "Monitor_app.Alert.check_for_lost_dal_rewards: unexpected error: \ - '%s'" - (Printexc.to_string exn) ; - unit) - - let report_dal_accusations ~slack_channel_id ~slack_bot_token ~network - ~level ~cycle dal_accusations = - let data = - let header = - Format.sprintf - 
"*[dal-accusations]* On network `%s`, delegates have been accused \ - at cycle `%d`, level `%d`." - (Network.to_string network) - cycle - level - in - let content = - List.map - (fun ( `attestation_level attestation_level, - `slot_index slot_index, - `delegate delegate, - `op_hash hash ) -> - Format.asprintf - ":black_small_square: %a for attesting slot index `%d` at \ - level `%d`. " - pp_delegate - delegate - slot_index - attestation_level - (Network.to_string network) - hash) - dal_accusations - in - Format_app.section (header :: content) () - in - let* _ts = post_message ~slack_channel_id ~slack_bot_token data in - Lwt.return_unit - - let check_for_dal_accusations t ~cycle ~level ~operations ~endpoint = - match Cloud.notifier t.cloud with - | Notifier_null -> unit - | Notifier_slack {slack_channel_id; slack_bot_token; _} -> - let open JSON in - let accusations = - operations |> as_list |> Fun.flip List.nth 2 |> as_list - in - let dal_entrapments = - List.filter_map - (fun accusation -> - let contents = - accusation |-> "contents" |> as_list |> Fun.flip List.nth 0 - in - match contents |-> "kind" |> as_string_opt with - | Some "dal_entrapment_evidence" -> - let attestation_level = - contents |-> "attestation" |-> "operations" |-> "level" - |> as_int - in - let slot_index = contents |-> "slot_index" |> as_int in - let shard = - contents |-> "shard_with_proof" |-> "shard" |> as_list - |> Fun.flip List.nth 0 |> as_int - in - let hash = accusation |-> "hash" |> as_string in - Some - ( `attestation_level attestation_level, - `shard shard, - `slot_index slot_index, - `op_hash hash ) - | None | Some _ -> None) - accusations - in - (* todo: optimize to avoid too many RPC calls (?) 
*) - let* dal_accusations = - Lwt_list.map_p - (fun ( `attestation_level attestation_level, - `shard shard, - `slot_index slot_index, - `op_hash hash ) -> - let* json = - RPC_core.call - endpoint - (RPC.get_chain_block_context_dal_shards - ~level:attestation_level - ()) - in - let assignment = - let open JSON in - List.find_opt - (fun assignment -> - let indexes = - assignment |-> "indexes" |> as_list |> List.map as_int - in - List.mem shard indexes) - (json |> as_list) - in - let delegate = - Option.fold - ~none:"should_not_happen" - ~some:(fun x -> x |-> "delegate" |> as_string) - assignment - in - return - ( `attestation_level attestation_level, - `slot_index slot_index, - `delegate delegate, - `op_hash hash )) - dal_entrapments - in - if List.is_empty dal_accusations then unit - else - let network = t.configuration.network in - report_dal_accusations - ~slack_channel_id - ~slack_bot_token - ~network - ~cycle - ~level - dal_accusations - - let check_for_dal_accusations t ~cycle ~level ~operations ~endpoint = - Lwt.catch - (fun () -> - check_for_dal_accusations t ~cycle ~level ~operations ~endpoint) - (fun exn -> - Log.warn - "Monitor_app.Alert.check_for_dal_accusations: unexpected error: \ - '%s'" - (Printexc.to_string exn) ; - unit) - - type attestation_transition = Stopped_attesting | Restarted_attesting - - let report_attestation_transition ~slack_channel_id ~slack_bot_token - ~network ~level ~pkh ~transition ~attestation_percentage = - let data = - let header = - Format.asprintf - (match transition with - | Restarted_attesting -> - "*[dal-attester-is-back]* On network `%s` at level `%d`, \ - delegate `%a` who was under the reward threshold is again \ - above threshold. New attestation rate is %.2f%%." - | Stopped_attesting -> - "*[dal-attester-dropped]* On network `%s` at level `%d`, \ - delegate `%a` DAL attestation rate dropped under the reward \ - threshold. 
New attestation rate is %.2f%%.") - (Network.to_string network) - level - pp_delegate - pkh - attestation_percentage - in - Format_app.section [header] () - in - let* _ts = post_message ~slack_channel_id ~slack_bot_token data in - Lwt.return_unit - - let check_for_recently_missed_a_lot = - (* TODO: This correspond to the number of bakers which activated DAL on mainnet. - We expect more bakers to run the DAL, this initial size might be then increased - to avoid rescaling of the hash table. - *) - let prev_was_enough = Hashtbl.create 50 in - let to_treat_delegates = ref [] in - fun t ~level ~metadata -> - match Cloud.notifier t.cloud with - | Notifier_null -> unit - | Notifier_slack {slack_bot_token; slack_channel_id; _} -> ( - let cycle_position = - JSON.(metadata |-> "level_info" |-> "cycle_position" |> as_int) - in - (* We do not run this function during the first 20 levels of a cycle, - since the attestation rate are not relevant when so few levels happened. - Especially since 0 attestation out of 0 attestable slots is considered - as 100% attestation rate. *) - if cycle_position < 20 then unit - else - let endpoint = t.some_node_rpc_endpoint in - let treat_delegate pkh = - let* infos = - RPC_core.call endpoint - @@ RPC.get_chain_block_context_delegate pkh - in - let dal_participation = JSON.(infos |-> "dal_participation") in - let attest_enough = - JSON.( - dal_participation |-> "sufficient_dal_participation" - |> as_bool) - in - match Hashtbl.find_opt prev_was_enough pkh with - | None -> - let () = Hashtbl.add prev_was_enough pkh attest_enough in - unit - | Some prev_status -> - if prev_status = attest_enough then unit - else - let attestable = - JSON.( - dal_participation |-> "delegate_attestable_dal_slots" - |> as_float) - in - (* If no slots was attestatble, the value of [attest_enough] - is [true], but it is quite meaningless, since we do not want - to state that the DAL node is working well again simply - because there was nothing to attest. 
*) - if attestable = 0. then unit - else ( - Hashtbl.add prev_was_enough pkh attest_enough ; - let network = t.configuration.network in - let attested = - JSON.( - dal_participation |-> "delegate_attested_dal_slots" - |> as_float) - in - let attestation_percentage = - 100. *. attested /. attestable - in - let transition = - if attest_enough then Restarted_attesting - else Stopped_attesting - in - report_attestation_transition - ~slack_channel_id - ~slack_bot_token - ~network - ~level - ~pkh - ~transition - ~attestation_percentage) - in - let refill_delegates_to_treat () = - let query_string = [("active", "true")] in - let* delegates = - RPC_core.call endpoint - @@ RPC.get_chain_block_context_delegates ~query_string () - in - to_treat_delegates := delegates ; - unit - in - (* To avoid spawning a high number of RPCs simultaneously, we treat 2 delegates at each level. *) - let to_treat_now, treat_later = - Tezos_stdlib.TzList.split_n 2 !to_treat_delegates - in - let* () = Lwt_list.iter_p treat_delegate to_treat_now in - match treat_later with - | [] -> refill_delegates_to_treat () - | remaining_delegates -> - to_treat_delegates := remaining_delegates ; - unit) - - let check_for_recently_missed_a_lot t ~level ~metadata = - Lwt.catch - (fun () -> check_for_recently_missed_a_lot t ~level ~metadata) - (fun exn -> - Log.warn - "Monitor_app.Alert.check_for_recently_missed_a_lot: unexpected \ - error: '%s'" - (Printexc.to_string exn) ; - unit) - end -end - let get_infos_per_level t ~level ~metadata = let open Dal_node_helpers in let open Metrics in @@ -1230,13 +215,21 @@ let get_infos_per_level t ~level ~metadata = let open Monitoring_app.Alert in let* () = check_for_dal_accusations - t + ~cloud:t.cloud + ~network:t.configuration.network ~cycle ~level ~operations ~endpoint:t.some_node_rpc_endpoint in - let* () = check_for_recently_missed_a_lot t ~level ~metadata in + let* () = + check_for_recently_missed_a_lot + ~cloud:t.cloud + ~endpoint:t.some_node_rpc_endpoint + 
~network:t.configuration.network + ~level + ~metadata + in unit else Lwt.return_unit in @@ -2145,7 +1138,10 @@ let on_new_cycle t ~level = in (* This action is performed only if `--dal-slack-webhook` is provided. *) if t.configuration.network_health_monitoring then - Monitoring_app.Alert.check_for_lost_dal_rewards t ~metadata + Monitoring_app.Alert.check_for_lost_dal_rewards + ~cloud:t.cloud + ~network:t.configuration.network + ~metadata else Lwt.return_unit let on_new_block t ~level = @@ -2535,7 +1531,10 @@ let register (module Cli : Scenarios_cli.Dal) = Lwt.return agent in if configuration.network_health_monitoring then - Monitoring_app.Tasks.register_chronos_task cloud ~configuration endpoint ; + Monitoring_app.Tasks.register_chronos_task + cloud + ~network:configuration.network + endpoint ; let* t = init ~configuration etherlink_configuration cloud next_agent in Lwt.wakeup resolver_endpoint t.some_node_rpc_endpoint ; toplog "Starting main loop" ; diff --git a/tezt/tests/cloud/monitoring_app.ml b/tezt/tests/cloud/monitoring_app.ml new file mode 100644 index 000000000000..c0670528dfa1 --- /dev/null +++ b/tezt/tests/cloud/monitoring_app.ml @@ -0,0 +1,1012 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Nomadic Labs, *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +(* time interval in hours at which to submit report *) +let report_interval = 6 + +let pp_delegate fmt delegate_pkh = + match Hashtbl.find_opt Metrics.aliases delegate_pkh with + | None -> Format.fprintf fmt "%s" delegate_pkh + | Some alias -> + Format.fprintf fmt "%s : %-26s" (String.sub delegate_pkh 0 7) alias + +(** [network_to_image_url network] return an image for each monitored network. 
*) +let network_to_image_url : Network.t -> string = function + | `Rionet -> + "https://gitlab.com/tezos/tezos/-/raw/master/tezt/lib_cloud/assets/rionet.png" + | `Mainnet -> + "https://gitlab.com/tezos/tezos/-/raw/master/tezt/lib_cloud/assets/mainnet.png" + | `Ghostnet -> + "https://gitlab.com/tezos/tezos/-/raw/master/tezt/lib_cloud/assets/ghostnet.png" + | `Nextnet _ | `Seoulnet -> + "https://gitlab.com/tezos/tezos/-/raw/master/tezt/lib_cloud/assets/seoulnet.png" + | `Sandbox | `Weeklynet _ -> "no_image_yet" + +module Format_app = struct + (* Helper for Slack App message format block-kit + See: https://api.slack.com/reference/block-kit/ + *) + + let image ~url ~alt = + let open Ezjsonm in + `O + [ + ("type", string "image"); + ("image_url", string url); + ("alt_text", string alt); + ] + + (* [section content ?accessory] creates Slack App message blocks + with an optional accessory. + + The function joins content strings with newlines, formats them + using mrkdwn, and returns properly structured JSON objects for + Slack's Block Kit. 
*) + let section content ?accessory () = + let open Ezjsonm in + if List.is_empty content then [] + else + let text = String.concat "\n" content in + [ + `O + (("type", string "section") + :: ("text", `O [("type", string "mrkdwn"); ("text", string text)]) + :: Option.fold + ~none:[] + ~some:(fun data -> [("accessory", data)]) + accessory); + ] + + let make_payload ~slack_channel_id ?ts content = + let open Ezjsonm in + `O + (("channel", string slack_channel_id) + :: ("blocks", `A content) + :: Option.fold ~none:[] ~some:(fun ts -> [("thread_ts", string ts)]) ts) +end + +let post_message ?ts ~slack_channel_id ~slack_bot_token data = + let data = Format_app.make_payload ?ts ~slack_channel_id data in + let slack_endpoint = + Endpoint.make ~scheme:"https" ~host:"slack.com" ~port:443 () + in + let rpc = + RPC_core.make + ~data:(Data data) + POST + ["api"; "chat.postMessage"] + (fun _json -> ()) + in + let* response = + RPC_core.call_raw + ~extra_headers: + [("Authorization", Format.sprintf "Bearer %s" slack_bot_token)] + slack_endpoint + rpc + in + let thread_id = + let open JSON in + parse ~origin:"DAL.Monitoring_app.chat_postMessage" response.body + |-> "ts" |> as_string + in + return thread_id + +module Tasks = struct + let endpoint_from_prometheus_query ~query = + let fail ~uri_part = + Test.fail + "DAL.Monitoring_app.Tasks.endpoint_from_prometheus_query: expecting a \ + prometheus %s" + uri_part + in + let uri = + match Prometheus.get_query_endpoint ~query with + | None -> fail ~uri_part:"endpoint" + | Some endpoint -> endpoint + in + let scheme, host, port = + match Uri.(scheme uri, host uri, port uri) with + | Some scheme, Some host, Some port -> (scheme, host, port) + | None, _, _ -> fail ~uri_part:"scheme" + | _, None, _ -> fail ~uri_part:"host" + | _, _, None -> fail ~uri_part:"port" + in + let query_string = + (* Fixme: warn about `k` being dropped in the second case. We + need to keep only list of size 1 because of the way RPC_core + is implemented. 
Should probably be fixed soon or later. *) + List.filter_map + (function k, [v] -> Some (k, v) | _k, _ -> None) + (Uri.query uri) + in + let path = String.split_on_char '/' (Uri.path uri) in + let endpoint = Endpoint.make ~host ~scheme ~port () in + (`endpoint endpoint, `query query_string, `path path) + + let fetch ~origin ~query ~decoder = + let open RPC_core in + let `endpoint endpoint, `query query_string, `path path = + endpoint_from_prometheus_query ~query + in + let rpc = make ~query_string GET path decoder in + Lwt.catch + (fun () -> + let* response = call_raw endpoint rpc in + Lwt.return (decode_raw ~origin rpc response.body)) + (fun exn -> + Log.warn + "Unexpected error while fetching prometheus query (origin : %s): '%s'" + origin + (Printexc.to_string exn) ; + Lwt.return_none) + + let decoder_prometheus_float json = + let open JSON in + let status = json |-> "status" |> as_string in + if not (String.equal status "success") then (* fixme: warning *) + None + else + let opt = + json |-> "data" |-> "result" |> as_list |> Fun.flip List.nth_opt 0 + in + match opt with + | None -> None + | Some x -> + x |-> "value" |> as_list |> Fun.flip List.nth_opt 1 + |> Option.map as_float + + let view_ratio_attested_over_published + (`attested attested, `published published) = + let open Format in + match (attested, published) with + | Some 0., Some 0. -> None + | None, None -> None + | Some attested, Some 0. -> + let s = sprintf "`unk` (`%d`/`0`)" (Int.of_float attested) in + Some s + | Some attested, None -> + let s = sprintf "`unk` (`%d`/`?`)" (Int.of_float attested) in + Some s + | None, Some published -> + let s = sprintf "`unk` (`?`/`%d`)" (Int.of_float published) in + Some s + | Some attested, Some published -> + let ratio = attested /. published *. 100. 
in + let s = + sprintf + "`%s` (`%d/%d`)" + (sprintf "%.2f%%" ratio) + (Int.of_float attested) + (Int.of_float published) + in + Some s + + let fetch_slot_info ~slot_index = + let query s = + Format.sprintf + "increase(tezt_total_%s_commitments_per_slot{slot_index=\"%d\"}[%dh])" + s + slot_index + report_interval + in + let decoder = decoder_prometheus_float in + let* attested = + fetch + ~origin:"fetch_slot_info.attested" + ~decoder + ~query:(query "attested") + in + let* published = + fetch + ~origin:"fetch_slot_info.published" + ~decoder + ~query:(query "published") + in + Lwt.return (`slot_index slot_index, `attested attested, `published published) + + let view_slot_info slot_info = + let slots_info = + List.filter_map + (fun (`slot_index slot_index, attested, published) -> + view_ratio_attested_over_published (attested, published) + |> Option.map + (Format.sprintf ":black_small_square: `%02d` : %s" slot_index)) + slot_info + in + if List.is_empty slots_info then [] + else + "• Percentage of attested over published DAL commitments per slot:" + :: slots_info + + let fetch_dal_commitments_total_info () = + let query s = + Format.sprintf + {|increase(tezt_dal_commitments_total{kind="%s"}[%dh])|} + s + report_interval + in + let decoder = decoder_prometheus_float in + let* attested = + fetch + ~origin:"fetch_dal_commitments_total.attested" + ~decoder + ~query:(query "attested") + in + let* published = + fetch + ~origin:"fetch_dal_commitments_total.published" + ~decoder + ~query:(query "published") + in + let ratio = + view_ratio_attested_over_published + (`attested attested, `published published) + in + let ratio_view = + (Format.sprintf + "• Percentage of attested over published DAL commitments: %s") + (Option.value ~default:"unk" ratio) + in + let slot_size = 126_944 (* TODO: do not hard-code this *) in + let bandwidth = + Option.map + (fun x -> + Format.sprintf + "%.2f" + (x *. float_of_int slot_size + /. 
float_of_int (1024 * report_interval * 3600))) + attested + in + let bandwidth_view = + Format.sprintf + "• Bandwidth: %s KiB/s" + (Option.value ~default:"unk" bandwidth) + in + Lwt.return (ratio_view, bandwidth_view) + + let pp_stake fmt stake_ratio = + Format.fprintf fmt "%.2f%% stake" (stake_ratio *. 100.) + + type baker_attestation_numbers = { + (* The rate of attested/attestable slots for this baker. + An attestatble slot is a published slot for which the baker is in + the DAL committee at attestation level. + *) + slot_attestation_rate : float option; + (* The rate of attestation_with_dal when the baker is in the DAL committee. + This ratio is especially useful for small bakers which are rarely in + the DAL committee, hence have very few opportunities to attest slots + when publication is quite rare. + *) + dal_content_rate : float option; + (* This boolean is true if the baker sent at least one attestation while + out of the DAL committee. This is useful to detect bakers who attest + but have a broken DAL setup preventing them to send attestations while + in DAL committee. + *) + attests_while_out_of_dal_committee : bool; + } + + type baker_infos = { + address : Metrics.public_key_hash; + attest_infos : baker_attestation_numbers; + stake_fraction : float; + } + + let pp_baker_light fmt {address = PKH delegate_pkh; stake_fraction; _} = + Format.fprintf + fmt + "%a (%a)" + pp_delegate + delegate_pkh + pp_stake + stake_fraction + + let pp_baker_dal_status fmt baker = + Format.fprintf + fmt + "%a (%s)" + pp_baker_light + baker + (match baker.attest_infos.dal_content_rate with + | None -> "Never sent attestations while in DAL committee" + | Some 0. -> "OFF" + | Some 1. -> "ON" + | Some x -> Format.sprintf "ACTIVE %.0f%% of the time" (x *. 100.)) + + let pp_bakers_all_infos fmt baker = + Format.fprintf + fmt + "%s - %a" + (Option.fold + ~none:"Never was in committee when slots were produced" + ~some:(fun value -> + let percentage = value *. 100. 
in + Format.sprintf "%.2f%%" percentage) + baker.attest_infos.slot_attestation_rate) + pp_baker_dal_status + baker + + (** [fetch_baker_info ~tz1 ~origin] queries Prometheus for the DAL + attestation metrics of delegate [tz1] over the last [report_interval] + hours and summarises them as a [baker_attestation_numbers] record. + [origin] is only used to label fetch errors in the logs. *) + let fetch_baker_info ~tz1 ~origin = + let query = + Format.sprintf + "sum_over_time(tezt_dal_commitments_attested{attester=\"%s\"}[%dh])" + tz1 + report_interval + in + let* attested = fetch ~decoder:decoder_prometheus_float ~query ~origin in + let query = + Format.sprintf + "sum_over_time(tezt_dal_commitments_attestable{attester=\"%s\"}[%dh])" + tz1 + report_interval + in + let* attestable = fetch ~decoder:decoder_prometheus_float ~query ~origin in + let query = + Format.sprintf + "avg_over_time(tezt_dal_attestation_sent{attester=\"%s\"}[%dh])" + tz1 + report_interval + in + let* dal_content_rate = + fetch ~decoder:decoder_prometheus_float ~query ~origin + in + (* Use the same [report_interval] window as the other queries so that + every metric of a report covers the same period. *) + let query = + Format.sprintf + "sum_over_time(tezt_attestation_sent_when_out_of_dal_committee{attester=\"%s\"}[%dh])" + tz1 + report_interval + in + let* out_attestations = + fetch ~decoder:decoder_prometheus_float ~query ~origin + in + (* A series that exists but sums to zero over the window means that no + attestation was sent while out of the DAL committee: only a strictly + positive count sets the flag. *) + let attests_while_out_of_dal_committee = + match out_attestations with + | Some count -> count > 0. + | None -> false + in + let slot_attestation_rate = + match (attested, attestable) with + | None, _ | _, None | _, Some 0. -> None + | Some attested, Some attestable -> Some (attested /.
attestable) + in + return + { + slot_attestation_rate; + dal_content_rate; + attests_while_out_of_dal_committee; + } + + let get_current_cycle endpoint = + let* {cycle; _} = + RPC_core.call endpoint (RPC.get_chain_block_helper_current_level ()) + in + Lwt.return cycle + + let get_bakers_with_staking_power endpoint cycle = + RPC_core.call endpoint (RPC.get_stake_distribution ~cycle ()) + + type classified_bakers = { + mute_bakers : baker_infos list; + muted_by_dal : baker_infos list; + dal_zero : baker_infos list; + dal_on : baker_infos list; + no_shards : baker_infos list; + dal_off : baker_infos list; + } + + let fetch_bakers_info endpoint = + let* cycle = get_current_cycle endpoint in + let* bakers = get_bakers_with_staking_power endpoint cycle in + let total_baking_power = + List.fold_left + (fun acc RPC.{baking_power; _} -> acc + baking_power) + 0 + bakers + in + let* bakers_info = + Lwt_list.filter_map_p + (fun RPC.{delegate; baking_power} -> + let* attest_infos = + fetch_baker_info + ~origin:(Format.sprintf "fetch_baker_info.%s" delegate) + ~tz1:delegate + in + let stake_fraction = + float_of_int baking_power /. float_of_int total_baking_power + in + Lwt.return_some {address = PKH delegate; attest_infos; stake_fraction}) + bakers + in + let rec classify_bakers already_classified = function + | [] -> already_classified + | ({attest_infos; _} as baker) :: tl -> ( + match attest_infos.dal_content_rate with + | None -> + if attest_infos.attests_while_out_of_dal_committee then + classify_bakers + { + already_classified with + muted_by_dal = baker :: already_classified.muted_by_dal; + } + tl + else + classify_bakers + { + already_classified with + mute_bakers = baker :: already_classified.mute_bakers; + } + tl + | Some 0. 
-> + classify_bakers + { + already_classified with + dal_off = baker :: already_classified.dal_off; + } + tl + | _ -> ( + match attest_infos.slot_attestation_rate with + | None -> + classify_bakers + { + already_classified with + no_shards = baker :: already_classified.no_shards; + } + tl + | Some 0. -> + classify_bakers + { + already_classified with + dal_zero = baker :: already_classified.dal_zero; + } + tl + | _ -> + classify_bakers + { + already_classified with + dal_on = baker :: already_classified.dal_on; + } + tl)) + in + let {mute_bakers; muted_by_dal; dal_zero; dal_on; no_shards; dal_off} = + classify_bakers + { + mute_bakers = []; + muted_by_dal = []; + dal_zero = []; + dal_on = []; + no_shards = []; + dal_off = []; + } + bakers_info + in + let ( >> ) cmp1 cmp2 x y = + match cmp1 x y with 0 -> cmp2 x y | cmp -> cmp + in + let stake_descending x y = + Float.compare y.stake_fraction x.stake_fraction + in + let attestation_rate_ascending x y = + Option.compare + Float.compare + x.attest_infos.slot_attestation_rate + y.attest_infos.slot_attestation_rate + in + let dal_mention_perf_ascending x y = + Option.compare + Float.compare + x.attest_infos.dal_content_rate + y.attest_infos.dal_content_rate + in + let mute_bakers = List.sort stake_descending mute_bakers in + let muted_by_dal = List.sort stake_descending muted_by_dal in + let dal_zero = + List.sort (stake_descending >> dal_mention_perf_ascending) dal_zero + in + let no_shards = + List.sort (stake_descending >> dal_mention_perf_ascending) no_shards + in + let dal_on = + List.sort + (attestation_rate_ascending >> dal_mention_perf_ascending + >> stake_descending) + dal_on + in + let dal_off = List.sort stake_descending dal_off in + (* `group_by n l` outputs the list of lists with the same elements as `l` + but with `n` elements per list (except the last one). 
+ For instance + `group_by 4 [a_1, ..., a_10] = [[a_1, ..., a_4], [a_5, ..., a_8], [a_9, a_10]]` + *) + let group_by n = + let rec bis local_acc main_acc k = function + | [] -> List.rev (List.rev local_acc :: main_acc) + | l when k = 0 -> bis [] (List.rev local_acc :: main_acc) n l + | hd :: tl -> bis (hd :: local_acc) main_acc (k - 1) tl + in + bis [] [] n + in + let agglomerate_infos bakers = + let nb, stake = + List.fold_left + (fun (nb, stake_acc) {stake_fraction; _} -> + (nb + 1, stake_acc +. stake_fraction)) + (0, 0.) + bakers + in + Format.sprintf + "They are %d representing %.2f%% of the stake" + nb + (stake *. 100.) + in + let encapsulate_in_code_block strings = ("```" :: strings) @ ["```"] in + let display catch_phrase printer bakers = + if bakers = [] then [] + else + [catch_phrase; agglomerate_infos bakers] + :: List.map + encapsulate_in_code_block + (group_by 20 (List.map (Format.asprintf "%a" printer) bakers)) + in + let display_muted = + display + ":alert: *Those bakers never sent attestations.*" + pp_baker_light + mute_bakers + in + let display_muted_by_DAL = + display + ":alert: *Those bakers never sent attestation while in DAL committee, \ + however they sent attestations when they are out of it. This is quite \ + unexpected. They probably have an issue.*" + pp_baker_light + muted_by_dal + in + let display_zero = + display + ":triangular_flag_on_post: *Those bakers have a broken DAL node. 
They \ + send attestations with a DAL content, but do not attest any slots.*" + pp_baker_dal_status + dal_zero + in + let display_on = + display + ":white_check_mark: *Those bakers sent attestations with a DAL \ + content.*" + pp_bakers_all_infos + dal_on + in + let display_off = + display + ":x: *Those bakers never turned their DAL on.*" + pp_baker_light + dal_off + in + let display_no_shards = + display + ":microscope: *Those bakers seems to have a working DAL node, but \ + never were in committee when slots were produced, hence we cannot say \ + much about how well it works.*" + pp_baker_light + no_shards + in + Lwt.return + ((["Details of bakers performance:"] :: display_muted_by_DAL) + @ display_zero @ display_on @ display_no_shards @ display_off + @ display_muted) + + let fetch_slots_info () = + let* data = + (* fixme: should use protocol parameterized number of slots *) + Lwt_list.map_p + (fun slot_index -> fetch_slot_info ~slot_index) + (List.init 32 Fun.id) + in + Lwt.return (view_slot_info data) + + let action ~slack_bot_token ~slack_channel_id ~network endpoint () = + let* endpoint in + let title_info = + Format.sprintf + "*DAL report* for the *%s* network over the last %d hours." 
+ (String.capitalize_ascii (Network.to_string network)) + report_interval + in + let* ratio_dal_commitments_total_info, bandwidth_info = + fetch_dal_commitments_total_info () + in + let* slots_info = fetch_slots_info () in + let network_overview_info = + bandwidth_info :: ratio_dal_commitments_total_info :: slots_info + in + let data = + let open Format_app in + section [title_info] () + @ section ["*Network overview*"] () + @ section + network_overview_info + ~accessory: + (Format_app.image + ~url:(network_to_image_url network) + ~alt:(Network.to_string network)) + () + in + let* thread_id = post_message ~slack_channel_id ~slack_bot_token data in + let* bakers_info = fetch_bakers_info endpoint in + Lwt_list.iter_s + (fun to_post -> + let data = + let open Format_app in + section to_post () + in + let* _ts = + post_message ~ts:thread_id ~slack_channel_id ~slack_bot_token data + in + Lwt.return_unit) + bakers_info + + (* Relies on UTC (Universal Time Coordinated). + Paris operates on Central European Time (CET), which is UTC+1 + during standard time (winter months). During daylight saving + time (summer months), Paris switches to Central European Summer + Time (CEST), which is UTC+2. + *) + let register_chronos_task cloud ~network endpoint = + match Cloud.notifier cloud with + | Notifier_null -> () + | Notifier_slack {slack_bot_token; slack_channel_id; _} -> + let task = + let action () = + action ~slack_bot_token ~slack_channel_id ~network endpoint () + in + let tm = Format.sprintf "0 0-23/%d * * *" report_interval in + Chronos.task ~name:"network-overview" ~tm ~action () + in + Cloud.register_chronos_task cloud task +end + +module Alert = struct + let report_lost_dal_rewards ~slack_channel_id ~slack_bot_token ~network ~level + ~cycle ~lost_dal_rewards = + let data = + let header = + Format.sprintf + "*[lost-dal-rewards]* On network `%s`, delegates have lost DAL \ + rewards at cycle `%d`, level `%d`. 
\ + " + (Network.to_string network) + cycle + level + (Network.to_string network) + level + in + let content = + List.map + (fun (`delegate delegate, `change change) -> + Format.asprintf + ":black_small_square: %a has missed ~%.1f tez DAL attestation \ + rewards" + pp_delegate + delegate + (float_of_int change /. 1_000_000.)) + lost_dal_rewards + in + Format_app.section (header :: content) () + in + let* _ts = post_message ~slack_channel_id ~slack_bot_token data in + Lwt.return_unit + + let check_for_lost_dal_rewards ~cloud ~network ~metadata = + match Cloud.notifier cloud with + | Notifier_null -> unit + | Notifier_slack {slack_channel_id; slack_bot_token; _} -> + let cycle = JSON.(metadata |-> "level_info" |-> "cycle" |> as_int) in + let level = JSON.(metadata |-> "level_info" |-> "level" |> as_int) in + let balance_updates = + JSON.(metadata |-> "balance_updates" |> as_list) + in + let lost_dal_rewards = + List.filter_map + (fun balance_update -> + let category = + JSON.(balance_update |-> "category" |> as_string_opt) + in + match category with + | None -> None + | Some category -> + if String.equal category "lost DAL attesting rewards" then + let delegate = + JSON.(balance_update |-> "delegate" |> as_string) + in + let change = JSON.(balance_update |-> "change" |> as_int) in + Some (`delegate delegate, `change change) + else None) + balance_updates + in + if List.is_empty lost_dal_rewards then unit + else + report_lost_dal_rewards + ~slack_channel_id + ~slack_bot_token + ~network + ~level + ~cycle + ~lost_dal_rewards + + let check_for_lost_dal_rewards ~cloud ~network ~metadata = + Lwt.catch + (fun () -> check_for_lost_dal_rewards ~cloud ~network ~metadata) + (fun exn -> + Log.warn + "Monitor_app.Alert.check_for_lost_dal_rewards: unexpected error: '%s'" + (Printexc.to_string exn) ; + unit) + + let report_dal_accusations ~slack_channel_id ~slack_bot_token ~network ~level + ~cycle dal_accusations = + let data = + let header = + Format.sprintf + 
"*[dal-accusations]* On network `%s`, delegates have been accused at \ + cycle `%d`, level `%d`." + (Network.to_string network) + cycle + level + in + let content = + List.map + (fun ( `attestation_level attestation_level, + `slot_index slot_index, + `delegate delegate, + `op_hash hash ) -> + Format.asprintf + ":black_small_square: %a for attesting slot index `%d` at level \ + `%d`. " + pp_delegate + delegate + slot_index + attestation_level + (Network.to_string network) + hash) + dal_accusations + in + Format_app.section (header :: content) () + in + let* _ts = post_message ~slack_channel_id ~slack_bot_token data in + Lwt.return_unit + + let check_for_dal_accusations ~cloud ~network ~cycle ~level ~operations + ~endpoint = + match Cloud.notifier cloud with + | Notifier_null -> unit + | Notifier_slack {slack_channel_id; slack_bot_token; _} -> + let open JSON in + let accusations = + operations |> as_list |> Fun.flip List.nth 2 |> as_list + in + let dal_entrapments = + List.filter_map + (fun accusation -> + let contents = + accusation |-> "contents" |> as_list |> Fun.flip List.nth 0 + in + match contents |-> "kind" |> as_string_opt with + | Some "dal_entrapment_evidence" -> + let attestation_level = + contents |-> "attestation" |-> "operations" |-> "level" + |> as_int + in + let slot_index = contents |-> "slot_index" |> as_int in + let shard = + contents |-> "shard_with_proof" |-> "shard" |> as_list + |> Fun.flip List.nth 0 |> as_int + in + let hash = accusation |-> "hash" |> as_string in + Some + ( `attestation_level attestation_level, + `shard shard, + `slot_index slot_index, + `op_hash hash ) + | None | Some _ -> None) + accusations + in + (* todo: optimize to avoid too many RPC calls (?) 
*) + let* dal_accusations = + Lwt_list.map_p + (fun ( `attestation_level attestation_level, + `shard shard, + `slot_index slot_index, + `op_hash hash ) -> + let* json = + RPC_core.call + endpoint + (RPC.get_chain_block_context_dal_shards + ~level:attestation_level + ()) + in + let assignment = + let open JSON in + List.find_opt + (fun assignment -> + let indexes = + assignment |-> "indexes" |> as_list |> List.map as_int + in + List.mem shard indexes) + (json |> as_list) + in + let delegate = + Option.fold + ~none:"should_not_happen" + ~some:(fun x -> x |-> "delegate" |> as_string) + assignment + in + return + ( `attestation_level attestation_level, + `slot_index slot_index, + `delegate delegate, + `op_hash hash )) + dal_entrapments + in + if List.is_empty dal_accusations then unit + else + report_dal_accusations + ~slack_channel_id + ~slack_bot_token + ~network + ~cycle + ~level + dal_accusations + + let check_for_dal_accusations ~cloud ~network ~cycle ~level ~operations + ~endpoint = + Lwt.catch + (fun () -> + check_for_dal_accusations + ~cloud + ~network + ~cycle + ~level + ~operations + ~endpoint) + (fun exn -> + Log.warn + "Monitor_app.Alert.check_for_dal_accusations: unexpected error: '%s'" + (Printexc.to_string exn) ; + unit) + + type attestation_transition = Stopped_attesting | Restarted_attesting + + let report_attestation_transition ~slack_channel_id ~slack_bot_token ~network + ~level ~pkh ~transition ~attestation_percentage = + let data = + let header = + Format.asprintf + (match transition with + | Restarted_attesting -> + "*[dal-attester-is-back]* On network `%s` at level `%d`, \ + delegate `%a` who was under the reward threshold is again above \ + threshold. New attestation rate is %.2f%%." + | Stopped_attesting -> + "*[dal-attester-dropped]* On network `%s` at level `%d`, \ + delegate `%a` DAL attestation rate dropped under the reward \ + threshold. 
New attestation rate is %.2f%%.") + (Network.to_string network) + level + pp_delegate + pkh + attestation_percentage + in + Format_app.section [header] () + in + let* _ts = post_message ~slack_channel_id ~slack_bot_token data in + Lwt.return_unit + + let check_for_recently_missed_a_lot = + (* TODO: This correspond to the number of bakers which activated DAL on mainnet. + We expect more bakers to run the DAL, this initial size might be then increased + to avoid rescaling of the hash table. + *) + let prev_was_enough = Hashtbl.create 50 in + let to_treat_delegates = ref [] in + fun ~cloud ~endpoint ~network ~level ~metadata -> + match Cloud.notifier cloud with + | Notifier_null -> unit + | Notifier_slack {slack_bot_token; slack_channel_id; _} -> ( + let cycle_position = + JSON.(metadata |-> "level_info" |-> "cycle_position" |> as_int) + in + (* We do not run this function during the first 20 levels of a cycle, + since the attestation rate are not relevant when so few levels happened. + Especially since 0 attestation out of 0 attestable slots is considered + as 100% attestation rate. *) + if cycle_position < 20 then unit + else + let treat_delegate pkh = + let* infos = + RPC_core.call endpoint + @@ RPC.get_chain_block_context_delegate pkh + in + let dal_participation = JSON.(infos |-> "dal_participation") in + let attest_enough = + JSON.( + dal_participation |-> "sufficient_dal_participation" + |> as_bool) + in + match Hashtbl.find_opt prev_was_enough pkh with + | None -> + let () = Hashtbl.add prev_was_enough pkh attest_enough in + unit + | Some prev_status -> + if prev_status = attest_enough then unit + else + let attestable = + JSON.( + dal_participation |-> "delegate_attestable_dal_slots" + |> as_float) + in + (* If no slots was attestatble, the value of [attest_enough] + is [true], but it is quite meaningless, since we do not want + to state that the DAL node is working well again simply + because there was nothing to attest. *) + if attestable = 0. 
then unit + else ( + Hashtbl.add prev_was_enough pkh attest_enough ; + let attested = + JSON.( + dal_participation |-> "delegate_attested_dal_slots" + |> as_float) + in + let attestation_percentage = + 100. *. attested /. attestable + in + let transition = + if attest_enough then Restarted_attesting + else Stopped_attesting + in + report_attestation_transition + ~slack_channel_id + ~slack_bot_token + ~network + ~level + ~pkh + ~transition + ~attestation_percentage) + in + let refill_delegates_to_treat () = + let query_string = [("active", "true")] in + let* delegates = + RPC_core.call endpoint + @@ RPC.get_chain_block_context_delegates ~query_string () + in + to_treat_delegates := delegates ; + unit + in + (* To avoid spawning a high number of RPCs simultaneously, we treat 2 delegates at each level. *) + let to_treat_now, treat_later = + Tezos_stdlib.TzList.split_n 2 !to_treat_delegates + in + let* () = Lwt_list.iter_p treat_delegate to_treat_now in + match treat_later with + | [] -> refill_delegates_to_treat () + | remaining_delegates -> + to_treat_delegates := remaining_delegates ; + unit) + + let check_for_recently_missed_a_lot ~cloud ~endpoint ~network ~level ~metadata + = + Lwt.catch + (fun () -> + check_for_recently_missed_a_lot + ~cloud + ~endpoint + ~network + ~level + ~metadata) + (fun exn -> + Log.warn + "Monitor_app.Alert.check_for_recently_missed_a_lot: unexpected \ + error: '%s'" + (Printexc.to_string exn) ; + unit) +end diff --git a/tezt/tests/cloud/monitoring_app.mli b/tezt/tests/cloud/monitoring_app.mli new file mode 100644 index 000000000000..35b14434f931 --- /dev/null +++ b/tezt/tests/cloud/monitoring_app.mli @@ -0,0 +1,46 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Nomadic Labs, *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +(** Monitoring helpers for 
scheduling reports and raising alerts. *)
+
+(** Scheduled Reports via Chronos *)
+module Tasks : sig
+  (** [register_chronos_task cloud ~network endpoint] registers a recurring
+      Chronos task that will gather the latest metrics, format them and send
+      them to the configured Slack channel. Note that [cloud] is passed
+      positionally (it is not labelled) and that [endpoint] is given as a
+      promise ([Endpoint.t Lwt.t]). *)
+  val register_chronos_task :
+    Cloud.t -> network:Network.t -> Endpoint.t Lwt.t -> unit
+end
+
+module Alert : sig
+  (** [check_for_lost_dal_rewards ~cloud ~network ~metadata]
+      examines Tezos block [metadata] for any lost DAL attesting rewards
+      balance updates. If found, posts a Slack alert. *)
+  val check_for_lost_dal_rewards :
+    cloud:Cloud.t -> network:Network.t -> metadata:JSON.t -> unit Lwt.t
+
+  (** [check_for_dal_accusations ~cloud ~network ~cycle ~level ~operations ~endpoint]
+      scans the operation list at index 2 of [operations] (presumably the
+      anonymous-operations validation pass, where accusations are included;
+      TODO confirm) for [dal_entrapment_evidence] operations. For each one,
+      the accused delegate is found by querying [endpoint] for the DAL shard
+      assignment at the level of the attestation carried by the evidence.
+      If any accusation is found, posts a single Slack alert listing each of
+      them, with [network], [cycle] and [level] in its header.
+      Does nothing unless [cloud] has a Slack notifier. Unexpected exceptions
+      are logged as warnings and not propagated. *)
+  val check_for_dal_accusations :
+    cloud:Cloud.t ->
+    network:[< Network.t] ->
+    cycle:int ->
+    level:int ->
+    operations:JSON.t ->
+    endpoint:Endpoint.t ->
+    unit Lwt.t
+
+  val check_for_recently_missed_a_lot :
+    cloud:Cloud.t ->
+    endpoint:Endpoint.t ->
+    network:Network.t ->
+    level:int ->
+    metadata:JSON.t ->
+    unit Lwt.t (**
+      [check_for_recently_missed_a_lot ~cloud ~endpoint ~network ~level
+      ~metadata] tracks, for active delegates, the
+      [sufficient_dal_participation] flag returned by the delegate RPC on
+      [endpoint]. It posts a Slack alert when that flag flips:
+      [dal-attester-dropped] when it becomes insufficient, and
+      [dal-attester-is-back] when it becomes sufficient again. The alert
+      mentions [network], [level] and the DAL attestation rate
+      (attested / attestable slots, in percent).
+      To limit the number of simultaneous RPCs, each call inspects at most
+      2 delegates, cycling through the list of active delegates. That list
+      is re-fetched once it has been exhausted. The first observation of a
+      delegate only records its status, and a flip is ignored while the
+      delegate has no attestable DAL slots.
+      Does nothing unless [cloud] has a Slack notifier, or when
+      [metadata]'s [level_info.cycle_position] is below 20 (rates are not
+      meaningful that early in a cycle). The tracking state lives in the
+      module and is shared across all calls. Unexpected exceptions are
+      logged as warnings and not propagated. *)
+end
-- 
GitLab