From cc8f3904b23ee42a999a28605e400860da9ba74b Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Thu, 27 Nov 2025 14:48:58 +0000 Subject: [PATCH 1/5] Dal_node: Add trap_slot_ids in the backfill payload --- src/lib_dal_node/attestable_slots.ml | 27 +++++++++++++------ src/lib_dal_node_services/types.ml | 12 +++++---- src/lib_dal_node_services/types.mli | 2 ++ .../dal_attestable_slots_worker.ml | 14 +++++++++- tezt/lib_tezos/dal_node.ml | 9 +++---- 5 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/lib_dal_node/attestable_slots.ml b/src/lib_dal_node/attestable_slots.ml index 8a0744b07d01..9c20cfa04f9a 100644 --- a/src/lib_dal_node/attestable_slots.ml +++ b/src/lib_dal_node/attestable_slots.ml @@ -186,6 +186,7 @@ let may_notify_not_in_committee ctxt committee ~attestation_level = The payload can be used to pre-populate a structure that requires information from the past of the creation of the stream corresponding to [~pkh] with: - all attestable [slot_id]'s observed in a recent window; + - all [slot_id]'s with traps; - attestation levels where [pkh] has no assigned shards. The "backfill" window is calculated in the following way: @@ -195,7 +196,7 @@ let may_notify_not_in_committee ctxt committee ~attestation_level = - `stop = L`, as this is the newest level where we did not have time to obtain the information about the published slots. - We include [L+1] in backfill to cover possible races between updating the + We include [L + 1] in backfill to cover possible races between updating the last-finalized level and stream subscription. This keeps the client's cache consistent even if the first slot was published before the stream was fully established. 
@@ -269,9 +270,9 @@ let get_backfill_payload ctxt ~pkh = fetch_assigned_shard_indices ctxt ~pkh ~level:attestation_level in let number_of_assigned_shards = List.length shard_indices in - let* new_slot_ids = - List.filter_map_ep - (fun slot_index -> + let* new_slot_ids, new_trap_slot_ids = + List.fold_left_es + (fun (slot_ids_acc, trap_slot_ids_acc) slot_index -> let slot_id = Types.Slot_id.{slot_level = published_level; slot_index} in @@ -285,12 +286,22 @@ let get_backfill_payload ctxt ~pkh = ~number_of_assigned_shards in match is_attestable_slot_or_trap with - | Some `Attestable_slot -> return_some slot_id - | Some `Trap | None -> return_none) + | Some `Attestable_slot -> + return (slot_id :: slot_ids_acc, trap_slot_ids_acc) + | Some `Trap -> + return (slot_ids_acc, slot_id :: trap_slot_ids_acc) + | None -> return (slot_ids_acc, trap_slot_ids_acc)) + ([], []) candidate_slot_indices in - return E.{acc with slot_ids = List.append new_slot_ids acc.slot_ids}) - {slot_ids = []; no_shards_attestation_levels = []} + return + E. 
+ { + slot_ids = List.append new_slot_ids acc.slot_ids; + trap_slot_ids = List.append new_trap_slot_ids acc.trap_slot_ids; + no_shards_attestation_levels = acc.no_shards_attestation_levels; + }) + {slot_ids = []; trap_slot_ids = []; no_shards_attestation_levels = []} published_levels (** [create_new_stream_with_backfill stream backfill_payload_opt] builds a new diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 496098f069b5..32b452bdb887 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -791,6 +791,7 @@ module SlotIdSet = module Attestable_event = struct type backfill_payload = { slot_ids : slot_id list; + trap_slot_ids : slot_id list; no_shards_attestation_levels : level list; } @@ -803,12 +804,13 @@ module Attestable_event = struct let backfill_payload_encoding = let open Data_encoding in conv - (fun {slot_ids; no_shards_attestation_levels} -> - (slot_ids, no_shards_attestation_levels)) - (fun (slot_ids, no_shards_attestation_levels) -> - {slot_ids; no_shards_attestation_levels}) - (obj2 + (fun {slot_ids; trap_slot_ids; no_shards_attestation_levels} -> + (slot_ids, trap_slot_ids, no_shards_attestation_levels)) + (fun (slot_ids, trap_slot_ids, no_shards_attestation_levels) -> + {slot_ids; trap_slot_ids; no_shards_attestation_levels}) + (obj3 (req "slot_ids" (list slot_id_encoding)) + (req "trap_slot_ids" (list slot_id_encoding)) (req "no_shards_attestation_levels" (list int32))) let encoding = diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index 375fd8a50997..21e13ca68c4c 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -415,6 +415,8 @@ module Attestable_event : sig type backfill_payload = { slot_ids : slot_id list; (** All slots that should be marked attestable for this delegate *) + trap_slot_ids : slot_id list; + (** All slots that should be marked as traps for this delegate *)
no_shards_attestation_levels : level list; (** All attestation levels where this delegate has no shards *) } diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index 1fbd736af672..9a9daf303bb7 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -157,7 +157,9 @@ let update_cache_no_shards_assigned state ~delegate_id ~attestation_level = merges a DAL [Backfill] event for [~delegate_id] into the in-memory cache. *) let update_cache_backfill_payload state ~delegate_id ~backfill_payload = let module E = Types.Attestable_event in - let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in + let E.{slot_ids; trap_slot_ids; no_shards_attestation_levels} = + backfill_payload + in List.iter (fun slot_id -> (update_cache_with_attestable_slot @@ -167,6 +169,16 @@ let update_cache_backfill_payload state ~delegate_id ~backfill_payload = [@profiler.record_f {verbosity = Debug} "update_cache_with_attestable_slot"])) slot_ids ; + List.iter + (fun slot_id -> + (update_cache_with_attestable_slot + state + ~is_trap:true + ~delegate_id + ~slot_id + [@profiler.record_f + {verbosity = Debug} "update_cache_with_attestable_slot"])) + trap_slot_ids ; List.iter (fun attestation_level -> update_cache_no_shards_assigned state ~delegate_id ~attestation_level) diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index 0b8adc800fa1..2ebef5afd3c9 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -864,7 +864,7 @@ end module Mockup_for_baker = struct type t = Mockup.t - let json_backfill_event ~slot_ids ~no_shards_levels = + let json_backfill_event ~slot_ids = let open Ezjsonm in let slot_id_to_json (level, index) = dict [("slot_level", int level); ("slot_index", int index)] @@ -877,7 +877,8 @@ module Mockup_for_baker = struct dict [ ("slot_ids", list slot_id_to_json 
slot_ids); - ("no_shards_attestation_levels", list int no_shards_levels); + ("trap_slot_ids", list slot_id_to_json []); + ("no_shards_attestation_levels", list int []); ] ); ]) @@ -896,9 +897,7 @@ module Mockup_for_baker = struct | None -> Test.fail "failed to extract pkh from %s" path in let slot_ids = List.init published_levels (fun i -> (i + 1, 0)) in - let body_line = - json_backfill_event ~slot_ids ~no_shards_levels:[] ^ "\n" - in + let body_line = json_backfill_event ~slot_ids ^ "\n" in let stream = Lwt_stream.of_list [body_line] in Lwt.return_some (`Stream stream)) -- GitLab From 905f17abdf11403ac576af086f60c6f95c2fff80 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 24 Nov 2025 12:29:09 +0000 Subject: [PATCH 2/5] Tezt: Dal: Extract function to create low stake --- tezt/tests/dal.ml | 64 ++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index 8e97db141b19..803a32aa7933 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -11004,6 +11004,38 @@ let create_account_and_reveal ?source ~amount ~alias client = let* () = bake_for client in return fresh_account +let create_low_stake node dal_parameters proto_params blocks_per_cycle = + let minimal_stake = JSON.(proto_params |-> "minimal_stake" |> as_int) in + let number_of_shards = + dal_parameters.Dal.Parameters.cryptobox.number_of_shards + in + let* bootstrap_info = + Node.RPC.call node + @@ RPC.get_chain_block_context_delegate Constant.bootstrap1.public_key_hash + in + let bootstrap_baking_power = + JSON.(bootstrap_info |-> "baking_power" |> as_string) |> int_of_string + in + let total_baking_power = + bootstrap_baking_power * Array.length Account.Bootstrap.keys + in + (* The amount is such that this baker has only one assigned shard per cycle + (on average). Note that if the baker has no assigned shard, the test + should still pass, because we just check that it gets its DAL rewards.
+ + Let [t] be total_baking_power, [s] be [small_baker_stake], and [1/n] + desired shards fraction for the small baker. We want [s / (t + s) = 1 / + n], ie [t + s = n * s], ie [t = (n-1) * s] ie, [s = t / (n-1)] *) + let desired_stake = + total_baking_power / ((blocks_per_cycle * number_of_shards) - 1) + in + let small_baker_stake = max desired_stake minimal_stake in + Log.info + "total_baking_power = %d, small_baker_stake = %d" + total_baking_power + small_baker_stake ; + return small_baker_stake + (** [test_dal_rewards_distribution _protocol dal_parameters cryptobox node client _dal_node] verifies the correct distribution of DAL rewards among delegates based on their participation in DAL attestations activity. @@ -11071,42 +11103,12 @@ let test_dal_rewards_distribution protocol dal_parameters cryptobox node client let consensus_rights_delay = JSON.(proto_params |-> "consensus_rights_delay" |> as_int) in - let minimal_stake = JSON.(proto_params |-> "minimal_stake" |> as_int) in let nb_slots = dal_parameters.Dal.Parameters.number_of_slots in let all_slots = List.init nb_slots Fun.id in - let number_of_shards = - dal_parameters.Dal.Parameters.cryptobox.number_of_shards - in (* Compute the stake of the small baker. *) let* small_baker_stake = - let* bootstrap_info = - Node.RPC.call node - @@ RPC.get_chain_block_context_delegate - Constant.bootstrap1.public_key_hash - in - let bootstrap_baking_power = - JSON.(bootstrap_info |-> "baking_power" |> as_string) |> int_of_string - in - let total_baking_power = - bootstrap_baking_power * Array.length Account.Bootstrap.keys - in - (* The amount is such that this baker has only one assigned shard per cycles - (on average). Note that if the baker has no assigned shard, the test - should still pass, because we just check that it gets its DAL rewards. - - Let [t] be total_baking_power, [s] be [small_baker_stake], and [1/n] - desired shards fraction for the small baker. 
We want [s / (t + s) = 1 / - n], ie [t + s = n * s], ie [t = (n-1) * s] ie, [s = t / (n-1)] *) - let desired_stake = - total_baking_power / ((blocks_per_cycle * number_of_shards) - 1) - in - let small_baker_stake = max desired_stake minimal_stake in - Log.info - "total_baking_power = %d, small_baker_stake = %d" - total_baking_power - small_baker_stake ; - return small_baker_stake + create_low_stake node dal_parameters proto_params blocks_per_cycle in (* Each of the 5 bootstrap accounts contributes equally to the new baker's -- GitLab From 71810b9248e500c35742c9a5094d4569af595e74 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Thu, 27 Nov 2025 14:50:41 +0000 Subject: [PATCH 3/5] Tezt: Dal_common: Add is_trap function --- tezt/lib_tezos/dal_common.ml | 19 +++++++++++++++++++ tezt/lib_tezos/dal_common.mli | 11 +++++++++++ 2 files changed, 30 insertions(+) diff --git a/tezt/lib_tezos/dal_common.ml b/tezt/lib_tezos/dal_common.ml index 332aa7c7cc6a..ae2b4f4db59c 100644 --- a/tezt/lib_tezos/dal_common.ml +++ b/tezt/lib_tezos/dal_common.ml @@ -395,6 +395,25 @@ module Dal_RPC = struct ] (fun json -> JSON.(json |> as_list |> List.map as_int)) + type trap = {delegate : string; slot_index : int} + + let trap_of_json json = + JSON. 
+ { + delegate = json |-> "delegate" |> as_string; + slot_index = json |-> "slot_index" |> as_int; + } + + let get_published_level_known_traps ~published_level ~pkh ~slot_index = + let query_string = + [("delegate", pkh); ("slot_index", string_of_int slot_index)] + in + make + GET + ["published_levels"; string_of_int published_level; "known_traps"] + ~query_string + (fun json -> JSON.(json |> as_list |> List.map trap_of_json)) + type slot_set = bool list type attestable_slots = Not_in_committee | Attestable_slots of slot_set diff --git a/tezt/lib_tezos/dal_common.mli b/tezt/lib_tezos/dal_common.mli index 57d0f5b77c09..dff4df321de4 100644 --- a/tezt/lib_tezos/dal_common.mli +++ b/tezt/lib_tezos/dal_common.mli @@ -270,6 +270,17 @@ module RPC : sig val get_assigned_shard_indices : level:int -> pkh:string -> int list RPC_core.t + type trap = {delegate : string; slot_index : int} + + (** Call RPC + "GET /published_levels/<published_level>/known_traps?delegate=<pkh>&slot_index=<slot_index>" + and return the list of traps recorded by the DAL node at [~published_level].
*) + val get_published_level_known_traps : + published_level:int -> + pkh:commitment_proof -> + slot_index:int -> + trap list RPC_core.t + type slot_set = bool list type attestable_slots = Not_in_committee | Attestable_slots of slot_set -- GitLab From 990f546441848d34b78133bd6f1cbb3335ccfd8e Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Thu, 27 Nov 2025 10:38:37 +0000 Subject: [PATCH 4/5] Tezt: Dal: Add low stake traps test Co-authored-by: Eugen Zalinescu --- tezt/tests/dal.ml | 244 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 244 insertions(+) diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index 803a32aa7933..27c5f5e8df89 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -12108,6 +12108,233 @@ let test_statuses_backfill_at_restart _protocol dal_parameters _cryptobox in unit +let test_dal_low_stake_attester_attestable_slots _protocol dal_parameters + _cryptobox node client dal_node = + let {log_step} = init_logger () in + let* proto_params = + Node.RPC.call node @@ RPC.get_chain_block_context_constants () + in + let consensus_rights_delay = + JSON.(proto_params |-> "consensus_rights_delay" |> as_int) + in + let blocks_per_cycle = JSON.(proto_params |-> "blocks_per_cycle" |> as_int) in + let slot_index = 0 in + let slot_size = dal_parameters.Dal.Parameters.cryptobox.slot_size in + + let* low_stake = + create_low_stake node dal_parameters proto_params blocks_per_cycle + in + let* new_account = Client.gen_and_show_keys client in + let amount = Tez.of_mutez_int (low_stake + 3_000_000) in + let* () = + Client.transfer + ~giver:Constant.bootstrap1.alias + ~receiver:new_account.alias + ~amount + ~burn_cap:Tez.one + client + in + let* () = bake_for client in + let*! 
() = Client.reveal ~fee:Tez.one ~src:new_account.alias client in + let* () = bake_for client in + let* _ = Client.register_delegate ~delegate:new_account.alias client in + let* () = bake_for client in + let* () = + Client.stake + (Tez.of_mutez_int low_stake) + ~staker:new_account.public_key_hash + client + in + let* () = bake_for client in + let client = Client.with_dal_node client ~dal_node in + log_step "Attester DAL node started (operator + attester)." ; + + let* current_level = Node.get_level node in + log_step + "Bake (almost) %d cycles to activate the delegate" + consensus_rights_delay ; + let* () = + bake_for + ~count: + ((blocks_per_cycle * (1 + consensus_rights_delay)) - current_level - 1) + client + in + + (* Find DAL attestation bitset in the last block for the delegate, if an + [attestation_with_dal] exists. *) + let get_delegate_dal_attestation_opt () = + let* json = + Node.RPC.call node + @@ RPC.get_chain_block_operations_validation_pass ~validation_pass:0 () + in + let dal_attestation_opt = + List.find_map + (fun json -> + let contents = JSON.(json |-> "contents" |> as_list) |> List.hd in + let delegate = + JSON.(contents |-> "metadata" |-> "delegate" |> as_string) + in + let kind = JSON.(contents |-> "kind" |> as_string) in + if + String.equal delegate new_account.public_key_hash + && String.equal kind "attestation_with_dal" + then Some JSON.(contents |-> "dal_attestation" |> as_string) + else None) + (JSON.as_list json) + in + return dal_attestation_opt + in + + (* Check if a bit is set in a decimal bitset string *) + let is_bit_set str bit = + let n = int_of_string str in + n land (1 lsl bit) <> 0 + in + + let count_not_in_committee = ref 0 in + let count_attestable_slots = ref 0 in + let count_traps = ref 0 in + + let* first_level = Client.level client in + + let check_one_level () = + (* Publish a slot, then bake a block with all delegates. 
*) + let* _ = + Helpers.publish_and_store_slot + client + dal_node + Constant.bootstrap2 + ~index:slot_index + (Helpers.make_slot ~slot_size "SLOTDATA") + in + let* () = bake_for client in + let* attested_level = Client.level client in + let published_level = attested_level - dal_parameters.attestation_lag in + + let* expected_dal_attestable_slots = + Dal_RPC.( + call dal_node + @@ get_attestable_slots ~attester:new_account ~attested_level) + in + let* actual_dal_attestable_slots_opt = + get_delegate_dal_attestation_opt () + in + let expected_bit_opt = + match expected_dal_attestable_slots with + | Not_in_committee -> None + | Attestable_slots slots -> + Some + (match List.nth_opt slots slot_index with + | Some b -> b + | None -> false) + in + (match expected_bit_opt with + | None -> incr count_not_in_committee + | Some true -> incr count_attestable_slots + | _ -> ()) ; + + match (expected_dal_attestable_slots, actual_dal_attestable_slots_opt) with + | Not_in_committee, None -> + log_step "Level %d: Not_in_committee" attested_level ; + unit + | Not_in_committee, Some dal_attestation -> + Test.fail + "Level %d: Not_in_committee expected, but found dal_attestation=%s \ + for %s" + attested_level + dal_attestation + new_account.public_key_hash + | Attestable_slots _slots, None -> + if published_level > first_level then + Test.fail + "Level %d: delegate %s is in the DAL committee (attestable slots \ + available) but no DAL attestation operation was found in the \ + block. This is invalid: when in committee, the baker must include \ + DAL content." 
+ attested_level + new_account.public_key_hash + else unit + | Attestable_slots _slots, Some actual_dal_attestable_slots -> + let expected_bit = + match expected_bit_opt with Some b -> b | None -> false + in + let actual_bit = is_bit_set actual_dal_attestable_slots slot_index in + if actual_bit <> expected_bit then + Test.fail + "Level %d: DAL node says bit(%d)=%b, but chain dal_attestation=%s \ + (bit=%b)" + attested_level + slot_index + expected_bit + actual_dal_attestable_slots + actual_bit + else + let* has_traps = + let* traps = + Dal_RPC.( + call dal_node + @@ get_published_level_known_traps + ~published_level + ~pkh:new_account.public_key_hash + ~slot_index) + in + return @@ not @@ List.is_empty traps + in + if has_traps then incr count_traps ; + if published_level <= first_level then ( + let prefix_msg = sf "Level %d" attested_level in + Check.( + (actual_bit = false) + ~__LOC__ + bool + ~error_msg:(prefix_msg ^ "Expected false, got true")) ; + unit) + else if actual_bit <> not has_traps then + Test.fail + "Level %d, slot %d: chain dal_attestation bit is [%b], but \ + has_traps = %b" + attested_level + slot_index + actual_bit + has_traps + else unit + in + + (* Run until we've seen all three cases, or give up after a limit. *) + let max_steps = 10 * blocks_per_cycle in + log_step + "Running measurement for at most %d steps (up to %d cycles)." + max_steps + (max_steps / blocks_per_cycle) ; + + let rec loop step = + if + !count_not_in_committee > 0 + && !count_attestable_slots > 0 + && !count_traps > 0 + then unit + else if step > max_steps then + Test.fail + "Reached max_steps=%d without seeing all event kinds. 
Summary: \ + Not_in_committee=%d, Attestable_slots=%d, Traps=%d" + max_steps + !count_not_in_committee + !count_attestable_slots + !count_traps + else + let* () = check_one_level () in + loop (step + 1) + in + let* () = loop 1 in + + log_step + "Final summary: Not_in_committee = %d, Attestable_slots = %d, Traps = %d" + !count_not_in_committee + !count_attestable_slots + !count_traps ; + + unit + let register ~protocols = (* Tests with Layer1 node only *) scenario_with_layer1_node @@ -12442,6 +12669,23 @@ let register ~protocols = "new attester attests" test_new_attester_attests protocols ; + scenario_with_layer1_and_dal_nodes + ~operator_profiles:[0] + ~l1_history_mode:Default_with_refutation + ~traps_fraction:(Q.of_float 0.5) + ~number_of_slots:1 + "low stake attester attests (with traps)" + test_dal_low_stake_attester_attestable_slots + protocols ; + scenario_with_layer1_and_dal_nodes + ~operator_profiles:[0] + ~l1_history_mode:Default_with_refutation + ~traps_fraction:(Q.of_float 0.5) + ~all_bakers_attest_activation_threshold:Q.zero + ~number_of_slots:1 + "low stake attester attests (with traps and ABA activated)" + test_dal_low_stake_attester_attestable_slots + protocols ; scenario_with_layer1_and_dal_nodes ~bootstrap_profile:true ~l1_history_mode:Default_with_refutation -- GitLab From 4b37eec3be959ded117156ff7472130a6bc08ce7 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Fri, 28 Nov 2025 15:02:08 +0000 Subject: [PATCH 5/5] Tezt: Dal_node: Fix proxy route --- tezt/lib_tezos/dal_node.ml | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index 2ebef5afd3c9..9c2bd1c54d97 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -629,6 +629,27 @@ module Proxy = struct in result ^ "\n" + let json_backfill ~published_level ~number_of_slots = + let open Ezjsonm in + let slot_id_to_json (level, index) = + dict [("slot_level", int level); 
("slot_index", int index)] + in + let slot_ids = List.init number_of_slots (fun i -> (published_level, i)) in + let backfill = + dict + [ + ("kind", string "backfill"); + ( "backfill_payload", + dict + [ + ("slot_ids", list slot_id_to_json slot_ids); + ("trap_slot_ids", list slot_id_to_json []); + ("no_shards_attestation_levels", list int []); + ] ); + ] + in + Ezjsonm.value_to_string backfill ^ "\n" + let routes ~attestation_lag ~number_of_slots ~faulty_delegate ~target_attested_level = let open Ezjsonm in @@ -680,8 +701,12 @@ module Proxy = struct faulty_delegate in route ~path ~callback:(fun ~path:_ ~fetch_answer:_ ~fetch_stream -> - let* _resp, body = fetch_stream () in let published_level = target_attested_level - attestation_lag in + let backfill_stream = + let line = json_backfill ~published_level ~number_of_slots in + Lwt_stream.of_list [line] + in + let* _resp, body = fetch_stream () in let lines = json_lines_of_body body in let rewritten_lines = Lwt_stream.map @@ -690,7 +715,10 @@ module Proxy = struct ~published_level) lines in - let rewritten_body = Cohttp_lwt.Body.of_stream rewritten_lines in + let rewritten_stream = + Lwt_stream.append backfill_stream rewritten_lines + in + let rewritten_body = Cohttp_lwt.Body.of_stream rewritten_stream in return (Some (`Stream rewritten_body)))); (let path = Re.Str.regexp -- GitLab