From b472203e165cbddefdf79806371b1cc6b965297a Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 28 Oct 2025 11:52:49 +0000 Subject: [PATCH 1/2] Tezt: Add mockup for streaming RPC --- tezt/lib_tezos/dal_node.ml | 51 ++++++++++++++++++++++++++++++++++--- tezt/lib_tezos/dal_node.mli | 2 +- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index 82cb6cdc6013..85215f5df166 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -636,7 +636,7 @@ module Proxy = struct end module Mockup = struct - type answer = [`Response of string] + type answer = [`Response of string | `Stream of string Lwt_stream.t] type route = { path_pattern : Re.Pcre.regexp; @@ -675,7 +675,13 @@ module Mockup = struct | None -> Cohttp_lwt_unix.Server.respond_not_found () | Some (`Response body) -> Log.debug "[%s] responding with mock: '%s'" t.name body ; - Cohttp_lwt_unix.Server.respond_string ~status:`OK ~body ()) + Cohttp_lwt_unix.Server.respond_string ~status:`OK ~body () + | Some (`Stream stream) -> + let headers = + Cohttp.Header.init_with "content-type" "application/json" + in + let body = Cohttp_lwt.Body.of_stream stream in + Cohttp_lwt_unix.Server.respond ~headers ~status:`OK ~body ()) | None -> Log.warn "[%s] no mock route for: '%s', failing." 
t.name uri_str ; Cohttp_lwt_unix.Server.respond_error @@ -697,8 +703,47 @@ end module Mockup_for_baker = struct type t = Mockup.t + let json_backfill_event ~slot_ids ~no_shards_levels = + let open Ezjsonm in + let slot_id_to_json (level, index) = + dict [("slot_level", int level); ("slot_index", int index)] + in + value_to_string + (dict + [ + ("kind", string "backfill"); + ( "backfill_payload", + dict + [ + ("slot_ids", list slot_id_to_json slot_ids); + ("no_shards_attestation_levels", list int no_shards_levels); + ] ); + ]) + + (* Emit a single [Backfill] line that covers a range of published levels on + slot_index=0 so the baker’s cache contains the bit we need for the test's + attested_level. *) + let monitor_route ~published_levels = + let path_pattern = + "^/profiles/(tz[1234][A-Za-z0-9]+)/monitor/attestable_slots/?$" + in + Mockup.route ~path_pattern ~callback:(fun ~path -> + let re = Re.Pcre.regexp path_pattern in + let _attester = + match Re.exec_opt re path with + | Some groups -> Re.Group.get groups 1 + | None -> Test.fail "failed to extract pkh from %s" path + in + let slot_ids = List.init published_levels (fun i -> (i + 1, 0)) in + let body_line = + json_backfill_event ~slot_ids ~no_shards_levels:[] ^ "\n" + in + let stream = Lwt_stream.of_list [body_line] in + Lwt.return_some (`Stream stream)) + let routes ~attestation_lag ~attesters ~attestable_slots = [ + monitor_route ~published_levels:20; (let path_pattern = Format.sprintf "/profiles/(tz[1234][a-zA-Z0-9]+)/attested_levels/([1-9][0-9]*)/attestable_slots" @@ -709,7 +754,7 @@ module Mockup_for_baker = struct let attester = match Re.exec_opt re path with | Some groups -> Re.Group.get groups 1 - | None -> Test.fail "failed to extract attested_level from %s" path + | None -> Test.fail "failed to extract pkh from %s" path in let attested_level = match Re.exec_opt re path with diff --git a/tezt/lib_tezos/dal_node.mli b/tezt/lib_tezos/dal_node.mli index 287483adc232..cc8b466ec1cc 100644 --- 
a/tezt/lib_tezos/dal_node.mli +++ b/tezt/lib_tezos/dal_node.mli @@ -294,7 +294,7 @@ module Mockup : sig type t (** Represents a possible response from a mocked-up route. *) - type answer = [`Response of string] + type answer = [`Response of string | `Stream of string Lwt_stream.t] (** A route definition. *) type route -- GitLab From f8baa1846a95d6d81d7cdda895db39fd87606dae Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 29 Oct 2025 15:50:13 +0000 Subject: [PATCH 2/2] Tezt: Add proxy for streaming RPC --- tezt/lib_tezos/dal_node.ml | 206 ++++++++++++++++++++++++++++++++---- tezt/lib_tezos/dal_node.mli | 11 +- tezt/tests/dal.ml | 65 ++++-------- 3 files changed, 215 insertions(+), 67 deletions(-) diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index 85215f5df166..d86e9977e6f0 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -542,13 +542,14 @@ let debug_print_store_schemas ?path ?hooks () = Process.check process module Proxy = struct - type answer = [`Response of string] + type answer = [`Response of string | `Stream of Cohttp_lwt.Body.t] type route = { path : Re.Str.regexp; callback : path:string -> fetch_answer:(unit -> Ezjsonm.t Lwt.t) -> + fetch_stream:(unit -> (Cohttp.Response.t * Cohttp_lwt.Body.t) Lwt.t) -> answer option Lwt.t; } @@ -568,6 +569,167 @@ module Proxy = struct let find_mocked_action t ~path = List.find_opt (fun act -> Re.Str.string_match act.path path 0) t.routes + (** [rewrite_attestable_stream_line ~number_of_slots ~published_level line] + rewrites a single JSON line from [/profiles//monitor/attestable_slots]. + The goal is to make the faulty delegate appear to have all slots attestable for + [~published_level]. To do this, we need to add the missing slot indices for each level + in the [Backfill] element that is always the first one sent in the monitoring stream. 
+ Note: this function heavily depends on the types of elements that are in the returned RPC + stream, information which can be found at + {!Tezos_dal_node_services.Types.Attestable_slots_watcher_table.Attestable_event.t}. *) + let rewrite_attestable_stream_line ~number_of_slots ~published_level line = + let open Ezjsonm in + let make_slot_id slot_level slot_index = + dict [("slot_level", int slot_level); ("slot_index", int slot_index)] + in + let parse_slot_id v = + let slot_level = find v ["slot_level"] |> get_int in + let slot_index = find v ["slot_index"] |> get_int in + (slot_level, slot_index) + in + + let rewrite_backfill v = + let payload = find v ["backfill_payload"] in + let slot_ids = + match find_opt payload ["slot_ids"] with + | None -> [] + | Some arr -> get_list parse_slot_id arr + in + (* Find which indices at published_level are already present *) + let existing_indices = + List.filter_map + (fun (slot_level, slot_index) -> + if slot_level = published_level then Some slot_index else None) + slot_ids + in + (* Generate missing indices *) + let all_indices = List.init number_of_slots Fun.id in + let missing_indices = + List.filter (fun i -> not (List.mem i existing_indices)) all_indices + in + (* Append missing slot_ids *) + let new_slot_ids = + slot_ids @ List.map (fun i -> (published_level, i)) missing_indices + in + (* Build updated JSON *) + let new_slot_ids_json = + list + (fun (slot_level, slot_index) -> make_slot_id slot_level slot_index) + new_slot_ids + in + update v ["backfill_payload"; "slot_ids"] (Some new_slot_ids_json) + |> value_to_string + in + let result = + try + let v = from_string line in + match find_opt v ["kind"] |> Option.map get_string with + | Some "backfill" -> rewrite_backfill v + | _ -> line + with _ -> line + in + result ^ "\n" + + let routes ~attestation_lag ~number_of_slots ~faulty_delegate + ~target_attested_level = + let open Ezjsonm in + (* Converts a Cohttp body into an JSON line stream *) + let json_lines_of_body 
body = + let chunks = Cohttp_lwt.Body.to_stream body in + let buf = Buffer.create 2048 in + + let chop s = + let n = String.length s in + if n > 0 && s.[n - 1] = '\r' then String.sub s 0 (n - 1) else s + in + + let rec produce_line () = + let contents = Buffer.contents buf in + match String.index_opt contents '\n' with + | Some idx -> + (* We have a complete line *) + let line = String.sub contents 0 idx in + let rest = + String.sub contents (idx + 1) (String.length contents - idx - 1) + in + Buffer.clear buf ; + Buffer.add_string buf rest ; + Lwt.return_some (chop line) + | None -> ( + (* Need more data from the stream *) + let* next_chunk = Lwt_stream.get chunks in + match next_chunk with + | None -> + (* Stream ended - emit final partial line if any *) + if Buffer.length buf = 0 then Lwt.return_none + else + let line = Buffer.contents buf in + Buffer.clear buf ; + Lwt.return_some (chop line) + | Some chunk -> + Buffer.add_string buf chunk ; + produce_line ()) + in + + Lwt_stream.from produce_line + in + [ + (let path = + Re.Str.regexp + @@ Format.sprintf + "/profiles/%s/monitor/attestable_slots" + faulty_delegate + in + route ~path ~callback:(fun ~path:_ ~fetch_answer:_ ~fetch_stream -> + let* _resp, body = fetch_stream () in + let published_level = target_attested_level - attestation_lag in + let lines = json_lines_of_body body in + let rewritten_lines = + Lwt_stream.map + (rewrite_attestable_stream_line + ~number_of_slots + ~published_level) + lines + in + let rewritten_body = Cohttp_lwt.Body.of_stream rewritten_lines in + return (Some (`Stream rewritten_body)))); + (let path = + Re.Str.regexp + @@ Format.sprintf + "/profiles/%s/attested_levels/%d/attestable_slots" + faulty_delegate + target_attested_level + in + route ~path ~callback:(fun ~path:_ ~fetch_answer ~fetch_stream:_ -> + let* dal_node_answer = fetch_answer () in + let v = value dal_node_answer in + let kind = find v ["kind"] |> get_string in + let new_v = + if String.equal kind 
"attestable_slots_set" then + let attestable_slots_set = + (* Declare each slot attestable. *) + find v ["attestable_slots_set"] + |> get_list get_bool + |> List.map (fun _ -> true) + |> list bool + in + update v ["attestable_slots_set"] (Some attestable_slots_set) + else v + in + return (Some (`Response (value_to_string new_v))))); + ] + + let make ~name ~attestation_lag ~number_of_slots ~faulty_delegate + ~target_attested_level = + let routes = + routes + ~attestation_lag + ~number_of_slots + ~faulty_delegate + ~target_attested_level + in + make ~name ~routes + let run t ~honest_dal_node ~faulty_dal_node = let dal_uri uri = Uri.make @@ -586,24 +748,29 @@ module Proxy = struct match find_mocked_action t ~path with | Some action -> ( Log.debug "[%s] mocking data for request: '%s'" t.name uri_str ; - let* res = - action.callback ~path ~fetch_answer:(fun () -> - let headers = Cohttp.Request.headers req in - let* _resp, body = - Cohttp_lwt_unix.Client.call - ~headers - ~body - method_ - (dal_uri uri) - in - let* body_str = Cohttp_lwt.Body.to_string body in - return (Ezjsonm.from_string body_str)) + let headers = Cohttp.Request.headers req in + let fetch_answer () = + let* _resp, body = + Cohttp_lwt_unix.Client.call ~headers ~body method_ (dal_uri uri) + in + let* body_str = Cohttp_lwt.Body.to_string body in + return (Ezjsonm.from_string body_str) + in + let fetch_stream () = + Cohttp_lwt_unix.Client.call ~headers ~body method_ (dal_uri uri) in + let* res = action.callback ~path ~fetch_answer ~fetch_stream in match res with | None -> Cohttp_lwt_unix.Server.respond_not_found () | Some (`Response body) -> - Log.debug "[%s] mocking with custom answer '%s'" t.name body ; - Cohttp_lwt_unix.Server.respond_string ~status:`OK ~body ()) + Log.debug + "[%s] mocking with custom response answer '%s'" + t.name + body ; + Cohttp_lwt_unix.Server.respond_string ~status:`OK ~body () + | Some (`Stream stream_body) -> + Log.debug "[%s] mocking with custom stream answer" t.name ; + 
Cohttp_lwt_unix.Server.respond ~status:`OK ~body:stream_body ()) | None -> Log.debug "[%s] forwarding the following request to the honest dal node: '%s'" @@ -613,15 +780,10 @@ module Proxy = struct let* resp, body = Cohttp_lwt_unix.Client.call ~headers ~body method_ (dal_uri uri) in - let* body_str = Cohttp_lwt.Body.to_string body in - Log.debug - "[%s] mocking with honest dal node answer: '%s'" - t.name - body_str ; - Cohttp_lwt_unix.Server.respond_string + Cohttp_lwt_unix.Server.respond ~status:(Cohttp.Response.status resp) ~headers:(Cohttp.Response.headers resp) - ~body:body_str + ~body () in let start () = diff --git a/tezt/lib_tezos/dal_node.mli b/tezt/lib_tezos/dal_node.mli index cc8b466ec1cc..301d5f9d7cf2 100644 --- a/tezt/lib_tezos/dal_node.mli +++ b/tezt/lib_tezos/dal_node.mli @@ -255,7 +255,7 @@ module Proxy : sig type proxy (** Represents a possible response from a proxy route. *) - type answer = [`Response of string] + type answer = [`Response of string | `Stream of Cohttp_lwt.Body.t] (** A route definition. *) type route @@ -273,11 +273,18 @@ module Proxy : sig callback: (path:string -> fetch_answer:(unit -> Ezjsonm.t Lwt.t) -> + fetch_stream:(unit -> (Cohttp.Response.t * Cohttp_lwt.Body.t) Lwt.t) -> answer option Lwt.t) -> route (** Creates a new proxy instance. *) - val make : name:string -> routes:route list -> proxy + val make : + name:string -> + attestation_lag:int -> + number_of_slots:int -> + faulty_delegate:string -> + target_attested_level:int -> + proxy (** Starts running the proxy server. *) val run : proxy -> honest_dal_node:t -> faulty_dal_node:t -> unit diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index cdbd6409aa76..70f0a38b5f7b 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -10636,25 +10636,28 @@ let test_denunciation_next_cycle protocol dal_parameters cryptobox node client [faulty_delegate] misbehaves. 
Specifically, it: - Creates a DAL node [proxy] whose role is to mimick the honest - [dal_node] by forwarding to the honest DAL node each RPC call - except for [/profiles/<pkh>/attested_levels/<level>/attestable_slots], - where [<pkh>] is the [faulty_delegate] and [<level>] is the level [2 * - blocks_per_cycle + 1]. For this RPC path, it will declare for the - [faulty_delegate] each slot attestable by replacing the honest - DAL node answer field ['attestable_slots_set'] to an array of - ['true'] values. + [dal_node] by forwarding to the honest DAL node each RPC call, + except for the attestable-slots endpoints for the [faulty_delegate]: + - the streamed RPC [/profiles/<pkh>/monitor/attestable_slots], where + the proxy rewrites the JSON stream so that, at + [<level> = 2 * blocks_per_cycle + 1] (the target attested level), + the delegate appears to have all slots attestable (by adding + the missing slot_ids to the backfill event); + - the legacy per-level RPC [/profiles/<pkh>/attested_levels/<level>/attestable_slots], + which is also overridden for compatibility by flipping the returned + ['attestable_slots_set'] array to all ['true']. The test proceeds as follows: 1. Bakes [blocks_per_cycle] blocks to avoid the period in which - the DAL node is not able to inject an accusation because of the - accusation delay introduced by the migration. + the DAL node is not able to inject an accusation because of the + accusation delay introduced by the migration. 2. Bakes [blocks_per_cycle] blocks while publishing on slot index - [0] and checks that [faulty_delegate] is not denounced at the end. + [0] and checks that [faulty_delegate] is not denounced at the end. 3. Bakes again [blocks_per_cycle] blocks to finally reach [3 * - blocks_per_cycle] blocks. + blocks_per_cycle] blocks. - Finally, the test @@ -10662,8 +10665,8 @@ let test_denunciation_next_cycle protocol dal_parameters cryptobox node client the third cycle and verifies that: - Multiple DAL attesting rewards were minted. 
- The delegates that lost the DAL rewards are the [faulty_delegate] - and the ones that lost the consensus attesting rewards because they - haven't revealed nonces. + and the ones that lost the consensus attesting rewards because they + haven't revealed nonces. - b) retrieves the [faulty_delegate] DAL participation at the end of the third cycle, minus one block (i.e "head~1") @@ -10683,36 +10686,12 @@ let test_e2e_trap_faulty_dal_node protocol dal_parameters _cryptobox node client let target_attested_level = (2 * blocks_per_cycle) + 1 in let faulty_delegate = Constant.bootstrap1.Account.public_key_hash in let proxy = - let routes = - [ - (let path = - Re.Str.regexp - @@ Format.sprintf - "/profiles/%s/attested_levels/%d/attestable_slots" - faulty_delegate - target_attested_level - in - Dal_node.Proxy.route ~path ~callback:(fun ~path:_ ~fetch_answer -> - let open Ezjsonm in - let* dal_node_answer = fetch_answer () in - let v = value dal_node_answer in - let kind = find v ["kind"] |> get_string in - let new_v = - if String.equal kind "attestable_slots_set" then - let attestable_slots_set = - (* Declare each slot attestable. *) - find v ["attestable_slots_set"] - |> get_list get_bool - |> List.map (fun _ -> true) - |> list bool - in - update v ["attestable_slots_set"] (Some attestable_slots_set) - else v - in - return (Some (`Response (value_to_string new_v))))); - ] - in - Dal_node.Proxy.make ~name:"proxy-dal-node" ~routes + Dal_node.Proxy.make + ~name:"proxy-dal-node" + ~attestation_lag:dal_parameters.Dal.Parameters.attestation_lag + ~number_of_slots:dal_parameters.Dal.Parameters.number_of_slots + ~faulty_delegate + ~target_attested_level in let faulty_dal_node = Dal_node.create -- GitLab