From c7f384ca03334acf65cfb4cc770d0b6751501033 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 25 Nov 2025 15:56:01 +0000 Subject: [PATCH 1/9] Teztale: Add assigned_shard_indices type Currently, this does not change anything semantically, just adds the field to be filled in later commits with information from the L1 context. --- teztale/bin_teztale_archiver/converter.ml | 16 ++++++- teztale/bin_teztale_archiver/json_archiver.ml | 21 +++++++-- teztale/bin_teztale_server/exporter.ml | 1 + teztale/lib_teztale_base/data.ml | 45 ++++++++++++++++--- 4 files changed, 71 insertions(+), 12 deletions(-) diff --git a/teztale/bin_teztale_archiver/converter.ml b/teztale/bin_teztale_archiver/converter.ml index f5900c85c098..a8d21ac288d3 100644 --- a/teztale/bin_teztale_archiver/converter.ml +++ b/teztale/bin_teztale_archiver/converter.ml @@ -20,7 +20,13 @@ let to_received_ops ctx endpoint auth level data = let received_ops = List.map (fun Data.Delegate_operations. - {delegate; first_slot; attesting_power; operations} + { + delegate; + first_slot; + attesting_power; + operations; + assigned_shard_indices = _; + } -> ( Consensus_ops.{address = delegate; first_slot; power = attesting_power}, List.flatten @@ -86,7 +92,13 @@ let included_ops_map level data = List.fold_left (fun acc Data.Delegate_operations. 
- {delegate; first_slot = _; attesting_power; operations} + { + delegate; + first_slot = _; + attesting_power; + operations; + assigned_shard_indices = _; + } -> List.fold_left (fun acc diff --git a/teztale/bin_teztale_archiver/json_archiver.ml b/teztale/bin_teztale_archiver/json_archiver.ml index 1f7334248ed0..8afc72926e95 100644 --- a/teztale/bin_teztale_archiver/json_archiver.ml +++ b/teztale/bin_teztale_archiver/json_archiver.ml @@ -109,7 +109,13 @@ let add_inclusion_in_block block_hash validators delegate_operations = List.fold_left (fun (acc, missing) Data.Delegate_operations.( - {delegate; first_slot; attesting_power; operations} as delegate_ops) + { + delegate; + first_slot; + attesting_power; + operations; + assigned_shard_indices; + } as delegate_ops) -> match List.partition @@ -133,6 +139,7 @@ let add_inclusion_in_block block_hash validators delegate_operations = op.op.kind ?ops_round:op.op.round operations; + assigned_shard_indices; } :: acc, missing' ) @@ -161,6 +168,7 @@ let add_inclusion_in_block block_hash validators delegate_operations = block_inclusion = [block_hash]; }; ]; + assigned_shard_indices = []; } :: acc) updated_known @@ -353,8 +361,13 @@ let dump_received logger path ?unaccurate level received_ops = List.fold_left (fun (acc, missing) Data.Delegate_operations.( - {delegate; first_slot; attesting_power; operations} as - delegate_ops) + { + delegate; + first_slot; + attesting_power; + operations; + assigned_shard_indices; + } as delegate_ops) -> match List.partition @@ -372,6 +385,7 @@ let dump_received logger path ?unaccurate level received_ops = first_slot; attesting_power; operations = merge_operations operations new_operations; + assigned_shard_indices; } :: acc, missing' ) @@ -415,6 +429,7 @@ let dump_received logger path ?unaccurate level received_ops = block_inclusion = []; }) ops; + assigned_shard_indices = []; } :: acc) updated_known diff --git a/teztale/bin_teztale_server/exporter.ml b/teztale/bin_teztale_server/exporter.ml index 
8c2f98905c1c..50fd55c6d993 100644 --- a/teztale/bin_teztale_server/exporter.ml +++ b/teztale/bin_teztale_server/exporter.ml @@ -370,6 +370,7 @@ let translate_ops info = first_slot; attesting_power = power; operations = translate pkh_ops; + assigned_shard_indices = []; } :: acc) info diff --git a/teztale/lib_teztale_base/data.ml b/teztale/lib_teztale_base/data.ml index c6d0e986af7b..de6910e99cc3 100644 --- a/teztale/lib_teztale_base/data.ml +++ b/teztale/lib_teztale_base/data.ml @@ -101,6 +101,7 @@ module Delegate_operations = struct first_slot : int; attesting_power : int; operations : operation list; + assigned_shard_indices : int list; } let legacy_encoding = @@ -110,7 +111,13 @@ module Delegate_operations = struct (fun (delegate, reception_time, errors, block_inclusion) -> match (reception_time, block_inclusion) with | None, [] -> - {delegate; first_slot = 0; attesting_power = 0; operations = []} + { + delegate; + first_slot = 0; + attesting_power = 0; + operations = []; + assigned_shard_indices = []; + } | _, _ -> let mempool_inclusion = match reception_time with @@ -132,6 +139,7 @@ module Delegate_operations = struct block_inclusion; }; ]; + assigned_shard_indices = []; }) (obj4 (req "delegate" Tezos_crypto.Signature.Public_key_hash.encoding) @@ -142,15 +150,38 @@ module Delegate_operations = struct let encoding = let open Data_encoding in conv - (fun {delegate; first_slot; attesting_power; operations} -> - (delegate, first_slot, attesting_power, operations)) - (fun (delegate, first_slot, attesting_power, operations) -> - {delegate; first_slot; attesting_power; operations}) - (obj4 + (fun { + delegate; + first_slot; + attesting_power; + operations; + assigned_shard_indices; + } + -> + ( delegate, + first_slot, + attesting_power, + operations, + assigned_shard_indices )) + (fun ( delegate, + first_slot, + attesting_power, + operations, + assigned_shard_indices ) + -> + { + delegate; + first_slot; + attesting_power; + operations; + assigned_shard_indices; + }) + 
(obj5 (req "delegate" Tezos_crypto.Signature.Public_key_hash.encoding) (dft "first_slot" int16 0) (dft "endorsing_power" int16 0) - (dft "operations" (list operation_encoding) [])) + (dft "operations" (list operation_encoding) []) + (dft "assigned_shard_indices" (list int16) [])) let encoding = let open Data_encoding in -- GitLab From dcef6050d7dac9e334eec02aa4adebf3ff100ac1 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 25 Nov 2025 16:23:22 +0000 Subject: [PATCH 2/9] Teztale: Add SQL support for DAL shard assignments This commit only wires the SQL path; the archiver integration will be added in the following commit(s). --- teztale/bin_teztale_server/exporter.ml | 70 ++++++++++++++++++--- teztale/bin_teztale_server/sql_requests.ml | 16 +++++ teztale/bin_teztale_server/sql_requests.mli | 2 + 3 files changed, 79 insertions(+), 9 deletions(-) diff --git a/teztale/bin_teztale_server/exporter.ml b/teztale/bin_teztale_server/exporter.ml index 50fd55c6d993..733a0e5c1ca7 100644 --- a/teztale/bin_teztale_server/exporter.ml +++ b/teztale/bin_teztale_server/exporter.ml @@ -150,6 +150,20 @@ let select_missing_blocks conf db_pool boundaries = Int32Map.empty) db_pool +let select_dal_shard_assignments = + Caqti_request.Infix.( + Caqti_type.(t2 int32 int32) + ->* Caqti_type.(t3 int32 Sql_requests.Type.public_key_hash int)) + "SELECT\n\ + \ er.level,\n\ + \ delegates.address,\n\ + \ dsa.shard_index\n\ + FROM dal_shard_assignments dsa\n\ + JOIN endorsing_rights er ON er.id = dsa.endorsing_right\n\ + JOIN delegates ON delegates.id = er.delegate\n\ + WHERE er.level >= $1\n\ + AND er.level <= $2" + let select_blocks conf db_pool boundaries = let block_request = Caqti_request.Infix.( @@ -257,7 +271,10 @@ let select_ops conf db_pool boundaries = let ops = Ops.add delegate - (first_slot, power, Tezos_crypto.Hashed.Operation_hash.Map.empty) + ( first_slot, + power, + Tezos_crypto.Hashed.Operation_hash.Map.empty, + [] (* assigned_shard_indices *) ) ops in Int32Map.add level ops info @@ 
-272,7 +289,7 @@ let select_ops conf db_pool boundaries = Ops.update delegate (function - | Some (first_slot, power, ops) -> + | Some (first_slot, power, ops, assigned_shard_indices) -> let op = match Tezos_crypto.Hashed.Operation_hash.Map.find_opt op_hash ops @@ -284,7 +301,7 @@ let select_ops conf db_pool boundaries = let ops' = Tezos_crypto.Hashed.Operation_hash.Map.add op_hash op ops in - Some (first_slot, power, ops') + Some (first_slot, power, ops', assigned_shard_indices) | None -> None) ops in @@ -305,7 +322,7 @@ let select_ops conf db_pool boundaries = Ops.update delegate (function - | Some (first_slot, power, ops) -> + | Some (first_slot, power, ops, assigned_shard_indices) -> let op = match Tezos_crypto.Hashed.Operation_hash.Map.find_opt op_hash ops @@ -318,12 +335,39 @@ let select_ops conf db_pool boundaries = let ops' = Tezos_crypto.Hashed.Operation_hash.Map.add op_hash op ops in - Some (first_slot, power, ops') + Some (first_slot, power, ops', assigned_shard_indices) | None -> None) ops in Int32Map.add level ops info in + let cb_shards (level, delegate, shard_index) info = + let ops = + match Int32Map.find_opt level info with Some m -> m | None -> Ops.empty + in + let ops = + Ops.update + delegate + (function + | Some (first_slot, power, op_map, assigned_shard_indices) -> + let assigned_shard_indices = + if List.mem shard_index assigned_shard_indices then + assigned_shard_indices + else shard_index :: assigned_shard_indices + in + Some (first_slot, power, op_map, assigned_shard_indices) + | None -> + (* Shards for a delegate we never saw in rights, so we + record them with dummy rights to export them. 
*) + Some + ( 0, + 0, + Tezos_crypto.Hashed.Operation_hash.Map.empty, + [shard_index] )) + ops + in + Int32Map.add level ops info + in let* out = Caqti_lwt_unix.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) -> @@ -338,10 +382,17 @@ let select_ops conf db_pool boundaries = Db.fold q_included cb_included boundaries out) db_pool in + let* out = + Caqti_lwt_unix.Pool.use + (fun (module Db : Caqti_lwt.CONNECTION) -> + maybe_with_metrics conf "select_operations_reception" @@ fun () -> + Db.fold q_received cb_received boundaries out) + db_pool + in Caqti_lwt_unix.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) -> - maybe_with_metrics conf "select_operations_reception" @@ fun () -> - Db.fold q_received cb_received boundaries out) + maybe_with_metrics conf "select_dal_shard_assignments" @@ fun () -> + Db.fold select_dal_shard_assignments cb_shards boundaries out) db_pool let translate_ops info = @@ -363,14 +414,15 @@ let translate_ops info = Int32Map.map (fun info -> Tezos_crypto.Signature.Public_key_hash.Map.fold - (fun pkh (first_slot, power, pkh_ops) acc -> + (fun pkh (first_slot, power, pkh_ops, assigned_shard_indices) acc -> Lib_teztale_base.Data.Delegate_operations. 
{ delegate = pkh; first_slot; attesting_power = power; operations = translate pkh_ops; - assigned_shard_indices = []; + assigned_shard_indices = + List.sort_uniq compare assigned_shard_indices; } :: acc) info diff --git a/teztale/bin_teztale_server/sql_requests.ml b/teztale/bin_teztale_server/sql_requests.ml index 87946cfa9da2..a066e7d84bc6 100644 --- a/teztale/bin_teztale_server/sql_requests.ml +++ b/teztale/bin_teztale_server/sql_requests.ml @@ -117,6 +117,14 @@ let create_missing_blocks = \ FOREIGN KEY (baker) REFERENCES delegates(id),\n\ \ UNIQUE (source, level, round))" +let create_dal_shard_assignments = + "CREATE TABLE IF NOT EXISTS dal_shard_assignments(\n\ + \ id $(PRIMARY_INCREMENTING_INT) PRIMARY KEY,\n\ + \ endorsing_right $(PRIMARY_INCREMENTING_INT_REF) NOT NULL,\n\ + \ shard_index INTEGER NOT NULL,\n\ + \ FOREIGN KEY (endorsing_right) REFERENCES endorsing_rights(id),\n\ + \ UNIQUE (endorsing_right, shard_index))" + module Mutex = struct let delegates = Lwt_mutex.create () @@ -137,6 +145,8 @@ module Mutex = struct let cycles = Lwt_mutex.create () let missing_blocks = Lwt_mutex.create () + + let dal_shard_assignments = Lwt_mutex.create () end let create_endorsing_rights_level_idx = @@ -167,6 +177,10 @@ let create_cycles_level_idx = let create_missing_blocks_level_idx = "CREATE INDEX IF NOT EXISTS missing_blocks_level_idx ON missing_blocks(level)" +let create_dal_shard_assignments_endorsing_right_idx = + "CREATE INDEX IF NOT EXISTS dal_shard_assignments_endorsing_right_idx ON \ + dal_shard_assignments(endorsing_right)" + let create_tables = [ create_delegates; @@ -179,6 +193,7 @@ let create_tables = create_endorsing_rights; create_cycles; create_missing_blocks; + create_dal_shard_assignments; create_endorsing_rights_level_idx; create_blocks_level_idx; create_operations_level_idx; @@ -187,6 +202,7 @@ let create_tables = create_operations_inclusion_operation_idx; create_cycles_level_idx; create_missing_blocks_level_idx; + 
create_dal_shard_assignments_endorsing_right_idx; ] let alter_blocks = diff --git a/teztale/bin_teztale_server/sql_requests.mli b/teztale/bin_teztale_server/sql_requests.mli index e9293af51ee3..25926af5ecef 100644 --- a/teztale/bin_teztale_server/sql_requests.mli +++ b/teztale/bin_teztale_server/sql_requests.mli @@ -41,6 +41,8 @@ module Mutex : sig val cycles : Lwt_mutex.t val missing_blocks : Lwt_mutex.t + + val dal_shard_assignments : Lwt_mutex.t end val create_tables : string list -- GitLab From eeea396ac5a2bb57aca24835a51ddcc5fdc388b2 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Tue, 25 Nov 2025 16:59:47 +0000 Subject: [PATCH 3/9] Teztale: Add DAL shard assignment wire encoding This wire type complements Delegate_operations.assigned_shard_indices, which remains the aggregated per-level view used in exported data. --- teztale/bin_teztale_server/sql_requests.ml | 16 +++++ teztale/bin_teztale_server/sql_requests.mli | 6 ++ .../bin_teztale_server/teztale_server_main.ml | 64 ++++++++++++++++++- teztale/lib_teztale_base/data.ml | 22 +++++++ 4 files changed, 107 insertions(+), 1 deletion(-) diff --git a/teztale/bin_teztale_server/sql_requests.ml b/teztale/bin_teztale_server/sql_requests.ml index a066e7d84bc6..9cda688d3b91 100644 --- a/teztale/bin_teztale_server/sql_requests.ml +++ b/teztale/bin_teztale_server/sql_requests.ml @@ -415,5 +415,21 @@ let insert_received_block = COALESCE(blocks_reception.validation_timestamp, \ excluded.validation_timestamp)" +let insert_dal_shard_assignment = + Caqti_request.Infix.( + Caqti_type.( + t3 + (* level *) int32 + (* delegate *) Type.public_key_hash + (* shard_index *) int + ->. 
unit)) + "INSERT INTO dal_shard_assignments (endorsing_right, shard_index)\n\ + SELECT er.id AS endorsing_right, $3 AS shard_index\n\ + FROM endorsing_rights er\n\ + JOIN delegates ON er.delegate = delegates.id\n\ + WHERE er.level = $1\n\ + AND delegates.address = $2\n\ + ON CONFLICT DO NOTHING" + let maybe_with_metrics (c : Config.t) (name : string) (f : unit -> 'a Lwt.t) = if c.with_metrics then Metrics.sql name f else f () diff --git a/teztale/bin_teztale_server/sql_requests.mli b/teztale/bin_teztale_server/sql_requests.mli index 25926af5ecef..5d8dbe93f534 100644 --- a/teztale/bin_teztale_server/sql_requests.mli +++ b/teztale/bin_teztale_server/sql_requests.mli @@ -122,4 +122,10 @@ val insert_received_block : [`Zero] ) Caqti_request.t +val insert_dal_shard_assignment : + ( int32 * Tezos_crypto.Signature.public_key_hash * int, + unit, + [`Zero] ) + Caqti_request.t + val maybe_with_metrics : Config.t -> string -> (unit -> 'a Lwt.t) -> 'a Lwt.t diff --git a/teztale/bin_teztale_server/teztale_server_main.ml b/teztale/bin_teztale_server/teztale_server_main.ml index fc21d2ffc3a8..bf1b9a9ade5f 100644 --- a/teztale/bin_teztale_server/teztale_server_main.ml +++ b/teztale/bin_teztale_server/teztale_server_main.ml @@ -897,6 +897,20 @@ let import_callback ~logger conf db_pool g data = (level, first_slot, attesting_power, delegate)) data.Lib_teztale_base.Data.delegate_operations in + (* DAL shard assignments *) + let* () = + Tezos_lwt_result_stdlib.Lwtreslib.Bare.List.iter_es + (fun Lib_teztale_base.Data.Delegate_operations. 
+ {delegate; assigned_shard_indices; _} + -> + Tezos_lwt_result_stdlib.Lwtreslib.Bare.List.iter_es + (fun shard_index -> + Db.exec + Sql_requests.insert_dal_shard_assignment + (level, delegate, shard_index)) + assigned_shard_indices) + data.Lib_teztale_base.Data.delegate_operations + in (* blocks *) let* () = Tezos_lwt_result_stdlib.Lwtreslib.Bare.List.iter_es @@ -1003,6 +1017,45 @@ let import_callback ~logger conf db_pool g data = ~body:"Level imported" ()) +let dal_shards_callback ~logger conf db_pool g shard_assignments = + let level = Int32.of_string (Re.Group.get g 1) in + let out = + Caqti_lwt_unix.Pool.use + (fun (module Db : Caqti_lwt.CONNECTION) -> + let open Tezos_lwt_result_stdlib.Lwtreslib.Bare.Monad.Lwt_result_syntax in + let* () = + let delegates = + List.map + (fun Lib_teztale_base.Data.Dal.{delegate; _} -> delegate) + shard_assignments + in + may_insert_delegates (module Db) conf delegates + in + let rows = + List.concat_map + (fun Lib_teztale_base.Data.Dal.{delegate; assigned_shard_indices} -> + List.map + (fun shard_index -> (level, delegate, shard_index)) + assigned_shard_indices) + shard_assignments + in + maybe_with_metrics conf "insert_dal_shard_assignments" @@ fun () -> + without_cache + Sql_requests.Mutex.dal_shard_assignments + Sql_requests.insert_dal_shard_assignment + (module Db) + conf + rows) + db_pool + in + with_caqti_error ~logger out (fun () -> + Cohttp_lwt_unix.Server.respond_string + ~headers: + (Cohttp.Header.init_with "content-type" "text/plain; charset=UTF-8") + ~status:`OK + ~body:"DAL shard assignments" + ()) + let extract_boundaries g = let min = Re.Group.get g 1 in let max = Re.Group.get g 2 in @@ -1016,7 +1069,9 @@ let extract_boundaries g = - //mempool Used by archiver to send data about consensus operations for a given level. - //import - Use by archiver to import past data recorded locally. + Used by archiver to import past data recorded locally. 
+ - //dal_shards + Used by archiver to send DAL shard assignments per delegate for a given level. - /timestamp/ Get the levels (if any) before and after a given timestamp. - /ping @@ -1082,6 +1137,13 @@ let routes : Lib_teztale_base.Data.encoding body (import_callback ~logger conf db_pool g)) ); + ( Re.seq [Re.str "/"; Re.group (Re.rep1 Re.digit); Re.str "/dal_shards"], + fun g ~logger ~conf ~admins:_ ~users db_pool header meth body -> + post_only_endpoint !users header meth (fun _source -> + with_data + Lib_teztale_base.Data.Dal.shard_assignments_encoding + body + (dal_shards_callback ~logger conf db_pool g)) ); ( Re.seq [Re.str "/timestamp/"; Re.group (Re.rep1 Re.digit)], fun g ~logger ~conf:_ ~admins:_ ~users:_ db_pool _header meth _body -> get_only_endpoint meth (fun () -> diff --git a/teztale/lib_teztale_base/data.ml b/teztale/lib_teztale_base/data.ml index de6910e99cc3..779a6bb12dc3 100644 --- a/teztale/lib_teztale_base/data.ml +++ b/teztale/lib_teztale_base/data.ml @@ -391,3 +391,25 @@ module Archiver = struct (dft "preendorsements" (list Consensus_ops.block_op_encoding) []) (dft "baking_rights" (list baking_right_encoding) []))) end + +module Dal = struct + type shard_assignment = { + delegate : Tezos_crypto.Signature.public_key_hash; + assigned_shard_indices : int list; + } + + type shard_assignments = shard_assignment list + + let shard_assignment_encoding = + let open Data_encoding in + conv + (fun {delegate; assigned_shard_indices} -> + (delegate, assigned_shard_indices)) + (fun (delegate, assigned_shard_indices) -> + {delegate; assigned_shard_indices}) + (obj2 + (req "delegate" Tezos_crypto.Signature.Public_key_hash.encoding) + (dft "assigned_shard_indices" (list int16) [])) + + let shard_assignments_encoding = Data_encoding.list shard_assignment_encoding +end -- GitLab From cfaba2ccec293f5c9b9f1bf6556dd0d1159ad121 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 26 Nov 2025 13:57:16 +0000 Subject: [PATCH 4/9] Teztale: Add DAL shards in archiver 
interface --- teztale/bin_teztale_archiver/archiver.ml | 2 ++ teztale/bin_teztale_archiver/archiver.mli | 2 ++ teztale/bin_teztale_archiver/json_archiver.ml | 3 +++ teztale/bin_teztale_archiver/server_archiver.ml | 17 +++++++++++++++++ .../bin_teztale_archiver/server_archiver.mli | 1 + .../teztale_archiver_main.ml | 2 ++ 6 files changed, 27 insertions(+) diff --git a/teztale/bin_teztale_archiver/archiver.ml b/teztale/bin_teztale_archiver/archiver.ml index 3efe026672a7..df097473dd1a 100644 --- a/teztale/bin_teztale_archiver/archiver.ml +++ b/teztale/bin_teztale_archiver/archiver.ml @@ -35,4 +35,6 @@ module type S = sig unit val add_rights : level:Int32.t -> Consensus_ops.rights -> unit + + val add_dal_shards : level:Int32.t -> Data.Dal.shard_assignment list -> unit end diff --git a/teztale/bin_teztale_archiver/archiver.mli b/teztale/bin_teztale_archiver/archiver.mli index 3efe026672a7..df097473dd1a 100644 --- a/teztale/bin_teztale_archiver/archiver.mli +++ b/teztale/bin_teztale_archiver/archiver.mli @@ -35,4 +35,6 @@ module type S = sig unit val add_rights : level:Int32.t -> Consensus_ops.rights -> unit + + val add_dal_shards : level:Int32.t -> Data.Dal.shard_assignment list -> unit end diff --git a/teztale/bin_teztale_archiver/json_archiver.ml b/teztale/bin_teztale_archiver/json_archiver.ml index 8afc72926e95..47218b92e001 100644 --- a/teztale/bin_teztale_archiver/json_archiver.ml +++ b/teztale/bin_teztale_archiver/json_archiver.ml @@ -529,3 +529,6 @@ let add_block ~level (block, cycle_info, (endos, preendos), baking_rights) = (* not used *) let add_rights ~level:_ _rights = () + +(* not used *) +let add_dal_shards ~level:_ _shard_assignments = () diff --git a/teztale/bin_teztale_archiver/server_archiver.ml b/teztale/bin_teztale_archiver/server_archiver.ml index ff8e9bd38239..9a950cd2af43 100644 --- a/teztale/bin_teztale_archiver/server_archiver.ml +++ b/teztale/bin_teztale_archiver/server_archiver.ml @@ -19,6 +19,7 @@ type chunk = * Data.baking_right list) | 
Mempool of Int32.t (* level *) * Consensus_ops.delegate_ops | Rights of (Int32.t (* level *) * Consensus_ops.rights) + | Dal_shards of Int32.t (* level *) * Data.Dal.shard_assignment list type ctx = { cohttp_ctx : Cohttp_lwt_unix.Net.ctx; @@ -99,12 +100,25 @@ let send_mempool ctx level ops = let path = Int32.to_string level ^ "/mempool" in send_something ctx path body +let send_dal_shards ctx level shard_assignments = + let body = + `String + (Ezjsonm.value_to_string + (Data_encoding.Json.construct + Data.Dal.shard_assignments_encoding + shard_assignments)) + in + let path = Int32.to_string level ^ "/dal_shards" in + send_something ctx path body + let chunk_stream, chunk_feeder = Lwt_stream.create () let send actx = function | Block (level, block) -> send_block actx level block | Mempool (level, ops) -> send_mempool actx level ops | Rights (level, rights) -> send_rights actx level rights + | Dal_shards (level, shard_assignments) -> + send_dal_shards actx level shard_assignments let launch actx _source = Lwt_stream.iter_p @@ -120,3 +134,6 @@ let add_mempool ?unaccurate:(_ : bool option) ~level items = let add_block ~level block = chunk_feeder (Some (Block (level, block))) let add_rights ~level rights = chunk_feeder (Some (Rights (level, rights))) + +let add_dal_shards ~level shard_assignments = + chunk_feeder (Some (Dal_shards (level, shard_assignments))) diff --git a/teztale/bin_teztale_archiver/server_archiver.mli b/teztale/bin_teztale_archiver/server_archiver.mli index ca945541ceba..24c61502ab1d 100644 --- a/teztale/bin_teztale_archiver/server_archiver.mli +++ b/teztale/bin_teztale_archiver/server_archiver.mli @@ -19,6 +19,7 @@ type chunk = * Data.baking_right list) | Mempool of Int32.t (* level *) * Consensus_ops.delegate_ops | Rights of (Int32.t (* level *) * Consensus_ops.rights) + | Dal_shards of Int32.t (* level *) * Data.Dal.shard_assignment list type ctx = { cohttp_ctx : Cohttp_lwt_unix.Net.ctx; diff --git 
a/teztale/bin_teztale_archiver/teztale_archiver_main.ml b/teztale/bin_teztale_archiver/teztale_archiver_main.ml index d9478c201976..267b72b934d2 100644 --- a/teztale/bin_teztale_archiver/teztale_archiver_main.ml +++ b/teztale/bin_teztale_archiver/teztale_archiver_main.ml @@ -90,6 +90,7 @@ let server_to_json_chunk : Server_archiver.chunk -> Json_archiver.chunk option = baking_rights )) | Mempool (level, ops) -> Some (Mempool (None, level, ops)) | Rights (_, _) -> None + | Dal_shards (_, _) -> None let select_commands _ctxt Client_config.{chain; _} = return @@ -232,6 +233,7 @@ let select_commands _ctxt Client_config.{chain; _} = | Block (level, _) -> (level, "/block") | Mempool (level, _) -> (level, "/mempool") | Rights (level, _) -> (level, "/rights") + | Dal_shards (level, _) -> (level, "/dal_shards") in Printf.sprintf "(%ld) Failed to send %s data and --backup-dir is \ -- GitLab From 199a70a51305fe8fc3f13090acc34487b675db6b Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 26 Nov 2025 16:47:33 +0000 Subject: [PATCH 5/9] Teztale: General_archiver: Collect DAL shard assignments per level --- .../bin_teztale_archiver/general_archiver.ml | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/teztale/bin_teztale_archiver/general_archiver.ml b/teztale/bin_teztale_archiver/general_archiver.ml index 9b0bcb120f0a..edf779c43341 100644 --- a/teztale/bin_teztale_archiver/general_archiver.ml +++ b/teztale/bin_teztale_archiver/general_archiver.ml @@ -137,11 +137,15 @@ module Define (Services : Protocol_machinery.PROTOCOL_SERVICES) = struct (Some cycle_info) reception_times + let dal_shards_of _ctxt _level = + (* TODO: call a DAL RPC here and map to [Data.Dal.shard_assignment list]. 
*) + return_nil + let () = Protocol_hash.Table.add live_block_machine Services.hash - (rights_of, get_applied_block) + (rights_of, get_applied_block, dal_shards_of) let rec pack_by_slot i e = function | ((i', l) as x) :: t -> @@ -381,7 +385,8 @@ module Loops (Archiver : Archiver.S) = struct current_protocol) in Lwt.return ((fun _ _ _ _ _ -> return_unit), None) - | Some (rights_of, get_applied_block) -> + | Some (rights_of, get_applied_block, dal_shards_of) + -> let recorder cctx level hash header reception_time = let* (( _block_info, @@ -400,12 +405,21 @@ module Loops (Archiver : Archiver.S) = struct let* rights = rights_of cctx (Int32.pred level) in + let pred_level = Int32.pred level in let () = maybe_add_rights (module Archiver) - (Int32.pred level) + pred_level rights in + let* dal_shards = + dal_shards_of cctx pred_level + in + let () = + Archiver.add_dal_shards + ~level:pred_level + dal_shards + in return_unit in let* () = @@ -419,6 +433,12 @@ module Loops (Archiver : Archiver.S) = struct level rights in + let* dal_shards = + dal_shards_of cctx level + in + let () = + Archiver.add_dal_shards ~level dal_shards + in return_unit in let () = Archiver.add_block ~level block_data in -- GitLab From 0478265fa16b847ff1e4c70750f098d9a966ab0f Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 26 Nov 2025 16:50:26 +0000 Subject: [PATCH 6/9] Teztale: General_archiver: Refactor attestations and preattestations --- .../bin_teztale_archiver/general_archiver.ml | 60 +++++++------------ 1 file changed, 22 insertions(+), 38 deletions(-) diff --git a/teztale/bin_teztale_archiver/general_archiver.ml b/teztale/bin_teztale_archiver/general_archiver.ml index edf779c43341..0eb0df2b0a1b 100644 --- a/teztale/bin_teztale_archiver/general_archiver.ml +++ b/teztale/bin_teztale_archiver/general_archiver.ml @@ -337,6 +337,16 @@ module Loops (Archiver : Archiver.S) = struct in Lwt.return_unit + let maybe_register_rights_and_dal_shards ~rights_of ~dal_shards_of cctx ~level + ops = + if 
List.is_empty ops then return_unit + else + let* rights = rights_of cctx level in + let () = maybe_add_rights (module Archiver) level rights in + let* shard_assignments = dal_shards_of cctx level in + let () = Archiver.add_dal_shards ~level shard_assignments in + return_unit + let reception_blocks_loop cctx = let logger = Log.logger () in let*! block_stream = @@ -400,46 +410,20 @@ module Loops (Archiver : Archiver.S) = struct reception_time in let* () = - if List.is_empty attestations then return_unit - else - let* rights = - rights_of cctx (Int32.pred level) - in - let pred_level = Int32.pred level in - let () = - maybe_add_rights - (module Archiver) - pred_level - rights - in - let* dal_shards = - dal_shards_of cctx pred_level - in - let () = - Archiver.add_dal_shards - ~level:pred_level - dal_shards - in - return_unit + maybe_register_rights_and_dal_shards + ~rights_of + ~dal_shards_of + cctx + ~level:(Int32.pred level) + attestations in let* () = - if List.is_empty preattestations then - return_unit - else - let* rights = rights_of cctx level in - let () = - maybe_add_rights - (module Archiver) - level - rights - in - let* dal_shards = - dal_shards_of cctx level - in - let () = - Archiver.add_dal_shards ~level dal_shards - in - return_unit + maybe_register_rights_and_dal_shards + ~rights_of + ~dal_shards_of + cctx + ~level + preattestations in let () = Archiver.add_block ~level block_data in return_unit -- GitLab From 4b1d58a57af59ef0a48189959129eb4afcf98ab5 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 26 Nov 2025 15:46:45 +0000 Subject: [PATCH 7/9] Teztale: Protocol machines: Implement dal_shards_of --- .../Proxford_machine.real.ml | 2 ++ .../PsBabyM1_machine.real.ml | 2 ++ .../PsCARTHA_machine.real.ml | 2 ++ .../PsDELPH1_machine.real.ml | 2 ++ .../PsFLoren_machine.real.ml | 2 ++ .../PsParisC_machine.real.ml | 19 +++++++++++++++++++ .../PsQuebec_machine.real.ml | 19 +++++++++++++++++++ .../PsRiotum_machine.real.ml | 19 +++++++++++++++++++ 
.../PsYLVpVv_machine.real.ml | 2 ++ .../PsddFKi3_machine.real.ml | 2 ++ .../Psithaca_machine.real.ml | 2 ++ .../Pt24m4xi_machine.real.ml | 2 ++ .../PtCJ7pwo_machine.real.ml | 2 ++ .../PtEdo2Zk_machine.real.ml | 2 ++ .../PtGRANAD_machine.real.ml | 2 ++ .../PtHangz2_machine.real.ml | 2 ++ .../PtJakart_machine.real.ml | 2 ++ .../PtKathma_machine.real.ml | 2 ++ .../PtLimaPt_machine.real.ml | 2 ++ .../PtMumbai_machine.real.ml | 2 ++ .../PtNairob_machine.real.ml | 2 ++ .../PtParisB_machine.real.ml | 19 +++++++++++++++++++ .../PtSeouLo_machine.real.ml | 15 +++++++++++++++ .../PtTALLiN_machine.real.ml | 15 +++++++++++++++ .../alpha_machine.real.ml | 15 +++++++++++++++ .../bin_teztale_archiver/general_archiver.ml | 6 +++--- .../protocol_machinery.ml | 4 ++++ .../protocol_machinery.mli | 4 ++++ 28 files changed, 168 insertions(+), 3 deletions(-) diff --git a/teztale/bin_teztale_archiver/Proxford_machine.real.ml b/teztale/bin_teztale_archiver/Proxford_machine.real.ml index b248ccfdc044..f4bd381cc7b9 100644 --- a/teztale/bin_teztale_archiver/Proxford_machine.real.ml +++ b/teztale/bin_teztale_archiver/Proxford_machine.real.ml @@ -183,6 +183,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PsBabyM1_machine.real.ml b/teztale/bin_teztale_archiver/PsBabyM1_machine.real.ml index 3d853064c72e..1bae8307acad 100644 --- a/teztale/bin_teztale_archiver/PsBabyM1_machine.real.ml +++ b/teztale/bin_teztale_archiver/PsBabyM1_machine.real.ml @@ -158,6 +158,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes 
diff --git a/teztale/bin_teztale_archiver/PsCARTHA_machine.real.ml b/teztale/bin_teztale_archiver/PsCARTHA_machine.real.ml index 70bed6b30514..7a6faa08148d 100644 --- a/teztale/bin_teztale_archiver/PsCARTHA_machine.real.ml +++ b/teztale/bin_teztale_archiver/PsCARTHA_machine.real.ml @@ -158,6 +158,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/PsDELPH1_machine.real.ml b/teztale/bin_teztale_archiver/PsDELPH1_machine.real.ml index 71ce75bebe0c..1a33aa52d137 100644 --- a/teztale/bin_teztale_archiver/PsDELPH1_machine.real.ml +++ b/teztale/bin_teztale_archiver/PsDELPH1_machine.real.ml @@ -158,6 +158,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/PsFLoren_machine.real.ml b/teztale/bin_teztale_archiver/PsFLoren_machine.real.ml index 0d8862faefc8..9155f9d92168 100644 --- a/teztale/bin_teztale_archiver/PsFLoren_machine.real.ml +++ b/teztale/bin_teztale_archiver/PsFLoren_machine.real.ml @@ -172,6 +172,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/PsParisC_machine.real.ml b/teztale/bin_teztale_archiver/PsParisC_machine.real.ml index 8ee4ebc9c5af..f19e7600df8a 100644 --- a/teztale/bin_teztale_archiver/PsParisC_machine.real.ml +++ b/teztale/bin_teztale_archiver/PsParisC_machine.real.ml @@ -188,6 +188,25 @@ module Services : 
Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of cctxt level = + let raw_level = Protocol.Alpha_context.Raw_level.of_int32_exn level in + let* shard_assignments = + Plugin.RPC.Dal.dal_shards + cctxt + (cctxt#chain, `Level level) + ~level:raw_level + () + in + return + @@ List.map + (fun Plugin.RPC.Dal.S.{delegate; indexes} -> + Data.Dal. + { + delegate = Tezos_crypto.Signature.Of_V1.public_key_hash delegate; + assigned_shard_indices = indexes; + }) + shard_assignments + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PsQuebec_machine.real.ml b/teztale/bin_teztale_archiver/PsQuebec_machine.real.ml index e1aaeb4369af..5538c0f6f0f1 100644 --- a/teztale/bin_teztale_archiver/PsQuebec_machine.real.ml +++ b/teztale/bin_teztale_archiver/PsQuebec_machine.real.ml @@ -188,6 +188,25 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of cctxt level = + let raw_level = Protocol.Alpha_context.Raw_level.of_int32_exn level in + let* shard_assignments = + Plugin.RPC.Dal.dal_shards + cctxt + (cctxt#chain, `Level level) + ~level:raw_level + () + in + return + @@ List.map + (fun Plugin.RPC.Dal.S.{delegate; indexes} -> + Data.Dal. 
+ { + delegate = Tezos_crypto.Signature.Of_V1.public_key_hash delegate; + assigned_shard_indices = indexes; + }) + shard_assignments + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PsRiotum_machine.real.ml b/teztale/bin_teztale_archiver/PsRiotum_machine.real.ml index 1bf3f3f0a42b..16272dd52f66 100644 --- a/teztale/bin_teztale_archiver/PsRiotum_machine.real.ml +++ b/teztale/bin_teztale_archiver/PsRiotum_machine.real.ml @@ -188,6 +188,25 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of cctxt level = + let raw_level = Protocol.Alpha_context.Raw_level.of_int32_exn level in + let* shard_assignments = + Plugin.RPC.Dal.dal_shards + cctxt + (cctxt#chain, `Level level) + ~level:raw_level + () + in + return + @@ List.map + (fun Plugin.RPC.Dal.S.{delegate; indexes} -> + Data.Dal. + { + delegate = Tezos_crypto.Signature.Of_V1.public_key_hash delegate; + assigned_shard_indices = indexes; + }) + shard_assignments + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PsYLVpVv_machine.real.ml b/teztale/bin_teztale_archiver/PsYLVpVv_machine.real.ml index d733fa70e4a9..4d887c96cd69 100644 --- a/teztale/bin_teztale_archiver/PsYLVpVv_machine.real.ml +++ b/teztale/bin_teztale_archiver/PsYLVpVv_machine.real.ml @@ -158,6 +158,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/PsddFKi3_machine.real.ml b/teztale/bin_teztale_archiver/PsddFKi3_machine.real.ml index c27d42260118..cb087bae500b 100644 --- 
a/teztale/bin_teztale_archiver/PsddFKi3_machine.real.ml +++ b/teztale/bin_teztale_archiver/PsddFKi3_machine.real.ml @@ -158,6 +158,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/Psithaca_machine.real.ml b/teztale/bin_teztale_archiver/Psithaca_machine.real.ml index 93f3b93d7513..61922d0a42ae 100644 --- a/teztale/bin_teztale_archiver/Psithaca_machine.real.ml +++ b/teztale/bin_teztale_archiver/Psithaca_machine.real.ml @@ -186,6 +186,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/Pt24m4xi_machine.real.ml b/teztale/bin_teztale_archiver/Pt24m4xi_machine.real.ml index 742789b46c82..98474167ebd4 100644 --- a/teztale/bin_teztale_archiver/Pt24m4xi_machine.real.ml +++ b/teztale/bin_teztale_archiver/Pt24m4xi_machine.real.ml @@ -158,6 +158,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/PtCJ7pwo_machine.real.ml b/teztale/bin_teztale_archiver/PtCJ7pwo_machine.real.ml index a7b59b162a6c..44933f66a11d 100644 --- a/teztale/bin_teztale_archiver/PtCJ7pwo_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtCJ7pwo_machine.real.ml @@ -158,6 +158,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let 
dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/PtEdo2Zk_machine.real.ml b/teztale/bin_teztale_archiver/PtEdo2Zk_machine.real.ml index 6867a25add34..d897c2ed5237 100644 --- a/teztale/bin_teztale_archiver/PtEdo2Zk_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtEdo2Zk_machine.real.ml @@ -158,6 +158,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/PtGRANAD_machine.real.ml b/teztale/bin_teztale_archiver/PtGRANAD_machine.real.ml index 035f46b97187..708099c5f9e6 100644 --- a/teztale/bin_teztale_archiver/PtGRANAD_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtGRANAD_machine.real.ml @@ -172,6 +172,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/PtHangz2_machine.real.ml b/teztale/bin_teztale_archiver/PtHangz2_machine.real.ml index 532b155847af..5f40c4ca9576 100644 --- a/teztale/bin_teztale_archiver/PtHangz2_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtHangz2_machine.real.ml @@ -172,6 +172,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int priority = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let block_round header = match Data_encoding.Binary.of_bytes diff --git a/teztale/bin_teztale_archiver/PtJakart_machine.real.ml b/teztale/bin_teztale_archiver/PtJakart_machine.real.ml index 336d92014683..8f9165ee1f69 100644 --- a/teztale/bin_teztale_archiver/PtJakart_machine.real.ml 
+++ b/teztale/bin_teztale_archiver/PtJakart_machine.real.ml @@ -186,6 +186,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PtKathma_machine.real.ml b/teztale/bin_teztale_archiver/PtKathma_machine.real.ml index 9c7c2084e38d..935c66a8c356 100644 --- a/teztale/bin_teztale_archiver/PtKathma_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtKathma_machine.real.ml @@ -186,6 +186,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PtLimaPt_machine.real.ml b/teztale/bin_teztale_archiver/PtLimaPt_machine.real.ml index b67d6a929b2e..61af5e1fdf98 100644 --- a/teztale/bin_teztale_archiver/PtLimaPt_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtLimaPt_machine.real.ml @@ -187,6 +187,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PtMumbai_machine.real.ml b/teztale/bin_teztale_archiver/PtMumbai_machine.real.ml index 49b1da0c5037..d1bd33c43392 100644 --- a/teztale/bin_teztale_archiver/PtMumbai_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtMumbai_machine.real.ml @@ -183,6 +183,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, 
baking_rights) + let dal_shards_of _cctxt _level = return_nil + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PtNairob_machine.real.ml b/teztale/bin_teztale_archiver/PtNairob_machine.real.ml index ecfeebc08199..217ba7938735 100644 --- a/teztale/bin_teztale_archiver/PtNairob_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtNairob_machine.real.ml @@ -183,6 +183,8 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of _cctxt _level = return_nil + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PtParisB_machine.real.ml b/teztale/bin_teztale_archiver/PtParisB_machine.real.ml index 899876b0687e..23447e7c9ae5 100644 --- a/teztale/bin_teztale_archiver/PtParisB_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtParisB_machine.real.ml @@ -188,6 +188,25 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of cctxt level = + let raw_level = Protocol.Alpha_context.Raw_level.of_int32_exn level in + let* shard_assignments = + Plugin.RPC.Dal.dal_shards + cctxt + (cctxt#chain, `Level level) + ~level:raw_level + () + in + return + @@ List.map + (fun Plugin.RPC.Dal.S.{delegate; indexes} -> + Data.Dal. 
+ { + delegate = Tezos_crypto.Signature.Of_V1.public_key_hash delegate; + assigned_shard_indices = indexes; + }) + shard_assignments + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PtSeouLo_machine.real.ml b/teztale/bin_teztale_archiver/PtSeouLo_machine.real.ml index eaddbfa2acdc..180d52a80a0e 100644 --- a/teztale/bin_teztale_archiver/PtSeouLo_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtSeouLo_machine.real.ml @@ -188,6 +188,21 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of cctxt level = + let raw_level = Protocol.Alpha_context.Raw_level.of_int32_exn level in + let* shard_assignments = + Plugin.RPC.Dal.dal_shards + cctxt + (cctxt#chain, `Level level) + ~level:raw_level + () + in + return + @@ List.map + (fun Plugin.RPC.Dal.S.{delegate; indexes} -> + Data.Dal.{delegate; assigned_shard_indices = indexes}) + shard_assignments + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/PtTALLiN_machine.real.ml b/teztale/bin_teztale_archiver/PtTALLiN_machine.real.ml index b5a0faf178ac..ed842de7ba4d 100644 --- a/teztale/bin_teztale_archiver/PtTALLiN_machine.real.ml +++ b/teztale/bin_teztale_archiver/PtTALLiN_machine.real.ml @@ -188,6 +188,21 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of cctxt level = + let raw_level = Protocol.Alpha_context.Raw_level.of_int32_exn level in + let* shard_assignments = + Plugin.RPC.Dal.dal_shards + cctxt + (cctxt#chain, `Level level) + ~level:raw_level + () + in + return + @@ List.map + (fun Plugin.RPC.Dal.S.{delegate; indexes} -> + Data.Dal.{delegate; assigned_shard_indices = indexes}) + shard_assignments + let 
raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/alpha_machine.real.ml b/teztale/bin_teztale_archiver/alpha_machine.real.ml index 5c655ce7fd7a..cd31a5f9c488 100644 --- a/teztale/bin_teztale_archiver/alpha_machine.real.ml +++ b/teztale/bin_teztale_archiver/alpha_machine.real.ml @@ -188,6 +188,21 @@ module Services : Protocol_machinery.PROTOCOL_SERVICES = struct assert (Int32.of_int round = hd.round) ; return (hd.delegate, baking_rights) + let dal_shards_of cctxt level = + let raw_level = Protocol.Alpha_context.Raw_level.of_int32_exn level in + let* shard_assignments = + Plugin.RPC.Dal.dal_shards + cctxt + (cctxt#chain, `Level level) + ~level:raw_level + () + in + return + @@ List.map + (fun Plugin.RPC.Dal.S.{delegate; indexes} -> + Data.Dal.{delegate; assigned_shard_indices = indexes}) + shard_assignments + let raw_block_round shell_header = let wrap = Environment.wrap_tzresult in let open Result_syntax in diff --git a/teztale/bin_teztale_archiver/general_archiver.ml b/teztale/bin_teztale_archiver/general_archiver.ml index 0eb0df2b0a1b..d41341723cd9 100644 --- a/teztale/bin_teztale_archiver/general_archiver.ml +++ b/teztale/bin_teztale_archiver/general_archiver.ml @@ -137,9 +137,9 @@ module Define (Services : Protocol_machinery.PROTOCOL_SERVICES) = struct (Some cycle_info) reception_times - let dal_shards_of _ctxt _level = - (* TODO: call a DAL RPC here and map to [Data.Dal.shard_assignment list]. 
*) - return_nil + let dal_shards_of ctxt level = + let cctx = Services.wrap_full ctxt in + Services.dal_shards_of cctx level let () = Protocol_hash.Table.add diff --git a/teztale/bin_teztale_archiver/protocol_machinery.ml b/teztale/bin_teztale_archiver/protocol_machinery.ml index fd98bcb031fa..6743947fdded 100644 --- a/teztale/bin_teztale_archiver/protocol_machinery.ml +++ b/teztale/bin_teztale_archiver/protocol_machinery.ml @@ -41,6 +41,10 @@ module type PROTOCOL_SERVICES = sig (Tezos_crypto.Signature.public_key_hash * Data.baking_right list) tzresult Lwt.t + (* [dal_shards_of cctxt level] lists the DAL shard assignments at [level]. *) + val dal_shards_of : + wrap_full -> Int32.t -> Data.Dal.shard_assignment list tzresult Lwt.t + val baker_and_cycle : wrap_full -> Block_hash.t -> diff --git a/teztale/bin_teztale_archiver/protocol_machinery.mli b/teztale/bin_teztale_archiver/protocol_machinery.mli index fd98bcb031fa..6743947fdded 100644 --- a/teztale/bin_teztale_archiver/protocol_machinery.mli +++ b/teztale/bin_teztale_archiver/protocol_machinery.mli @@ -41,6 +41,10 @@ module type PROTOCOL_SERVICES = sig (Tezos_crypto.Signature.public_key_hash * Data.baking_right list) tzresult Lwt.t + (* [dal_shards_of cctxt level] lists the DAL shard assignments at [level]. *) + val dal_shards_of : + wrap_full -> Int32.t -> Data.Dal.shard_assignment list tzresult Lwt.t + val baker_and_cycle : wrap_full -> Block_hash.t -> -- GitLab From d0cba48ce5d60f96c969d75ac1d5ff8b506d5431 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 26 Nov 2025 17:09:37 +0000 Subject: [PATCH 8/9] Teztale: Json_archiver: Store DAL shard assignments per level --- teztale/bin_teztale_archiver/json_archiver.ml | 66 ++++++++++++++++++- .../bin_teztale_archiver/json_archiver.mli | 1 + 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/teztale/bin_teztale_archiver/json_archiver.ml b/teztale/bin_teztale_archiver/json_archiver.ml index 47218b92e001..ac1779a3c681 100644 --- a/teztale/bin_teztale_archiver/json_archiver.ml +++ b/teztale/bin_teztale_archiver/json_archiver.ml @@ -459,6 +459,65 @@
let dump_received logger path ?unaccurate level received_ops =
             err) ;
       Lwt.return_unit
 
+(* Merge [shard_assignments] into the delegate records of the [level] file.
+   Delegates without a record get a placeholder one ([first_slot] and
+   [attesting_power] are 0). Load/write errors are logged, not raised. *)
+let dump_dal_shards logger prefix level shard_assignments =
+  let filename = filename_of_level prefix level in
+  let mutex = get_file_mutex filename in
+  let*! out =
+    Lwt_mutex.with_lock mutex (fun () ->
+        let* infos =
+          load filename level_file_content_encoding level_file_content_empty
+        in
+        let delegate_operations =
+          List.fold_left
+            (fun acc Data.Dal.{delegate; assigned_shard_indices} ->
+              if assigned_shard_indices = [] then acc
+              else
+                let known, rest =
+                  List.partition
+                    (fun Data.Delegate_operations.{delegate = d; _} ->
+                      Tezos_crypto.Signature.Public_key_hash.equal d delegate)
+                    acc
+                in
+                match known with
+                | ({Data.Delegate_operations.assigned_shard_indices = prev; _}
+                   as dops)
+                  :: dups ->
+                    (* Duplicates are unexpected, but raising here would
+                       skip [drop_file_mutex]: keep them untouched. *)
+                    let merged = assigned_shard_indices @ prev in
+                    let merged = List.sort_uniq Int.compare merged in
+                    {dops with assigned_shard_indices = merged} :: (dups @ rest)
+                | [] ->
+                    Data.Delegate_operations.
+                      {
+                        delegate;
+                        first_slot = 0;
+                        attesting_power = 0;
+                        operations = [];
+                        assigned_shard_indices;
+                      }
+                    :: acc)
+            infos.delegate_operations
+            shard_assignments
+        in
+        let out_infos = {infos with delegate_operations} in
+        write filename level_file_content_encoding out_infos)
+  in
+  let () = drop_file_mutex filename in
+  match out with
+  | Ok () -> Lwt.return_unit
+  | Error err ->
+      Log.error logger (fun () ->
+          Format.asprintf
+            "@[Failed to dump DAL shards at level %li :@ @[%a@]@]"
+            level
+            Error_monad.pp_print_trace
+            err) ;
+      Lwt.return_unit
+
 type chunk =
   | Block of
       Int32.t
@@ -472,6 +531,7 @@ type chunk =
       * Consensus_ops.block_op list
       * Data.baking_right list
   | Mempool of bool option * Int32.t (* level *) * Consensus_ops.delegate_ops
+  | Dal_shards of Int32.t (* level *) * Data.Dal.shard_assignment list
 
 let chunk_stream, chunk_feeder = Lwt_stream.create ()
 
@@ -504,6 +564,8 @@ let dump prefix chunk =
         baking_rights
   | Mempool (unaccurate, level, items) ->
dump_received logger prefix ?unaccurate level items
+  | Dal_shards (level, shard_assignments) ->
+      dump_dal_shards logger prefix level shard_assignments
 
 let launch _cctxt prefix = Lwt_stream.iter_p (dump prefix) chunk_stream
 
@@ -530,5 +592,5 @@ let add_block ~level (block, cycle_info, (endos, preendos), baking_rights) =
 (* not used *)
 let add_rights ~level:_ _rights = ()
 
-(* not used *)
-let add_dal_shards ~level:_ _shard_assignments = ()
+let add_dal_shards ~level shard_assignments =
+  chunk_feeder (Some (Dal_shards (level, shard_assignments)))
diff --git a/teztale/bin_teztale_archiver/json_archiver.mli b/teztale/bin_teztale_archiver/json_archiver.mli
index 88be73d9c3a9..d197fa23b841 100644
--- a/teztale/bin_teztale_archiver/json_archiver.mli
+++ b/teztale/bin_teztale_archiver/json_archiver.mli
@@ -20,6 +20,7 @@ type chunk =
       * Consensus_ops.block_op list
       * Data.baking_right list
   | Mempool of bool option * Int32.t (* level *) * Consensus_ops.delegate_ops
+  | Dal_shards of Int32.t (* level *) * Data.Dal.shard_assignment list
 
 val dump : string -> chunk -> unit Lwt.t
 
-- GitLab

From 1481d856cd786cbda98b5264ef89c69e0813fc1f Mon Sep 17 00:00:00 2001
From: Gabriel Moise
Date: Wed, 26 Nov 2025 17:09:59 +0000
Subject: [PATCH 9/9] Teztale: Converter: Send DAL shard assignments to server

---
 teztale/bin_teztale_archiver/converter.ml | 41 +++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/teztale/bin_teztale_archiver/converter.ml b/teztale/bin_teztale_archiver/converter.ml
index a8d21ac288d3..0c52ef86c57b 100644
--- a/teztale/bin_teztale_archiver/converter.ml
+++ b/teztale/bin_teztale_archiver/converter.ml
@@ -170,6 +170,44 @@ let to_blocks ctx endpoint auth level pred_ops_map ops_map data =
             out)
         bodies)
 
