diff --git a/src/bin_dal_node/RPC_server.ml b/src/bin_dal_node/RPC_server.ml index 75bd288a52bfd3d9c20795332ebd499686d87575..9e32cca342d3071d54e170cd4f9ab9401fef07b2 100644 --- a/src/bin_dal_node/RPC_server.ml +++ b/src/bin_dal_node/RPC_server.ml @@ -259,6 +259,12 @@ module Slots_handlers = struct let slot_id : Types.slot_id = {slot_level; slot_index} in Slot_manager.get_slot_shard store slot_id shard_index) + let get_slot_shard_with_proof ctxt slot_level slot_index shard_index () () = + call_handler1 (fun () -> + let store = Node_context.get_store ctxt in + let slot_id : Types.slot_id = {slot_level; slot_index} in + Slot_manager.get_slot_shard_with_proof store slot_id shard_index) + let get_slot_pages ctxt slot_level slot_index () () = call_handler1 (fun () -> let slot_id : Types.slot_id = {slot_level; slot_index} in @@ -478,6 +484,79 @@ module Profile_handlers = struct store proto_parameters ~attested_level) + + (* TODO-POC: To monitor traps we could use the stream of finalized heads given to + us by the crawler (but note that the crawler is not yet in the DAL node's + context... so something needs to be done about it) *) + let get_traps ctxt attested_level slot_index () () = + let open Lwt_result_syntax in + let proto_parameters = Node_context.get_proto_parameters ctxt in + let number_of_shards = + proto_parameters.Dal_plugin.cryptobox_parameters.number_of_shards + in + let traps_fraction = proto_parameters.traps_fraction in + let attestation_lag = Int32.of_int proto_parameters.attestation_lag in + let attestation_level = Int32.pred attested_level in + call_handler1 (fun () -> + let store = Node_context.get_store ctxt in + let published_level = + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4612 + Correctly compute [published_level] in case of protocol changes, in + particular a change of the value of [attestation_lag]. *) + Int32.sub attested_level attestation_lag + in + let slot_id = + Types.Slot_id.{slot_level = published_level; slot_index} + in + let shards = + Store.Shards.read_all (Store.shards store) slot_id ~number_of_shards + |> Seq_s.filter_map (function + | _, index, Ok share -> Some Cryptobox.{index; share} + | _ -> None) + in + let* committee = + Node_context.fetch_committee ctxt ~level:attestation_level + |> Errors.other_lwt_result + in + let module Index_map = Map.Make (Int) in + let shard_indexes_map = + Signature.Public_key_hash.Map.fold + (fun pkh shard_indexes acc -> + List.fold_right + (fun index acc' -> Index_map.add index pkh acc') + shard_indexes + acc) + committee + Index_map.empty + in + let* traps = + Seq_es.ES.fold_left + (fun acc Cryptobox.{index; share} -> + match Index_map.find_opt index shard_indexes_map with + | None -> (* unreachable *) return acc + | Some delegate -> ( + let trap_res = + Trap.share_is_trap delegate share ~traps_fraction + in + match trap_res with + | Ok trap -> + if trap then + let* shard_with_proof = + Slot_manager.get_slot_shard_with_proof + store + slot_id + index + in + return @@ (shard_with_proof :: acc) + else return acc + | Error _ -> + (* TODO: emit warning *) + (* do not accuse if we're not sure *) + return acc)) + [] + (Seq_es.of_seqs shards) + in + return traps) end let version ctxt () () = @@ -627,6 +706,10 @@ let register : Tezos_rpc.Directory.opt_register2 Services.get_attestable_slots (Profile_handlers.get_attestable_slots ctxt) + |> add_service + Tezos_rpc.Directory.opt_register2 + Services.get_traps + (Profile_handlers.get_traps ctxt) |> add_service Tezos_rpc.Directory.opt_register2 Services.get_slot_pages diff --git a/src/bin_dal_node/slot_manager.ml b/src/bin_dal_node/slot_manager.ml index 375804771bc710345bdf0164c79bb2fd3f109215..a61267ce53de44dede29b502cbd9ec30df61ee7c 100644 --- a/src/bin_dal_node/slot_manager.ml +++ b/src/bin_dal_node/slot_manager.ml @@ -351,6 +351,25 @@ let get_slot_status ~slot_id node_store = let get_slot_shard (store : Store.t) (slot_id : Types.slot_id) shard_index = Store.Shards.read (Store.shards store) slot_id shard_index +let get_slot_shard_with_proof (store : Store.t) (slot_id : Types.slot_id) + shard_index = + let open Lwt_result_syntax in + let* shard = Store.Shards.read (Store.shards store) slot_id shard_index in + let commitment_opt = + Store.Slot_id_cache.find_opt (Store.finalized_commitments store) slot_id + in + match commitment_opt with + | None -> Lwt.return (Error `Not_found) + | Some commitment -> ( + let res = + Store.Commitment_indexed_cache.find_opt (Store.cache store) commitment + in + match res with + | None -> Lwt.return (Error `Not_found) + | Some (_slot, _shards, shard_proofs) -> + let proof = Array.get shard_proofs shard_index in + return (shard, proof)) + let get_slot_pages ~reconstruct_if_missing node_context slot_id = let open Lwt_result_syntax in let proto_parameters = Node_context.get_proto_parameters node_context in diff --git a/src/bin_dal_node/slot_manager.mli b/src/bin_dal_node/slot_manager.mli index bf6dae168fc440139fadd0572d7f4bac7227feb3..ec3b75fcefdab2557aa489bc5eec6079f63eea2d 100644 --- a/src/bin_dal_node/slot_manager.mli +++ b/src/bin_dal_node/slot_manager.mli @@ -197,3 +197,14 @@ val get_slot_shard : Types.slot_id -> Types.shard_index -> (Cryptobox.shard, [Errors.other | Errors.not_found]) result Lwt.t + +(** [get_slot_shard_with_proof store slot_id shard_index] returns the shard at + index [shard_index] of the slot given by [slot_id], and its proof. *) +val get_slot_shard_with_proof : + Store.t -> + Types.slot_id -> + Types.shard_index -> + ( Cryptobox.shard * Cryptobox.shard_proof, + [Errors.other | Errors.not_found] ) + result + Lwt.t diff --git a/src/lib_dal_node_services/services.ml b/src/lib_dal_node_services/services.ml index 5fe039ec51e1a31b6b3ba423ca2ba5624d0b97f8..04753be15ce2fafc20e52fe41e734b57381e078c 100644 --- a/src/lib_dal_node_services/services.ml +++ b/src/lib_dal_node_services/services.ml @@ -250,6 +250,32 @@ let get_attestable_slots : open_root / "profiles" /: Signature.Public_key_hash.rpc_arg / "attested_levels" /: Tezos_rpc.Arg.int32 / "attestable_slots") +let get_traps : + < meth : [`GET] + ; input : unit + ; output : + (Tezos_crypto_dal.Cryptobox.shard + * Tezos_crypto_dal.Cryptobox.shard_proof) + list + ; prefix : unit + ; params : (unit * level) * slot_index + ; query : unit > + service = + Tezos_rpc.Service.get_service + ~description: + "Get all the shards that are traps for a given slot index at the given \ + attested level." + ~query:Tezos_rpc.Query.empty + ~output: + Data_encoding.( + list + (obj2 + (req "shard" Cryptobox.shard_encoding) + (req "proof" Cryptobox.shard_proof_encoding))) + Tezos_rpc.Path.( + open_root / "attested_levels" /: Tezos_rpc.Arg.int32 / "slots" + /: Tezos_rpc.Arg.int / "traps") + let get_slot_shard : < meth : [`GET] ; input : unit diff --git a/src/lib_dal_node_services/services.mli b/src/lib_dal_node_services/services.mli index 3341780347abf641078a2afb8ef18fbb57e56d67..663940918e2edcdb5c2f27dded20e35c8fd4be4d 100644 --- a/src/lib_dal_node_services/services.mli +++ b/src/lib_dal_node_services/services.mli @@ -171,6 +171,21 @@ val get_attestable_slots : ; query : unit > service +(** For a given delegate (identified by its public key hash), attested level, + and slot index, return all the shards that are traps, together with the + associated shard proof. *) +val get_traps : + < meth : [`GET] + ; input : unit + ; output : + (Tezos_crypto_dal.Cryptobox.shard + * Tezos_crypto_dal.Cryptobox.shard_proof) + list + ; prefix : unit + ; params : (unit * Types.level) * Types.slot_index + ; query : unit > + service + (** Return the shard associated to the given index. *) val get_slot_shard : < meth : [`GET] diff --git a/src/proto_alpha/lib_delegate/baking_commands.ml b/src/proto_alpha/lib_delegate/baking_commands.ml index 4b9a271bbc0656ad3d2c8026fac2253453403015..7c91b2cd0b58d295094eb1828a66a45c7e35262a 100644 --- a/src/proto_alpha/lib_delegate/baking_commands.ml +++ b/src/proto_alpha/lib_delegate/baking_commands.ml @@ -355,9 +355,16 @@ let dal_node_endpoint_arg = Tezos_clic.arg ~long:"dal-node" ~placeholder:"uri" - ~doc:"endpoint of the DAL node, e.g. 'http://localhost:8933'" + ~doc:"endpoint of the DAL node, e.g. 'http://localhost:10732'" (Tezos_clic.parameter (fun _ s -> return @@ Uri.of_string s)) +let dal_slot_index_arg = + Tezos_clic.arg + ~long:"dal-slot-index" + ~placeholder:"DAL slot index" + ~doc:"the DAL slot index to track" + @@ Client_proto_args.non_negative_parameter () + let block_count_arg = Tezos_clic.default_arg ~long:"count" @@ -849,13 +856,26 @@ let accuser_commands () = command ~group ~desc:"Launch the accuser daemon" - (args3 pidfile_arg Client_proto_args.preserved_levels_arg keep_alive_arg) + (args5 + pidfile_arg + Client_proto_args.preserved_levels_arg + keep_alive_arg + dal_node_endpoint_arg + dal_slot_index_arg) (prefixes ["run"] @@ stop) - (fun (pidfile, preserved_levels, keep_alive) cctxt -> + (fun ( pidfile, + preserved_levels, + keep_alive, + dal_node_endpoint, + dal_slot_index ) + cctxt -> may_lock_pidfile pidfile @@ fun () -> Client_daemon.Accuser.run cctxt ~chain:cctxt#chain ~preserved_levels - ~keep_alive); + ~keep_alive + ?dal_node_endpoint + ?dal_slot_index + ()); ] diff --git a/src/proto_alpha/lib_delegate/client_baking_denunciation.ml b/src/proto_alpha/lib_delegate/client_baking_denunciation.ml index 49a3cb5aeafeda71f0288a1c2548b72a4fd768a0..dff4405742ec687b9cd5134f9adf4e3ce5be8384 100644 --- a/src/proto_alpha/lib_delegate/client_baking_denunciation.ml +++ b/src/proto_alpha/lib_delegate/client_baking_denunciation.ml @@ -68,6 +68,13 @@ type recorded_consensus_operations = { preattestation : Kind.preattestation recorded_consensus; } +type dal_config = { + (* the DAL node RPC context, if a DAL node endpoint was given *) + rpc_ctxt : Tezos_rpc.Context.generic; + (* the DAL node RPC context, if a DAL node endpoint was given *) + slot_index : int; (* the DAL slot index to track *) +} + type 'a state = { (* Validators rights for the last preserved levels *) validators_rights : public_key_hash Slot.Map.t Validators_cache.t; @@ -94,14 +101,43 @@ type 'a state = { mutable ops_stream : ops_stream; (* operatons stream stopper. Used when a q new *) mutable ops_stream_stopper : unit -> unit; + dal : dal_config option; } -let create_state ~preserved_levels blocks_stream ops_stream ops_stream_stopper = +let create_dal_node_rpc_ctxt endpoint = + let open Tezos_rpc_http_client_unix in + let rpc_config = + {Tezos_rpc_http_client_unix.RPC_client_unix.default_config with endpoint} + in + let media_types = + Tezos_rpc_http.Media_type.Command_line.of_command_line rpc_config.media_type + in + new RPC_client_unix.http_ctxt rpc_config media_types + +let create_state ~preserved_levels ?dal_node_endpoint ?dal_slot_index + blocks_stream ops_stream ops_stream_stopper = + let open Lwt_result_syntax in let clean_frequency = max 1 (preserved_levels / 10) in let validators_rights = Validators_cache.create (preserved_levels + 2) in (* We keep rights for [preserved_levels] in the past, and 2 levels in the future from [highest_level_encountered] *) - Lwt.return + let* dal = + match (dal_node_endpoint, dal_slot_index) with + | None, None -> return_none + | Some endpoint, Some slot_index -> + return_some {rpc_ctxt = create_dal_node_rpc_ctxt endpoint; slot_index} + | Some _, None -> + (* TODO-POC: proper error message *) + failwith + "If a DAL node endpoint is given, then a slot index needs to be \ + provided as well" + | None, Some _ -> + (* TODO-POC: proper error message *) + failwith + "If a DAL slot index is given, then DAL node endpoint needs to be \ + provided as well" + in + return { validators_rights; consensus_operations_table = HLevel.create preserved_levels; @@ -113,6 +149,7 @@ let create_state ~preserved_levels blocks_stream ops_stream ops_stream_stopper = blocks_stream; ops_stream; ops_stream_stopper; + dal; } (* We choose a previous offset (5 blocks from head) to ensure that the @@ -327,7 +364,7 @@ let process_consensus_op (type kind) state cctxt | _ -> return_unit) let process_operations (cctxt : #Protocol_client_context.full) state - (attestations : 'a list) ~packed_op chain_id = + (consensus_ops : 'a list) ~packed_op chain_id = let open Lwt_result_syntax in List.iter_es (fun op -> @@ -370,7 +407,7 @@ let process_operations (cctxt : #Protocol_client_context.full) state | _ -> (* not a consensus operation *) return_unit) - attestations + consensus_ops let context_block_header cctxt ~chain b_hash = let open Lwt_result_syntax in @@ -480,9 +517,118 @@ let cleanup_old_operations state = filter state.consensus_operations_table ; filter state.blocks_table) +let get_traps dal_node_rpc_ctxt ~attested_level ~slot_index = + Tezos_rpc.Context.make_call + Tezos_dal_node_services.Services.get_traps + dal_node_rpc_ctxt + (((), attested_level), slot_index) + () + () + +let get_shard_indexes tb_slot validators shard_assignment = + let pkh = Slot.Map.find tb_slot validators |> Stdlib.Option.get in + let shard_indexes_opt = + List.find_opt + (fun Plugin.RPC.Dal.S.{delegate; indexes = _} -> + Signature.Public_key_hash.equal pkh delegate) + shard_assignment + in + match shard_indexes_opt with None -> [] | Some v -> v.indexes + +let inject_entrapment_evidence cctxt ~attested_level attestation ~slot_index + ~shard_with_proof = + let open Lwt_result_syntax in + let chain = `Main in + (* this gets the block at attested_level - 5, which is ok *) + let*! block = get_block_offset attested_level in + let* block_hash = Alpha_block_services.hash cctxt ~chain ~block () in + let* bytes = + Plugin.RPC.Forge.dal_entrapment_evidence + cctxt + (chain, block) + ~branch:block_hash + ~attestation + ~slot_index + ~shard_with_proof + in + let bytes = Signature.concat bytes Signature.zero in + (* let*! () = Events.(emit dal_entrapment_detected) () in *) + let* _op_hash = Shell_services.Injection.operation cctxt ~chain bytes in + (* let*! () = Events.(emit dal_entrapment_denounced) (op_hash, bytes) in *) + return_unit + +let dal_check_attestation cctxt ~slot_index ~attested_level traps validators + shard_assignment op = + let open Lwt_result_syntax in + let inject = inject_entrapment_evidence cctxt ~attested_level in + let Alpha_block_services.{shell; protocol_data; _} = op in + let packed_op = {shell; protocol_data} in + match packed_op.protocol_data with + | Operation_data {contents = Single (Preattestation _); _} -> return_unit + | Operation_data + ({contents = Single (Attestation {consensus_content; dal_content; _}); _} + as protocol_data) -> ( + match dal_content with + | Some {attestation} + when Dal.Attestation.is_attested attestation slot_index -> + let delegate_shard_indexes = + get_shard_indexes consensus_content.slot validators shard_assignment + (* TODO: use a set instead *) + in + List.iter_es + (fun (shard, proof) -> + let Cryptobox.{index = shard_index; share = _} = shard in + if Stdlib.List.mem shard_index delegate_shard_indexes then + let shard_with_proof = Dal.Shard_with_proof.{shard; proof} in + inject {shell; protocol_data} ~slot_index ~shard_with_proof + else return_unit) + traps + | Some _ -> return_unit + | None -> + (* TODO: emit warning? *) + return_unit) + | _ -> (* not a consensus operation *) return_unit + +let check_dal_traps state cctxt dal_node_rpc_ctxt slot_index + (block_info : Alpha_block_services.block_info) consensus_ops = + let open Lwt_result_syntax in + let level = block_info.header.shell.level in + (* TODO-POC: check that this block was not already processed! *) + let final_level = Int32.sub level 2l in + let*? attestation_level = + Raw_level.of_int32 (Int32.pred level) |> Environment.wrap_tzresult + in + let attested_level = Raw_level.succ attestation_level in + let* traps = + get_traps dal_node_rpc_ctxt ~attested_level:final_level ~slot_index + in + let* shard_assignment = + Plugin.RPC.Dal.dal_shards + cctxt + (cctxt#chain, `Head 0) + ~level:attestation_level + () + in + let* validators = get_validator_rights state cctxt attestation_level in + let number_of_slots = (* TODO: get constant for this level *) 32 in + let*? slot_index = + Dal.Slot_index.of_int ~number_of_slots slot_index + |> Environment.wrap_tzresult + in + List.iter_es + (dal_check_attestation + cctxt + ~slot_index + ~attested_level + traps + validators + shard_assignment) + consensus_ops + (* Each new block is processed : - Check that every baker injected a proposal only once at the block's level and round - Check that every baker (pre)attested only once at the block's level and round + - Check that bakers correctly identify DAL traps *) let process_new_block (cctxt : #Protocol_client_context.full) state {hash; chain_id; level; protocol; next_protocol; _} = @@ -508,7 +654,20 @@ let process_new_block (cctxt : #Protocol_client_context.full) state let packed_op {Alpha_block_services.shell; protocol_data; _} = {shell; protocol_data} in - process_operations cctxt state consensus_ops ~packed_op chain_id + let* () = + process_operations cctxt state consensus_ops ~packed_op chain_id + in + (* Check the DAL traps *) + Option.iter_es + (fun dal -> + check_dal_traps + state + cctxt + dal.rpc_ctxt + dal.slot_index + block_info + consensus_ops) + state.dal | _ -> (* Should not happen as a block should contain 4 lists of operations, the first list being dedicated to consensus @@ -551,14 +710,16 @@ let start_ops_monitor cctxt = ~outdated:false () -let create (cctxt : #Protocol_client_context.full) ?canceler ~preserved_levels - valid_blocks_stream = +let create (cctxt : #Protocol_client_context.full) ?canceler ?dal_node_endpoint + ?dal_slot_index ~preserved_levels valid_blocks_stream = let open Lwt_result_syntax in let*! () = B_Events.(emit daemon_setup) name in let* ops_stream, ops_stream_stopper = start_ops_monitor cctxt in - let*! state = + let* state = create_state ~preserved_levels + ?dal_node_endpoint + ?dal_slot_index valid_blocks_stream ops_stream ops_stream_stopper diff --git a/src/proto_alpha/lib_delegate/client_baking_denunciation.mli b/src/proto_alpha/lib_delegate/client_baking_denunciation.mli index 46e784d132b49cef5f5e031d1d41bc55161c493a..65fafefd5752bb2914d5a4d2229f6276e61c9cf1 100644 --- a/src/proto_alpha/lib_delegate/client_baking_denunciation.mli +++ b/src/proto_alpha/lib_delegate/client_baking_denunciation.mli @@ -26,6 +26,8 @@ val create : #Protocol_client_context.full -> ?canceler:Lwt_canceler.t -> + ?dal_node_endpoint:Uri.t -> + ?dal_slot_index:int -> preserved_levels:int -> Client_baking_blocks.block_info tzresult Lwt_stream.t -> unit tzresult Lwt.t diff --git a/src/proto_alpha/lib_delegate/client_daemon.ml b/src/proto_alpha/lib_delegate/client_daemon.ml index f63ab6e2db236ae991f557e017ddcaf6f6326a9f..52eac647d096e6a8ac0f845b47b83e64af1562ba 100644 --- a/src/proto_alpha/lib_delegate/client_daemon.ml +++ b/src/proto_alpha/lib_delegate/client_daemon.ml @@ -165,7 +165,7 @@ end module Accuser = struct let run (cctxt : #Protocol_client_context.full) ~chain ~preserved_levels - ~keep_alive = + ~keep_alive ?dal_node_endpoint ?dal_slot_index () = let open Lwt_result_syntax in let process () = let*! () = @@ -194,6 +194,8 @@ module Accuser = struct Client_baking_denunciation.create cctxt ~canceler + ?dal_node_endpoint + ?dal_slot_index ~preserved_levels valid_blocks_stream in diff --git a/src/proto_alpha/lib_delegate/client_daemon.mli b/src/proto_alpha/lib_delegate/client_daemon.mli index bd24247fda99c5201fdb1ff8a1fae0b3dffebfea..639b2105d780878d4b8f6d60e9a93333717730a2 100644 --- a/src/proto_alpha/lib_delegate/client_daemon.mli +++ b/src/proto_alpha/lib_delegate/client_daemon.mli @@ -54,6 +54,9 @@ module Accuser : sig chain:Chain_services.chain -> preserved_levels:int -> keep_alive:bool -> + ?dal_node_endpoint:Uri.t -> + ?dal_slot_index:int -> + unit -> unit tzresult Lwt.t end diff --git a/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out index 7dfc832c9ba56f127c76e5dac1216210aa3680cb..ca59c64f73d229edb5293f9f2696e51d00f61402 100644 --- a/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out @@ -3,6 +3,9 @@ Available services: + - GET /attested_levels//slots//traps + Get all the shards that are traps for a given slot index at the given + attested level. - GET /health Performs health checks on the DAL node, evaluating key components of the DAL node. Returns a health status indicating whether the DAL node @@ -118,6 +121,8 @@ Dynamic parameter description: int int int32 + int + int32 Warning: Failed to acquire the protocol version from the node diff --git a/tezt/tests/expected/dal.ml/Parisc- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/Parisc- Testing DAL node (dal node list RPCs).out index 7dfc832c9ba56f127c76e5dac1216210aa3680cb..ca59c64f73d229edb5293f9f2696e51d00f61402 100644 --- a/tezt/tests/expected/dal.ml/Parisc- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/Parisc- Testing DAL node (dal node list RPCs).out @@ -3,6 +3,9 @@ Available services: + - GET /attested_levels//slots//traps + Get all the shards that are traps for a given slot index at the given + attested level. - GET /health Performs health checks on the DAL node, evaluating key components of the DAL node. Returns a health status indicating whether the DAL node @@ -118,6 +121,8 @@ Dynamic parameter description: int int int32 + int + int32 Warning: Failed to acquire the protocol version from the node diff --git a/tezt/tests/expected/dal.ml/Quebec- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/Quebec- Testing DAL node (dal node list RPCs).out index 7dfc832c9ba56f127c76e5dac1216210aa3680cb..ca59c64f73d229edb5293f9f2696e51d00f61402 100644 --- a/tezt/tests/expected/dal.ml/Quebec- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/Quebec- Testing DAL node (dal node list RPCs).out @@ -3,6 +3,9 @@ Available services: + - GET /attested_levels//slots//traps + Get all the shards that are traps for a given slot index at the given + attested level. - GET /health Performs health checks on the DAL node, evaluating key components of the DAL node. Returns a health status indicating whether the DAL node @@ -118,6 +121,8 @@ Dynamic parameter description: int int int32 + int + int32 Warning: Failed to acquire the protocol version from the node