diff --git a/src/bin_dal_node/RPC_server.ml b/src/bin_dal_node/RPC_server.ml index 233401b28d7dc893d467bd481ef096ceb7c58403..001b17d2930203abbf18b5ea5de3d16690531139 100644 --- a/src/bin_dal_node/RPC_server.ml +++ b/src/bin_dal_node/RPC_server.ml @@ -229,7 +229,9 @@ module Profile_handlers = struct operator_profiles with | None -> fail Errors.[Profile_incompatibility] - | Some pctxt -> return @@ Node_context.set_profile_ctxt ctxt pctxt) + | Some pctxt -> + let*! () = Node_context.set_profile_ctxt ctxt pctxt in + return_unit) let get_profiles ctxt () () = let open Lwt_result_syntax in diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index d55a64d4279a8f927d36172ac8a5cd5e0fd8c38e..9bb4709445b34fdb57718d46ae3102a13155cd7d 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -236,6 +236,40 @@ module Handler = struct the message revalidated? *) other + (* Set the profile context once we have the protocol plugin. This is supposed + to be called only once. *) + let set_profile_context ctxt config proto_parameters = + let open Lwt_result_syntax in + let*! pctxt_opt = Node_context.load_profile_ctxt ctxt in + let pctxt_opt = + match pctxt_opt with + | None -> + Profile_manager.add_profiles + Profile_manager.empty + proto_parameters + (Node_context.get_gs_worker ctxt) + config.Configuration_file.profiles + | Some pctxt -> + let profiles = Profile_manager.get_profiles pctxt in + (* The profiles from the loaded context are prioritized over the + profiles provided in the config file. *) + let merged_profiles = + Types.merge_profiles + ~lower_prio:config.Configuration_file.profiles + ~higher_prio:profiles + in + Profile_manager.add_profiles + Profile_manager.empty + proto_parameters + (Node_context.get_gs_worker ctxt) + merged_profiles + in + match pctxt_opt with + | None -> fail Errors.[Profile_incompatibility] + | Some pctxt -> + let*! 
() = Node_context.set_profile_ctxt ctxt pctxt in + return_unit + let resolve_plugin_and_set_ready config dal_config ctxt cctxt = (* Monitor heads and try resolve the DAL protocol plugin corresponding to the protocol of the targeted node. *) @@ -259,37 +293,7 @@ module Handler = struct in Store.Value_size_hooks.set_share_size (Cryptobox.Internal_for_tests.encoded_share_size cryptobox) ; - let* () = - let* pctxt = - let pctxt = Node_context.get_profile_ctxt ctxt in - match config.Configuration_file.profiles with - | Bootstrap -> return @@ Profile_manager.bootstrap_profile - | Random_observer -> ( - let slot_index = - Random.int proto_parameters.number_of_slots - in - match - Profile_manager.add_operator_profiles - pctxt - proto_parameters - (Node_context.get_gs_worker ctxt) - [Observer {slot_index}] - with - | None -> fail Errors.[Profile_incompatibility] - | Some pctxt -> return pctxt) - | Operator operator_profiles -> ( - match - Profile_manager.add_operator_profiles - pctxt - proto_parameters - (Node_context.get_gs_worker ctxt) - operator_profiles - with - | None -> fail Errors.[Profile_incompatibility] - | Some pctxt -> return pctxt) - in - return @@ Node_context.set_profile_ctxt ctxt pctxt - in + let* () = set_profile_context ctxt config proto_parameters in let*? () = Node_context.set_ready ctxt @@ -565,31 +569,6 @@ let resolve points = ~default_port:(Configuration_file.default.listen_addr |> snd)) points -(* This function ensures the persistence of attester profiles - to the configuration file at shutdown. - - This is especially important for attesters as the pkh they track - is supplied by the baker through `PATCH /profile` API calls. - As these profiles are added dynamically at run time, they do not exist - in the initial configuration file. Therefore, in order to retain these - profiles between restarts, we store them to the configuration file at shutdown. 
*) -let store_profiles_finalizer ctxt data_dir = - let open Lwt_syntax in - Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun _exit_status -> - let profiles = - Profile_manager.get_profiles (Node_context.get_profile_ctxt ctxt) - in - let* r = Configuration_file.load ~data_dir in - match r with - | Ok config -> ( - let* r = Configuration_file.save {config with profiles} in - match r with - | Ok () -> return_unit - | Error e -> Event.(emit failed_to_persist_profiles (profiles, e))) - | Error e -> - let* () = Event.(emit failed_to_persist_profiles (profiles, e)) in - return_unit - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/3605 Improve general architecture, handle L1 disconnection etc *) @@ -710,9 +689,6 @@ let run ~data_dir configuration_override = cctxt metrics_server in - let (_ : Lwt_exit.clean_up_callback_id) = - store_profiles_finalizer ctxt data_dir - in let* rpc_server = RPC_server.(start config ctxt) in connect_gossipsub_with_p2p gs_worker transport_layer store ; (* activate the p2p instance. 
*) diff --git a/src/bin_dal_node/event.ml b/src/bin_dal_node/event.ml index 97e8d40bc76ad69babd588808d1a1ec510682eb7..91512a881d3fe8b0de8e19bcc5cd62415e76e4f2 100644 --- a/src/bin_dal_node/event.ml +++ b/src/bin_dal_node/event.ml @@ -238,3 +238,19 @@ let metrics_server_is_ready = ~level:Notice ("host", Data_encoding.string) ("port", Data_encoding.uint16) + +let loading_profiles_failed = + declare_1 + ~section + ~name:"loading_profiles_failed" + ~msg:"loading profiles failed: {error}" + ~level:Error + ("error", Error_monad.trace_encoding) + +let saving_profiles_failed = + declare_1 + ~section + ~name:"saving_profiles_failed" + ~msg:"saving profiles failed: {error}" + ~level:Error + ("error", Error_monad.trace_encoding) diff --git a/src/bin_dal_node/main.ml b/src/bin_dal_node/main.ml index 0229ebce297ddda998655e166e94a32580698c8c..afb3a78fdcf15f3560c1cc803c70376d3189d3c0 100644 --- a/src/bin_dal_node/main.ml +++ b/src/bin_dal_node/main.ml @@ -23,24 +23,6 @@ (* *) (*****************************************************************************) -(* TODO: https://gitlab.com/tezos/tezos/-/issues/6110 - Improve profile configuration UX for when we have conflicting CLI and config file. *) -let merge_profiles ~from_config_file ~from_cli = - let open Types in - match from_cli with - | None -> from_config_file - | Some from_cli -> ( - (* Note that the profile from the CLI is prioritized over - the profile provided from the config file. *) - match (from_config_file, from_cli) with - | Bootstrap, Bootstrap -> Bootstrap - | Operator _, Bootstrap -> Bootstrap - | Bootstrap, Operator op -> Operator op - | Operator op1, Operator op2 -> Operator (op1 @ op2) - | Random_observer, Random_observer -> Random_observer - | Random_observer, ((Operator _ | Bootstrap) as profile) -> profile - | (Operator _ | Bootstrap), Random_observer -> Random_observer) - let merge Cli. { @@ -54,24 +36,30 @@ let merge profiles; peers; } configuration = - Configuration_file. 
- { - configuration with - data_dir = Option.value ~default:configuration.data_dir data_dir; - rpc_addr = Option.value ~default:configuration.rpc_addr rpc_addr; - listen_addr = Option.value ~default:configuration.listen_addr listen_addr; - public_addr = Option.value ~default:configuration.public_addr public_addr; - expected_pow = - Option.value ~default:configuration.expected_pow expected_pow; - endpoint = Option.value ~default:configuration.endpoint endpoint; - profiles = - merge_profiles - ~from_cli:profiles - ~from_config_file:configuration.profiles; - metrics_addr = - Option.value ~default:configuration.metrics_addr metrics_addr; - peers = peers @ configuration.peers; - } + let profiles = + match profiles with + | None -> configuration.Configuration_file.profiles + | Some from_cli -> + (* Note that the profile from the CLI is prioritized over + the profile provided in the config file. *) + (* TODO: https://gitlab.com/tezos/tezos/-/issues/6110 + Improve profile configuration UX for when we have conflicting CLI and config file. 
*) + Types.merge_profiles + ~lower_prio:configuration.profiles + ~higher_prio:from_cli + in + { + configuration with + data_dir = Option.value ~default:configuration.data_dir data_dir; + rpc_addr = Option.value ~default:configuration.rpc_addr rpc_addr; + listen_addr = Option.value ~default:configuration.listen_addr listen_addr; + public_addr = Option.value ~default:configuration.public_addr public_addr; + expected_pow = Option.value ~default:configuration.expected_pow expected_pow; + endpoint = Option.value ~default:configuration.endpoint endpoint; + profiles; + metrics_addr = Option.value ~default:configuration.metrics_addr metrics_addr; + peers = peers @ configuration.peers; + } let wrap_with_error main_promise = let open Lwt_syntax in diff --git a/src/bin_dal_node/node_context.ml b/src/bin_dal_node/node_context.ml index 8f71c7aed40c9a7bdc135ad88be024f71f244dd1..d696185346a739d50b8dba57db9e545270594b36 100644 --- a/src/bin_dal_node/node_context.ml +++ b/src/bin_dal_node/node_context.ml @@ -134,7 +134,26 @@ let update_last_processed_level ctxt ~level = let get_profile_ctxt ctxt = ctxt.profile_ctxt -let set_profile_ctxt ctxt pctxt = ctxt.profile_ctxt <- pctxt +let load_profile_ctxt ctxt = + let open Lwt_syntax in + let base_dir = Configuration_file.store_path ctxt.config in + let* res = Profile_manager.load_profile_ctxt ~base_dir in + match res with + | Ok pctxt -> return_some pctxt + | Error err -> + let* () = Event.(emit loading_profiles_failed err) in + return_none + +let set_profile_ctxt ctxt ?(save = true) pctxt = + let open Lwt_syntax in + ctxt.profile_ctxt <- pctxt ; + if save then + let base_dir = Configuration_file.store_path ctxt.config in + let* res = Profile_manager.save_profile_ctxt ctxt.profile_ctxt ~base_dir in + match res with + | Ok () -> return_unit + | Error err -> Event.(emit saving_profiles_failed err) + else return_unit let get_config ctxt = ctxt.config diff --git a/src/bin_dal_node/node_context.mli b/src/bin_dal_node/node_context.mli index 
fb16089a43df8dca356d38076396f84dadbc1be9..e1b5b2231ecf4c3fece5455a6bbf0e286e4772f1 100644 --- a/src/bin_dal_node/node_context.mli +++ b/src/bin_dal_node/node_context.mli @@ -95,8 +95,13 @@ val get_ready : t -> ready_ctxt tzresult (** [get_profile_ctxt ctxt] returns the profile context. *) val get_profile_ctxt : t -> Profile_manager.t -(** [set_profile_ctxt ctxt pctxt] sets the profile context. *) -val set_profile_ctxt : t -> Profile_manager.t -> unit +(** [load_profile_ctxt ctxt] tries to load the profile context from disk. *) +val load_profile_ctxt : t -> Profile_manager.t option Lwt.t + +(** [set_profile_ctxt ctxt ?save pctxt] sets the profile context. If [save] is + set, which is [true] by default, the profile context is saved on + disk. *) +val set_profile_ctxt : t -> ?save:bool -> Profile_manager.t -> unit Lwt.t (** [get_config ctxt] returns the dal node configuration *) val get_config : t -> Configuration_file.t diff --git a/src/bin_dal_node/profile_manager.ml b/src/bin_dal_node/profile_manager.ml index cc7c22abb689f585616b264d974c4ba771423c41..e8bfbc1211c1d249799288fae5670c814d49eb76 100644 --- a/src/bin_dal_node/profile_manager.ml +++ b/src/bin_dal_node/profile_manager.ml @@ -28,8 +28,8 @@ module Slot_set = Set.Make (Int) module Pkh_set = Signature.Public_key_hash.Set (** An operator DAL node can play three different roles: - - attester for some pkh: checks that the shards assigned to this pkh are published - - slot producer for some slot index: splits slots into shards and publishes the shards + - attester for some pkh: checks that the shards assigned to this pkh are published + - slot producer for some slot index: splits slots into shards and publishes the shards - slot observer for some slot index: collects the shards corresponding to some slot index, reconstructs slots when enough shards are seen, and republishes missing shards. 
@@ -42,9 +42,51 @@ type operator_sets = { observers : Slot_set.t; } +(* TODO: https://gitlab.com/tezos/tezos/-/issues/6958 + Unify this type with the one from `src/lib_dal_node_services/types.ml` *) + (** A profile context stores profile-specific data used by the daemon. *) type t = Bootstrap | Operator of operator_sets +let operator_sets_encoding = + let open Data_encoding in + conv + (fun {producers; observers; attesters} -> + ( Slot_set.elements producers, + Slot_set.elements observers, + Pkh_set.elements attesters )) + (fun (producers, observers, attesters) -> + { + producers = Slot_set.of_list producers; + observers = Slot_set.of_list observers; + attesters = Pkh_set.of_list attesters; + }) + (obj3 + (req "producers" (list int16)) + (req "observers" (list int16)) + (req "attesters" (list Signature.Public_key_hash.encoding))) + +let encoding = + let open Data_encoding in + union + ~tag_size:`Uint8 + [ + case + ~title:"Bootstrap" + (Tag 0) + (obj1 (req "kind" (constant "bootstrap"))) + (function Bootstrap -> Some () | _ -> None) + (fun () -> Bootstrap); + case + ~title:"Operator" + (Tag 1) + (obj2 + (req "kind" (constant "operator")) + (req "operator_profiles" operator_sets_encoding)) + (function Operator s -> Some ((), s) | _ -> None) + (function (), s -> Operator s); + ] + let empty = Operator { @@ -53,6 +95,13 @@ let empty = observers = Slot_set.empty; } +let is_empty = function + | Bootstrap -> false + | Operator {producers; attesters; observers} -> + Slot_set.is_empty producers + && Pkh_set.is_empty attesters + && Slot_set.is_empty observers + let is_bootstrap_profile = function Bootstrap -> true | Operator _ -> false let bootstrap_profile = Bootstrap @@ -100,6 +149,14 @@ let add_operator_profiles t proto_parameters gs_worker in Some (Operator operator_sets) +let add_profiles t proto_parameters gs_worker = function + | Types.Bootstrap -> if is_empty t then Some bootstrap_profile else None + | Random_observer -> + let slot_index = Random.int 
proto_parameters.Dal_plugin.number_of_slots in + add_operator_profiles t proto_parameters gs_worker [Observer {slot_index}] + | Operator operator_profiles -> + add_operator_profiles t proto_parameters gs_worker operator_profiles + let validate_slot_indexes t ~number_of_slots = let open Result_syntax in match t with @@ -206,3 +263,47 @@ let get_attestable_slots ~shard_indices store proto_parameters ~attested_level = in let* flags = List.map_es are_shards_stored all_slot_indexes in return (Types.Attestable_slots {slots = flags; published_level}) + +let profiles_filename = "profiles.json" + +(* TODO https://gitlab.com/tezos/tezos/-/issues/7033 + One could use a promise to wait the completion of the previous operation, if + any. *) +let lock = Lwt_mutex.create () + +let load_profile_ctxt ~base_dir = + let open Lwt_result_syntax in + Lwt_mutex.with_lock lock @@ fun () -> + Lwt.catch + (fun () -> + let file_path = Filename.concat base_dir profiles_filename in + let*! ic = Lwt_io.open_file ~mode:Lwt_io.Input file_path in + let*! json_str = Lwt_io.read ic in + let*! () = Lwt_io.close ic in + match Data_encoding.Json.from_string json_str with + | Error _ -> + failwith "DAL node. Failed to load profile, error parsing JSON value" + | Ok json -> Data_encoding.Json.destruct encoding json |> return) + (fun exn -> + failwith + "DAL node: failed to load the profile context. Exception: %s" + (Printexc.to_string exn)) + +let save_profile_ctxt ctxt ~base_dir = + let open Lwt_result_syntax in + Lwt_mutex.with_lock lock @@ fun () -> + Lwt.catch + (fun () -> + let value = + Data_encoding.Json.construct encoding ctxt + |> Data_encoding.Json.to_string + in + let file_path = Filename.concat base_dir profiles_filename in + let*! oc = Lwt_io.open_file ~mode:Lwt_io.Output file_path in + let*! () = Lwt_io.write_line oc value in + let*! () = Lwt_io.close oc in + return_unit) + (fun exn -> + failwith + "DAL node: failed to save the profile context. 
Exception: %s" + (Printexc.to_string exn)) diff --git a/src/bin_dal_node/profile_manager.mli b/src/bin_dal_node/profile_manager.mli index 3fca1c7f2505de476cfaceb53027d62aecbd718b..87ab8d88becb3544eab5a4b1a0ceeaeb82f88309 100644 --- a/src/bin_dal_node/profile_manager.mli +++ b/src/bin_dal_node/profile_manager.mli @@ -49,6 +49,16 @@ val add_operator_profiles : Types.operator_profiles -> t option +(** [add_profiles t proto_parameters gs_worker profiles] registers [profiles]. + If the current profiles are incompatible with the provided [profiles], it + returns [None]. *) +val add_profiles : + t -> + Dal_plugin.proto_parameters -> + Gossipsub.Worker.t -> + Types.profiles -> + t option + (** Checks that each producer profile only refers to slot indexes strictly smaller than [number_of_slots]. This may not be the case when the profile context is first built because there is no information about the number of @@ -74,3 +84,13 @@ val get_attestable_slots : Dal_plugin.proto_parameters -> attested_level:int32 -> (Types.attestable_slots, [Errors.decoding | Errors.other]) result Lwt.t + +(** Load the profile context from disk. The file where the context is loaded + from is relative to the given [base_dir]. An error is returned in case of an + IO failure or an ill-formatted file. *) +val load_profile_ctxt : base_dir:string -> t tzresult Lwt.t + +(** Save the profile context to disk. The file where the context is saved is + relative to the given [base_dir]. An error is returned in case of an + IO failure.
*) +val save_profile_ctxt : t -> base_dir:string -> unit tzresult Lwt.t diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 3b2cf4a15825874e3db1a66134c02885eb12522e..47f1ce7311c55d92dbe2cd5f86d7026e66ffd6d8 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -303,6 +303,16 @@ type operator_profiles = operator_profile list type profiles = Bootstrap | Operator of operator_profiles | Random_observer +let merge_profiles ~lower_prio ~higher_prio = + match (lower_prio, higher_prio) with + | Bootstrap, Bootstrap -> Bootstrap + | Operator _, Bootstrap -> Bootstrap + | Bootstrap, Operator op -> Operator op + | Operator op1, Operator op2 -> Operator (op1 @ op2) + | Random_observer, Random_observer -> Random_observer + | Random_observer, ((Operator _ | Bootstrap) as profile) -> profile + | (Operator _ | Bootstrap), Random_observer -> Random_observer + type with_proof = {with_proof : bool} (* Auxiliary functions. *) diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index f43443ccaf7e5d690d998edd19c562090b14e5ec..df85400f7b7ed84901a4dbc23023a4ed335fc405 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -237,6 +237,9 @@ type operator_profile = provided by the user in unprocessed form. *) type operator_profiles = operator_profile list +(* TODO: https://gitlab.com/tezos/tezos/-/issues/6958 + Unify the {profiles} type with the one from `src/bin_dal_node/profile_manager.ml` *) + (** DAL node can track one or many profiles that correspond to various modes that the DAL node would operate in. *) type profiles = @@ -248,6 +251,11 @@ type profiles = | Operator of operator_profiles | Random_observer +(** Merge the two sets of profiles. Two [Operator] profile lists are + concatenated ([lower_prio] first). In every other case where the kinds + differ, the profiles from [higher_prio] take priority.
*) +val merge_profiles : lower_prio:profiles -> higher_prio:profiles -> profiles + (** Information associated to a slot header in the RPC services of the DAL node. *) type slot_header = { diff --git a/tezt/lib_tezos/dal_node.mli b/tezt/lib_tezos/dal_node.mli index a22cf6e3ffd3b33c47c781ea56d874c9803c07c1..8932bec9e91d102644fbb12fe33226ae0e5e78e0 100644 --- a/tezt/lib_tezos/dal_node.mli +++ b/tezt/lib_tezos/dal_node.mli @@ -106,6 +106,9 @@ val terminate : ?timeout:float -> t -> unit Lwt.t (** Send SIGKILL and wait for the process to terminate. *) val kill : t -> unit Lwt.t +(** Send SIGSTOP to a daemon. Do not wait for the process to terminate. *) +val stop : t -> unit Lwt.t + (** Shows in stdout every events sent by the node *) val log_events : ?max_length:int -> t -> unit diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index 28c1612159e8faee3385034d865dcc0ab0802317..ab8d0edad79b5a321825a1490323c1e7f428fee5 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -2315,10 +2315,26 @@ let test_dal_node_test_patch_profile _protocol _parameters _cryptobox _node let* () = check_profiles ~__LOC__ dal_node ~expected:(Operator [profile1; profile2]) in - (* Test that the patched profiles are persisted after restart. *) + (* Test that the patched profiles are persisted after restart using SIGTERM. *) let* () = Dal_node.terminate dal_node in + Log.info "restart DAL node (1)" ; let* () = Dal_node.run dal_node ~wait_ready:true in - check_profiles ~__LOC__ dal_node ~expected:(Operator [profile1; profile2]) + + let* () = + check_profiles ~__LOC__ dal_node ~expected:(Operator [profile1; profile2]) + in + (* Test whether the patched profiles persist across a restart when the node + is stopped with SIGSTOP then SIGKILL (that is, stopped abruptly).
*) + let profile3 = Dal_RPC.Attester Constant.bootstrap3.public_key_hash in + let* () = patch_profile_rpc profile3 in + let* () = Dal_node.stop dal_node in + let* () = Dal_node.kill dal_node in + Log.info "restart DAL node (2)" ; + let* () = Dal_node.run dal_node ~wait_ready:true in + check_profiles + ~__LOC__ + dal_node + ~expected:(Operator [profile1; profile2; profile3]) (* Check that result of the DAL node's GET /profiles//attested_levels//assigned_shard_indices