diff --git a/src/bin_agnostic_baker/README.md b/src/bin_agnostic_baker/README.md index 3dff0205eaa21e5509b0da54dd58f7f357901b36..16f828c25c309cace14499aa1fca3b2eb8b0dbf2 100644 --- a/src/bin_agnostic_baker/README.md +++ b/src/bin_agnostic_baker/README.md @@ -45,6 +45,10 @@ baker run the baker process for ``, information obtained from the node Notice that the two types of arguments are separated by a clear `--`. +With the introduction of the agnostic baker, the CLI has also been unified, so +the baking commands of two consecutive protocols are compatible with each +other. + ## How it works 1. **Initialization**: The daemon starts and connects to the specified Tezos node. @@ -53,4 +57,5 @@ Notice that the two types of arguments are separated by a clear `--`. 4. **Baker Execution**: The chosen baker process is executed with the specified arguments (`[OCTEZ-BAKER-COMMANDS]`). 5. **Chain Monitoring**: The daemon continuously monitors the chain for new blocks and protocol changes (based on the voting period). -6. **Protocol Updates**: If a new protocol is encountered, the daemon stops the current baker process and starts a new one matching the new protocol. This is currently done via an Lwt cancelling mechanism. +6. **Protocol Updates**: If a new protocol is encountered, the daemon immediately starts a new baker process for the new protocol and +keeps the old baker process running for a few more levels before stopping it, for safety (e.g. in case of a reorganization or of a migration block baked at a high round). This is currently done via an Lwt cancelling mechanism.
diff --git a/src/lib_agnostic_baker/agnostic_baker_events.ml b/src/lib_agnostic_baker/agnostic_baker_events.ml index cdb6585bf353e41550575bd349f0aae8fce65124..4a591d8ffb79728468358dc44a061a038340637f 100644 --- a/src/lib_agnostic_baker/agnostic_baker_events.ml +++ b/src/lib_agnostic_baker/agnostic_baker_events.ml @@ -70,6 +70,18 @@ let protocol_encountered = ("proto_hash", Protocol_hash.encoding) ~pp2:Protocol_hash.pp_short +let become_old_baker = + declare_2 + ~section + ~level:Notice + ~name:"become_old_baker" + ~msg: + "The old baker for protocol {proto_hash} will be shut down at level \ + {level_to_kill}." + ("proto_hash", Protocol_hash.encoding) + ~pp1:Protocol_hash.pp_short + ("level_to_kill", int31) + let waiting_for_active_protocol = declare_0 ~section diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 13900981762543e630c19a9fcafac382e44ff400..f396b39727e8a7d577ea8538d2db7346c6ea0784 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -8,16 +8,15 @@ open Agnostic_baker_errors +(* Number of extra levels to keep the old baker alive before shutting it down. + This extra time is used to avoid halting the chain in cases such as + reorganization or high round migration blocks.
*) +let extra_levels_for_old_baker = 3 + type process = {thread : int Lwt.t; canceller : int Lwt.u} type baker = {protocol_hash : Protocol_hash.t; process : process} -let shutdown baker = - let open Lwt_syntax in - let* () = Agnostic_baker_events.(emit stopping_baker) baker.protocol_hash in - Lwt.wakeup baker.process.canceller 0 ; - return_unit - (** [run_thread ~protocol_hash ~baker_commands ~baker_args ~cancel_promise ~logs_path] returns the main running thread for the baker given its protocol [~procol_hash], corresponding commands [~baker_commands], with the command line arguments given by @@ -58,8 +57,8 @@ let run_thread ~protocol_hash ~baker_commands ~baker_args ~cancel_promise cancel_promise; ] -(** [spawn_baker protocol_hash ~baker_args] spawns a baker for the given [protocol_hash] - with [~baker_args] as command line arguments. *) +(** [spawn_baker protocol_hash ~baker_args] spawns a new baker process for the + given [protocol_hash] with command-line arguments [~baker_args]. *) let spawn_baker protocol_hash ~baker_args = let open Lwt_result_syntax in let args_as_string = @@ -73,8 +72,8 @@ let spawn_baker protocol_hash ~baker_args = let*! () = Agnostic_baker_events.(emit starting_baker) (protocol_hash, args_as_string) in - (* The mocked binary argument is necessary for command line parsing done in the running - baker function below. It will be discarded, so its value is not important. *) + (* Prepend a dummy binary argument required for command-line parsing. The argument + will be discarded, so its value is not important. *) let baker_args = "./mock-binary" :: baker_args in let cancel_promise, canceller = Lwt.wait () in let* thread = @@ -91,10 +90,13 @@ let spawn_baker protocol_hash ~baker_args = let*!
() = Agnostic_baker_events.(emit baker_running) protocol_hash in return {protocol_hash; process = {thread; canceller}} +type baker_to_kill = {baker : baker; level_to_kill : int} + type 'a state = { node_endpoint : string; baker_args : string list; mutable current_baker : baker option; + mutable old_baker : baker_to_kill option; } type 'a t = 'a state @@ -126,11 +128,11 @@ let monitor_heads ~node_addr = ignore (loop () : unit Lwt.t) ; return stream -(** [hot_swap_baker ~state ~next_protocol_hash] performs a swap in the current - [~state] of the agnostic baker, exchanging the current baker with the one - corresponding to [~next_protocol_hash]. This is done by stopping the - current baking process and spawning a new process instead. *) -let hot_swap_baker ~state ~next_protocol_hash = +(** [hot_swap_baker ~state ~current_protocol_hash ~next_protocol_hash + ~level_to_kill_old_baker] spawns a new baker for [~next_protocol_hash] and + keeps the current one as the old baker until [~level_to_kill_old_baker]. *) +let hot_swap_baker ~state ~current_protocol_hash ~next_protocol_hash + ~level_to_kill_old_baker = let open Lwt_result_syntax in let* current_baker = match state.current_baker with @@ -142,8 +144,25 @@ let hot_swap_baker ~state ~next_protocol_hash = Agnostic_baker_events.(emit protocol_encountered) (next_proto_status, next_protocol_hash) in - (* Shutdown previous baker *) - let*! () = shutdown current_baker in + (* A previous old baker may still be waiting for its shutdown level (e.g. + when two protocol changes happen within [extra_levels_for_old_baker] + levels): stop it now so that overwriting the slot does not leak it. *) + let*! () = + match state.old_baker with + | None -> Lwt.return_unit + | Some {baker = pending; _} -> + let*! () = + Agnostic_baker_events.(emit stopping_baker) pending.protocol_hash + in + Lwt.wakeup pending.process.canceller 0 ; + Lwt.return_unit + in + let*!
() = + Agnostic_baker_events.(emit become_old_baker) + (current_protocol_hash, level_to_kill_old_baker) + in + state.old_baker <- + Some {baker = current_baker; level_to_kill = level_to_kill_old_baker} ; state.current_baker <- None ; let* new_baker = spawn_baker next_protocol_hash ~baker_args:state.baker_args @@ -151,41 +170,72 @@ let hot_swap_baker ~state ~next_protocol_hash = state.current_baker <- Some new_baker ; return_unit -(** [monitor_voting_periods ~state head_stream] creates a process which listens - to the [head_stream] stream (which returns the data of the heads of the network - chain) in order to know when to "hot swap" (fork) the current protocol baking - binary with the one associated with the next protocol. *) +(** [parse_level head_info] returns the integer ["level"] field of the JSON + object [head_info]. Raises [Not_found] if the field is absent, and + [Ezjsonm.Parse_error] if [head_info] or the field is malformed. *) +let parse_level head_info = + let json = Ezjsonm.from_string head_info in + Ezjsonm.find json ["level"] |> Ezjsonm.get_int + +(** [maybe_kill_old_baker state head_info] stops the [old_baker] of [state], if + any, once the level of [head_info] has reached its [level_to_kill], and + then clears the [old_baker] slot. *) +let maybe_kill_old_baker state head_info = + let open Lwt_syntax in + match state.old_baker with + | None -> return_unit + | Some {baker; level_to_kill} -> + let head_level = parse_level head_info in + if head_level >= level_to_kill then ( + let* () = + Agnostic_baker_events.(emit stopping_baker) baker.protocol_hash + in + Lwt.wakeup baker.process.canceller 0 ; + state.old_baker <- None ; + return_unit) + else return_unit + +(** [monitor_voting_periods ~state head_stream] continuously monitors + [head_stream] to detect protocol changes. For each new head, it will: + - shut down the old baker if its time has come; + - spawn and "hot-swap" to a new baker if the next protocol hash differs.
*) let monitor_voting_periods ~state head_stream = let open Lwt_result_syntax in let node_addr = state.node_endpoint in let rec loop () = - let*! v = Lwt_stream.get head_stream in - match v with - | Some _tick -> + let*! head_info_opt = Lwt_stream.get head_stream in + match head_info_opt with + | None -> tzfail Lost_node_connection + | Some head_info -> let* period_kind, remaining = Rpc_services.get_current_period ~node_addr in let*! () = Agnostic_baker_events.(emit period_status) (period_kind, remaining) in + let*! () = maybe_kill_old_baker state head_info in let* next_protocol_hash = Rpc_services.get_next_protocol_hash ~node_addr in let* current_protocol_hash = match state.current_baker with | None -> tzfail Missing_current_baker - | Some v -> return v.protocol_hash + | Some baker -> return baker.protocol_hash in let* () = if not (Protocol_hash.equal current_protocol_hash next_protocol_hash) - then hot_swap_baker ~state ~next_protocol_hash + then + hot_swap_baker + ~state + ~current_protocol_hash + ~next_protocol_hash + ~level_to_kill_old_baker: + (parse_level head_info + extra_levels_for_old_baker) else return_unit in loop () - | None -> tzfail Lost_node_connection in - let* () = loop () in - return_unit + loop () (** [baker_thread ~state] monitors the current baker thread for any potential error, and it propagates any error that can appear. *) @@ -198,11 +248,9 @@ let baker_thread ~state = in if retcode = 0 then return_unit else tzfail Baker_process_error -(** [may_start_initial_baker state] aims to start the baker associated - to the current protocol. If the protocol is considered as [frozen] (not - [active] anymore), and there is thus no actual baker binary anymore, the - initial phase consists in waiting until an [active] protocol is observed on - monitored heads function. *) +(** [may_start_initial_baker state] recursively waits for an [active] protocol + and spawns a baker for it.
If the protocol is [frozen] (not [active] anymore), it + waits for a head with an [active] protocol. *) let may_start_initial_baker state = let open Lwt_result_syntax in let*! () = Agnostic_baker_events.(emit experimental_binary) () in @@ -253,7 +301,7 @@ let may_start_initial_baker state = may_start ~head_stream:None () let create ~node_endpoint ~baker_args = - {node_endpoint; baker_args; current_baker = None} + {node_endpoint; baker_args; current_baker = None; old_baker = None} let run state = let open Lwt_result_syntax in @@ -265,11 +313,7 @@ let run state = Lwt.return_unit) in let* () = may_start_initial_baker state in - let* _protocol_proposal = Rpc_services.get_current_proposal ~node_addr in let* head_stream = monitor_heads ~node_addr in (* Monitoring voting periods through heads monitoring to avoid missing UAUs. *) - let* () = - Lwt.pick [monitor_voting_periods ~state head_stream; baker_thread ~state] - in - return_unit + Lwt.pick [monitor_voting_periods ~state head_stream; baker_thread ~state] diff --git a/src/lib_agnostic_baker/rpc_services.ml b/src/lib_agnostic_baker/rpc_services.ml index 725f47615932994d3c7b851ac9a7eb3e96c685f3..bde5622bb7787374406ca2fe6ff1a03ded597edd 100644 --- a/src/lib_agnostic_baker/rpc_services.ml +++ b/src/lib_agnostic_baker/rpc_services.ml @@ -43,6 +43,27 @@ let call_and_wrap_rpc ~node_addr ~uri ~f = in raise Not_found +let get_level ~node_addr = + let open Lwt_result_syntax in + let f json = + (* Level field in the RPC result *) + let name = "level" in + let* v = + match json with + | `O fields -> ( + match List.assoc_opt ~equal:( = ) name fields with + | None -> tzfail (Cannot_decode_node_data ("missing field " ^ name)) + | Some node -> return node) + | _ -> tzfail (Cannot_decode_node_data "not an object") + in + let level = Ezjsonm.get_int v in + return level + in + let uri = + Format.sprintf "%s/chains/main/blocks/head/header/shell" node_addr + in + call_and_wrap_rpc ~node_addr ~uri ~f + let get_next_protocol_hash ~node_addr
= let open Lwt_result_syntax in let f json = @@ -62,19 +83,6 @@ let get_next_protocol_hash ~node_addr = let uri = Format.sprintf "%s/chains/main/blocks/head/metadata" node_addr in call_and_wrap_rpc ~node_addr ~uri ~f -let get_current_proposal ~node_addr = - let open Lwt_result_syntax in - let f json = - match json with - | `Null -> return_none - | `String s -> return_some @@ Protocol_hash.of_b58check_exn s - | _ -> tzfail (Cannot_decode_node_data "not an object") - in - let uri = - Format.sprintf "%s/chains/main/blocks/head/votes/current_proposal" node_addr - in - call_and_wrap_rpc ~node_addr ~uri ~f - let get_current_period ~node_addr = let open Lwt_result_syntax in let voting_period_field = "voting_period" in diff --git a/src/lib_agnostic_baker/rpc_services.mli b/src/lib_agnostic_baker/rpc_services.mli index bdf44fb647fb1216ffd51a49038d5922479db174..063adf0e11f00b4e1b428c9ed24b1882b7bb3dd1 100644 --- a/src/lib_agnostic_baker/rpc_services.mli +++ b/src/lib_agnostic_baker/rpc_services.mli @@ -13,16 +13,14 @@ val request_uri : uri:string -> (Cohttp_lwt_unix.Response.t * Cohttp_lwt.Body.t) tzresult Lwt.t +(** [get_level ~node_addr] returns the level of the main chain's head block. *) +val get_level : node_addr:string -> int tzresult Lwt.t + (** [get_next_protocol_hash ~node_addr] returns the protocol hash contained in the [next_protocol] field of the metadata of a block. *) val get_next_protocol_hash : node_addr:string -> Protocol_hash.t tzresult Lwt.t -(** [get_current_proposal ~node_addr] returns the protocol hash of - the current voting period, if any. *) -val get_current_proposal : - node_addr:string -> Protocol_hash.t option tzresult Lwt.t - (** [get_current_period ~node_addr] returns the current voting period in addition to the number of remaining blocks until the end of the period.
*) diff --git a/tezt/lib_tezos/agnostic_baker.ml b/tezt/lib_tezos/agnostic_baker.ml index e1f280dc5da92b34b2e82a892cfc530ecca2d285..7255ce07c082cd179643189b65ea0b0c6bc7b841 100644 --- a/tezt/lib_tezos/agnostic_baker.ml +++ b/tezt/lib_tezos/agnostic_baker.ml @@ -22,6 +22,10 @@ type protocol_status = Active | Frozen | Ignore let protocol_status = function Protocol.Alpha -> Ignore | _ -> Active +(* Must be kept in sync with [extra_levels_for_old_baker] in the [Daemon] + module of [src/lib_agnostic_baker]. *) +let extra_levels_for_old_baker = 3 + module Parameters = struct type persistent_state = { delegates : string list; diff --git a/tezt/lib_tezos/agnostic_baker.mli b/tezt/lib_tezos/agnostic_baker.mli index 4109b16f324c85a9f8eb370c978bb6921613614e..62c00048f0602caf5a7fae094c6b122e0c56d483 100644 --- a/tezt/lib_tezos/agnostic_baker.mli +++ b/tezt/lib_tezos/agnostic_baker.mli @@ -80,6 +80,11 @@ type protocol_status = Active | Frozen | Ignore (** Returns the protocol status given the full protocol value. *) val protocol_status : Protocol.t -> protocol_status +(** Number of extra levels to keep the old baker alive before shutting it down. + This extra time is used to avoid halting the chain in cases such as + reorganization or high round migration blocks. *) +val extra_levels_for_old_baker : int + (** Create a agnostic baker. This function just creates a value of type [t], it does not call {!val:run}.
diff --git a/tezt/tests/agnostic_baker_test.ml b/tezt/tests/agnostic_baker_test.ml index 60fb3d5f97398f70e8984f590bb785c0fc885bff..b026e779a97de84949416ec43d45a7756d06241d 100644 --- a/tezt/tests/agnostic_baker_test.ml +++ b/tezt/tests/agnostic_baker_test.ml @@ -29,6 +29,26 @@ let wait_for_active_protocol_waiting agnostic_baker = "waiting_for_active_protocol.v0" (fun _json -> Some ()) +let wait_for_become_old_baker agnostic_baker = + Agnostic_baker.wait_for agnostic_baker "become_old_baker.v0" (fun _json -> + Some ()) + +let wait_for_stopping_baker agnostic_baker = + Agnostic_baker.wait_for agnostic_baker "stopping_baker.v0" (fun _json -> + Some ()) + +let remote_sign client = + let* () = Client.forget_all_keys client in + let keys = Constant.activator :: (Account.Bootstrap.keys |> Array.to_list) in + let* signer = Signer.init ~keys () in + (* tell the baker to ask the signer for the bootstrap keys *) + let uri = Signer.uri signer in + Lwt_list.iter_s + (fun account -> + let Account.{alias; public_key_hash; _} = account in + Client.import_signer_key client ~alias ~public_key_hash ~signer:uri) + keys + (* Performs a protocol migration thanks to a UAU and the agnostic baker.
*) let perform_protocol_migration ?node_name ?client_name ?parameter_file ?(use_remote_signer = false) ~blocks_per_cycle ~migration_level @@ -43,22 +63,7 @@ let perform_protocol_migration ?node_name ?client_name ?parameter_file ~migrate_to () in - let* () = - if use_remote_signer then - let* () = Client.forget_all_keys client in - let keys = - Constant.activator :: (Account.Bootstrap.keys |> Array.to_list) - in - let* signer = Signer.init ~keys () in - (* tell the baker to ask the signer for the bootstrap keys *) - let uri = Signer.uri signer in - Lwt_list.iter_s - (fun account -> - let Account.{alias; public_key_hash; _} = account in - Client.import_signer_key client ~alias ~public_key_hash ~signer:uri) - keys - else unit - in + let* () = if use_remote_signer then remote_sign client else unit in Log.info "Node %s initialized" (Node.name node) ; let baker = Agnostic_baker.create node client in let wait_for_active_protocol_waiting = @@ -78,6 +83,14 @@ let perform_protocol_migration ?node_name ?client_name ?parameter_file Log.info "Protocol %s activated" (Protocol.hash migrate_from) ; Log.info "Baking at least %d blocks to trigger migration" migration_level ; + (* Register the event waiters before reaching the migration level so that + no event emitted while the node gets there can be missed. *) + let wait_for_become_old_baker = wait_for_become_old_baker baker in + let wait_for_stopping_baker = wait_for_stopping_baker baker in let* _level = Node.wait_for_level node migration_level in + Log.info + "Check that the baking process for %s is not killed" + (Protocol.tag migrate_from) ; + let* () = wait_for_become_old_baker in (* Ensure that the block before migration is consistent *) Log.info "Checking migration block consistency" ; let* () = @@ -99,6 +112,16 @@ let perform_protocol_migration ?node_name ?client_name ?parameter_file ~migrate_to ~level:(migration_level + 1) in + Log.info + "Check that baker for protocol %s is killed after %d levels" + (Protocol.tag migrate_from) + Agnostic_baker.extra_levels_for_old_baker ; + let* _level = + Node.wait_for_level + node + (migration_level +
Agnostic_baker.extra_levels_for_old_baker) + in + let* () = wait_for_stopping_baker in (* Test that we can still bake after migration *) let* _level = Node.wait_for_level node baked_blocks_after_migration in let* () = Agnostic_baker.terminate baker in