(* Review notes for this patch to src/bin_agnostic_baker/main_agnostic_baker.ml. The patch registers a Lost_node_connection error, declares the protocol_encountered and waiting_for_active_protocol events, splits Parameters.protocol_info into protocol_short_hash and protocol_status accessors, moves get_next_protocol_hash into the RPC module, and adds a Daemon.state record plus Daemon.may_start_initial_baker so that the baker is spawned by the daemon only once an Active protocol is observed. NOTE(review): the newlines of this patch have been collapsed, so it cannot be applied with git apply as-is. *) diff --git a/src/bin_agnostic_baker/main_agnostic_baker.ml b/src/bin_agnostic_baker/main_agnostic_baker.ml index c50e2d66888becb19acba22e5784f0a8a670a165..0c0f9910d15876dfb2433febde5cec6ba9b076f1 100644 --- a/src/bin_agnostic_baker/main_agnostic_baker.ml +++ b/src/bin_agnostic_baker/main_agnostic_baker.ml @@ -8,6 +8,7 @@ type error += | Cannot_connect_to_node of string | Cannot_decode_node_data of string + | Lost_node_connection let () = Error_monad.register_error_kind @@ -31,7 +32,16 @@ let () = ~pp:(fun ppf err -> Format.fprintf ppf "Cannot decode node data: %s" err) Data_encoding.(obj1 (req "err" string)) (function Cannot_decode_node_data err -> Some err | _ -> None) - (fun err -> Cannot_decode_node_data err) + (fun err -> Cannot_decode_node_data err) ; + (* Registers Lost_node_connection, which Daemon.may_start_initial_baker fails with when the monitored heads stream ends while it waits for an active protocol. *) Error_monad.register_error_kind + `Permanent + ~id:"agnostic_baker.lost_node_connection" + ~title:"Lost node connection" + ~description:"Connection with node lost." + ~pp:(fun ppf () -> Format.fprintf ppf "Connection with node was lost") + Data_encoding.(unit) + (function Lost_node_connection -> Some () | _ -> None) + (fun () -> Lost_node_connection) module Events = struct include Internal_event.Simple @@ -66,6 +76,24 @@ module Events = struct ~msg:"starting agnostic daemon" () + (* Arguments: a status string rendered with Parameters.pp_status, and the protocol hash, printed in short form. Used by Daemon.may_start_initial_baker when it reports a newly observed protocol. *) let protocol_encountered = + declare_2 + ~section + ~level:Notice + ~name:"protocol_encountered" + ~msg:"the {status} protocol {proto_hash} was encountered" + ("status", string) + ("proto_hash", Protocol_hash.encoding) + ~pp2:Protocol_hash.pp_short + + (* Used by Daemon.may_start_initial_baker when the initial protocol is Frozen, right before it starts monitoring heads. *) let waiting_for_active_protocol = + declare_0 + ~section + ~level:Notice + ~name:"waiting_for_active_protocol" + ~msg:"waiting for active protocol" + () + let period_status = declare_2 ~section @@ -82,8 +110,14 @@ module Parameters = struct type status = Active | Frozen + (* Prints a protocol status as active or frozen. *) let pp_status fmt status = + Format.fprintf + fmt + "%s" + (match status with Active -> "active" | Frozen -> "frozen") + (* From Manifest/Product_octez/Protocol*) - let protocol_status = function + (* Maps a b58check protocol hash to a pair made of its short name and its status; hashes not listed are assumed to be beta protocols and Active. *) let protocol_info = 
function | ( "ProtoGenesisGenesisGenesisGenesisGenesisGenesk612im" | "Ps9mPmXaRzmzk35gbAYNCAw6UXdE2qoABTHbN2oEEc1qM7CwT9P" | "PtCJ7pwoxe8JasnHY8YonnLYjcVHmhiARPJvqcC6VfHT5s8k8sY" @@ -113,12 +147,18 @@ module Parameters = struct (String.sub full_hash 0 8, Active) | "ProtoALphaALphaALphaALphaALphaALphaALphaALphaDdp3zK" -> ("alpha", Active) | _ -> (*We assume that unmatched protocols are beta ones*) ("beta", Active) + + (* Projections of protocol_info. NOTE(review): protocol_status used to return the short name and status pair and now returns only the status; callers that destructured the pair, as baker_path did, must use protocol_short_hash instead. *) let protocol_short_hash h = fst (protocol_info h) + + let protocol_status h = snd (protocol_info h) end module Baker = struct + (* The spawned baker process, or None while no baker has been spawned. *) type t = Lwt_process.process_none option + let baker_path ?(user_path = "./") proto_hash = - let short_name, _status = - Parameters.protocol_status (Protocol_hash.to_b58check proto_hash) + let short_name = + Parameters.protocol_short_hash (Protocol_hash.to_b58check proto_hash) in Format.sprintf "%soctez-baker-%s" user_path short_name @@ -182,9 +222,35 @@ module RPC = struct (Cohttp.Code.code_of_status resp.status) in raise Not_found + + (* Reads the next_protocol field from the chains/main/blocks/head/metadata RPC of node_addr and decodes it as a protocol hash. *) let get_next_protocol_hash ~node_addr = + let open Lwt_result_syntax in + let f json = + (* Next_protocol hash field in the RPC result *) + let name = "next_protocol" in + let* v = + match json with + | `O fields -> ( + match List.assoc_opt ~equal:( = ) name fields with + | None -> tzfail (Cannot_decode_node_data ("missing field " ^ name)) + | Some node -> return node) + | _ -> tzfail (Cannot_decode_node_data "not an object") + in + (* NOTE(review): of_b58check_exn, and presumably Ezjsonm.get_string on a non-string value, raise instead of failing with Cannot_decode_node_data; consider Protocol_hash.of_b58check_opt plus tzfail, unless call_and_wrap_rpc is confirmed to catch exceptions raised by f. *) let hash = Protocol_hash.of_b58check_exn (Ezjsonm.get_string v) in + return hash + in + let uri = Format.sprintf "%s/chains/main/blocks/head/metadata" node_addr in + call_and_wrap_rpc ~node_addr ~uri ~f end module Daemon = struct + (* Mutable daemon state: node_endpoint is the node RPC address used for RPC calls and heads monitoring; binaries_directory and baker_args are forwarded to Baker.spawn_baker; current_baker is None until may_start_initial_baker spawns a baker. *) type state = { + binaries_directory : string option; + node_endpoint : string; + baker_args : string list; + mutable current_baker : Baker.t; + } + let get_current_proposal ~node_addr = let open Lwt_result_syntax in let f json = @@ -281,9 +347,69 @@ module Daemon = struct let* () = loop () in return_unit - let run ~node_addr = + (* Starts 
the baker associated with the current protocol. If + the protocol is considered frozen, that is no longer active, + there is no baker binary for it anymore, so the initial phase + consists of waiting until an active protocol is observed on + monitored heads. Returns once a baker has been spawned; fails with + Lost_node_connection if the heads stream ends, or with any RPC error. *) + let may_start_initial_baker state = + let open Lwt_result_syntax in + let rec may_start ?last_known_proto ~head_stream () = + let* protocol_hash = + RPC.get_next_protocol_hash ~node_addr:state.node_endpoint + in + let proto_status = + Parameters.protocol_status (Protocol_hash.to_b58check protocol_hash) + in + let*! () = + match last_known_proto with + | None -> Lwt.return_unit + | Some h -> + if not (Protocol_hash.equal h protocol_hash) then + Events.(emit protocol_encountered) + ( Format.asprintf "%a" Parameters.pp_status proto_status, + protocol_hash ) + else Lwt.return_unit + in + (* NOTE(review): on the first check, with no last_known_proto, an Active protocol is spawned without emitting protocol_encountered, while a Frozen one does emit it; confirm this asymmetry is intended. *) match proto_status with + | Active -> + let* current_baker = + Baker.spawn_baker + protocol_hash + ~binaries_directory:state.binaries_directory + ~baker_args:state.baker_args + in + state.current_baker <- Some current_baker ; + return_unit + | Frozen -> ( + let* head_stream = + match head_stream with + | Some v -> return v + | None -> + let*! () = + Events.(emit protocol_encountered) + ( Format.asprintf "%a" Parameters.pp_status proto_status, + protocol_hash ) + in + let*! () = Events.(emit waiting_for_active_protocol) () in + monitor_heads ~node_addr:state.node_endpoint + in + (* NOTE(review): this heads stream is never closed once an Active protocol is found, and Daemon.run then opens a second one with monitor_heads; check whether the first stream, presumably backed by an open RPC connection, should be stopped to avoid leaking it. *) let*! v = Lwt_stream.get head_stream in + match v with + | Some _tick -> + may_start + ~last_known_proto:protocol_hash + ~head_stream:(Some head_stream) + () + | (* The heads stream ended: report it as a lost node connection. *) None -> tzfail Lost_node_connection) + in + may_start ~head_stream:None () + + let run ~state = + let open Lwt_result_syntax in + let node_addr = state.node_endpoint in let*! 
() = Events.(emit starting_daemon) () in + (* Waits until a baker has been spawned for an Active protocol before the heads and voting period monitoring below starts. *) let* () = may_start_initial_baker state in let* _protocol_proposal = get_current_proposal ~node_addr in let* head_stream = monitor_heads ~node_addr in (* Monitoring voting periods through heads monitoring to avoid @@ -292,25 +418,6 @@ module Daemon = struct return_unit end -let get_next_protocol_hash ~node_addr = - let open Lwt_result_syntax in - let f json = - (* Next_protocol hash field in the RPC result *) - let name = "next_protocol" in - let* v = - match json with - | `O fields -> ( - match List.assoc_opt ~equal:( = ) name fields with - | None -> tzfail (Cannot_decode_node_data ("missing field " ^ name)) - | Some node -> return node) - | _ -> tzfail (Cannot_decode_node_data "not an object") - in - let hash = Protocol_hash.of_b58check_exn (Ezjsonm.get_string v) in - return hash - in - let uri = Format.sprintf "%s/chains/main/blocks/head/metadata" node_addr in - RPC.call_and_wrap_rpc ~node_addr ~uri ~f - module Args = struct let binaries_directory_arg = "--binaries-directory" @@ -385,10 +492,14 @@ end let run () = let open Lwt_result_syntax in let*! () = Tezos_base_unix.Internal_event_unix.init () in - let endpoint, binaries_directory, baker_args = Args.parse_args Sys.argv in - let* proto_hash = get_next_protocol_hash ~node_addr:endpoint in - let (_daemon : unit tzresult Lwt.t) = Daemon.run ~node_addr:endpoint in - let* _baker = Baker.spawn_baker proto_hash ~binaries_directory ~baker_args in + let node_endpoint, binaries_directory, baker_args = + Args.parse_args Sys.argv + in + (* NOTE(review): Daemon.run is now awaited with let* rather than started in the background as before, and the baker is now spawned inside it, so the never_ending call below is only reached after Daemon.run returns; since Daemon.run yields unit, binding it with let* () = would state that more clearly than _daemon. *) let* _daemon = + Daemon.run + ~state: + {binaries_directory; node_endpoint; baker_args; current_baker = None} + in let*! () = Lwt_utils.never_ending () in return_unit