+(* Post to the server the DAL shard assignments found in
+   [data.delegate_operations]; delegates with no assigned shard are left
+   out, and nothing is sent when no assignment remains. The JSON payload
+   goes to "<endpoint path>/<level>/dal_shards" with basic authentication,
+   and the reply status and body are printed. *)
+let to_dal_shards ctx endpoint auth level data =
+  let assignment_of
+      Data.Delegate_operations.{delegate; assigned_shard_indices; _} =
+    match assigned_shard_indices with
+    | [] -> None
+    | _ :: _ -> Some Data.Dal.{delegate; assigned_shard_indices}
+  in
+  let delegate_ops = data.Json_archiver.delegate_operations in
+  match List.filter_map assignment_of delegate_ops with
+  | [] -> Lwt.return_unit
+  | shard_assignments ->
+      let json =
+        Data_encoding.Json.construct
+          Data.Dal.shard_assignments_encoding
+          shard_assignments
+      in
+      let body = `String (Ezjsonm.value_to_string json) in
+      let content_type = "application/json; charset=UTF-8" in
+      let headers =
+        Cohttp.Header.add_authorization
+          (Cohttp.Header.init_with "content-type" content_type)
+          (`Basic auth)
+      in
+      let path = Uri.path endpoint ^ "/" ^ level ^ "/dal_shards" in
+      let uri = Uri.with_path endpoint path in
+      let*! resp, reply =
+        Cohttp_lwt_unix.Client.post ~ctx ~body ~headers uri
+      in
+      (* Read the whole reply body before printing it. *)
+      let*! reply = Cohttp_lwt.Body.to_string reply in
+      let status = Cohttp.Code.string_of_status resp.status in
+      Lwt_io.printlf "%s (dal_shards): %s: %s" level status reply
+
 let poor_man_lexical_sort x y =
   if String.length x = String.length y then String.compare x y
   else Int.compare (String.length x) (String.length y)
@@ -229,6 +267,9 @@ let main source password endpoint prefix =
                         out
                         data
                     in
+                    let*! _ =
+                      to_dal_shards ctx endpoint (source, password) level data
+                    in
                     Lwt.return out
                   else Lwt.return out
               | Error err ->
-- GitLab