diff --git a/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml b/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml index 287c9597404f8a312778c8af1e75bac2c7f8d627..2e17f2c4ea31df616d89c84c2f96a73e748b135a 100644 --- a/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml +++ b/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml @@ -25,32 +25,55 @@ open Protocol open Alpha_context +open Client_baking_blocks module Events = Baking_events.VDF module D_Events = Delegate_events.Denunciator -open Client_baking_blocks -type status = Not_started | Started | Finished +type vdf_solution = Environment.Vdf.result * Environment.Vdf.proof + +type status = Not_started | Started | Finished of vdf_solution | Injected type 'a state = { cctxt : Protocol_client_context.full; constants : Constants.t; - block_stream : (block_info, 'a) result Lwt_stream.t; + mutable block_stream : (block_info, 'a) result Lwt_stream.t; + mutable stream_stopper : RPC_context.stopper; mutable cycle : Cycle_repr.t option; mutable computation_status : status; } -let rec wait_for_first_block ~name stream = - Lwt_stream.get stream >>= function - | None | Some (Error _) -> - Delegate_events.Baking_scheduling.(emit cannot_fetch_event) name - >>= fun () -> - (* NOTE: this is not a tight loop because of Lwt_stream.get *) - wait_for_first_block ~name stream - | Some (Ok bi) -> Lwt.return bi +let init_block_stream_with_stopper cctxt chain = + Client_baking_blocks.monitor_valid_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () + +let restart_block_stream cctxt chain state = + let open Lwt_result_syntax in + state.stream_stopper () ; + let retries_on_failure = 10 in + let rec try_start_block_stream cctxt chain state retries_on_failure = + let*! p = init_block_stream_with_stopper cctxt chain in + match p with + | Ok (block_stream, stream_stopper) -> + state.block_stream <- block_stream ; + state.stream_stopper <- stream_stopper ; + return_unit + | Error e -> + if retries_on_failure > 0 then + let*! 
() = Lwt_unix.sleep 10. in + try_start_block_stream cctxt chain state (retries_on_failure - 1) + else fail e + in + let* () = try_start_block_stream cctxt chain state retries_on_failure in + return_unit let log_errors_and_continue ~name p = - p >>= function - | Ok () -> Lwt.return_unit + let open Lwt_syntax in + let* p = p in + match p with + | Ok () -> return_unit | Error errs -> Events.(emit vdf_daemon_error) (name, errs) let cycle_of_level state level = @@ -70,7 +93,7 @@ let is_in_nonce_revelation_period state level = let position_in_cycle = Int32.(sub level (mul current_cycle blocks_per_cycle)) in - Int32.compare position_in_cycle nonce_revelation_threshold < 0 + Int32.compare position_in_cycle nonce_revelation_threshold <= 0 let check_new_cycle state level = let current_cycle = Cycle_repr.of_int32_exn (cycle_of_level state level) in @@ -81,99 +104,135 @@ let check_new_cycle state level = state.cycle <- Some current_cycle ; state.computation_status <- Not_started) +let inject_vdf_revelation cctxt hash chain_id solution = + let open Lwt_result_syntax in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + let* bytes = + Plugin.RPC.Forge.vdf_revelation + cctxt + (chain, block) + ~branch:hash + ~solution + () + in + let bytes = Signature.concat bytes Signature.zero in + Shell_services.Injection.operation cctxt ~chain bytes + let process_new_block (cctxt : #Protocol_client_context.full) state {hash; chain_id; protocol; next_protocol; level; _} = let open Lwt_result_syntax in let level_str = Int32.to_string (Raw_level.to_int32 level) in check_new_cycle state level ; if Protocol_hash.(protocol <> next_protocol) then - D_Events.(emit protocol_change_detected) () >>= fun () -> return_unit + let*! 
() = D_Events.(emit protocol_change_detected) () in + return_unit else if is_in_nonce_revelation_period state level then - Events.(emit vdf_info) - ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") - >>= fun _ -> return_unit - else if state.computation_status = Finished then - Events.(emit vdf_info) - ("Skipping, computation finished (level " ^ level_str ^ ")") - >>= fun _ -> return_unit + let*! () = + Events.(emit vdf_info) + ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") + in + return_unit + (* enter main loop if we are not in the nonce revelation period and + the expected protocol has been activated *) else - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - Alpha_services.Seed_computation.get cctxt (chain, block) >>=? fun x -> - (match x with - | Vdf_revelation_stage {seed_discriminant; seed_challenge} -> - if state.computation_status = Started then + match state.computation_status with + | Started -> + let*! () = Events.(emit vdf_info) - ("Skipping, computation already started (level " ^ level_str ^ ")") - >>= fun () -> return_unit - else - Events.(emit vdf_info) - ("Started to compute VDF (level " ^ level_str ^ ")") - >>= fun () -> - state.computation_status <- Started ; - let discriminant, challenge = - Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge - in - let solution = - Environment.Vdf.prove - discriminant - challenge - state.constants.parametric.vdf_difficulty - in - let* bytes = - Plugin.RPC.Forge.vdf_revelation - cctxt - (chain, block) - ~branch:hash - ~solution - () - in - let bytes = Signature.concat bytes Signature.zero in - let* op_hash = - Shell_services.Injection.operation cctxt ~chain bytes - in + ("Already started VDF (level " ^ level_str ^ ")") + in + return_unit + | Not_started -> ( + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + let* seed_computation = + Alpha_services.Seed_computation.get cctxt (chain, block) + in + match seed_computation with + | 
Vdf_revelation_stage {seed_discriminant; seed_challenge} ->
+            let*! () =
+              Events.(emit vdf_info)
+                ("Started to compute VDF (level " ^ level_str ^ ")")
+            in
+            state.computation_status <- Started ;
+            let discriminant, challenge =
+              Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge
+            in
+            let solution =
+              Environment.Vdf.prove
+                discriminant
+                challenge
+                state.constants.parametric.vdf_difficulty
+            in
+            state.computation_status <- Finished solution ;
+
+            (* `Vdf.prove` is a long computation. We reset the block stream in
+             * order to not process all the blocks added to the chain during
+             * this time and skip straight to the current head. *)
+            restart_block_stream cctxt chain state
+        | Nonce_revelation_stage | Computation_finished ->
+            (* The node is not in the VDF revelation stage although no
+               computation was started here: skip this block, do not raise. *)
+            return_unit)
+    | Finished solution ->
+        let*! () =
+          Events.(emit vdf_info) ("Finished VDF (level " ^ level_str ^ ")")
+        in
+        let chain = `Hash chain_id in
+        let* op_hash = inject_vdf_revelation cctxt hash chain_id solution in
+        state.computation_status <- Injected ;
+        let*! () =
           Events.(emit vdf_revelation_injected)
             (cycle_of_level state level, Chain_services.to_string chain, op_hash)
-          >>= fun () -> return_unit
-    | Nonce_revelation_stage ->
-        (* this should never actually happen *)
-        Events.(emit vdf_info)
-          ("Nonce revelation stage (level " ^ level_str ^ ")")
-        >>= fun () -> return_unit
-    | Computation_finished ->
-        (* this should happen at most once per cycle *)
-        state.computation_status <- Finished ;
-        Events.(emit vdf_info) ("Computation finished (level " ^ level_str ^ ")")
-        >>= fun () -> return_unit)
-    >>= fun _ -> return_unit
+        in
+        return_unit
+    | Injected ->
+        let*!
() = + Events.(emit vdf_info) + ("Skipping, already injected VDF (level " ^ level_str ^ ")") + in + return_unit let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants - (block_stream : Client_baking_blocks.block_info tzresult Lwt_stream.t) = + chain = + let open Lwt_result_syntax in + let* block_stream, stream_stopper = + init_block_stream_with_stopper cctxt chain + in let state = { cctxt; constants; block_stream; + stream_stopper; cycle = None; computation_status = Not_started; } in - Lwt_canceler.on_cancel canceler (fun () -> Lwt.return_unit) ; - wait_for_first_block ~name state.block_stream >>= fun _first_event -> + Lwt_canceler.on_cancel canceler (fun () -> + state.stream_stopper () ; + Lwt.return_unit) ; let rec worker_loop () = - Lwt.choose - [ - (Lwt_exit.clean_up_starts >|= fun _ -> `Termination); - (Lwt_stream.get state.block_stream >|= fun e -> `Block e); - ] - >>= function + let*! b = + Lwt.choose + [ + (Lwt_exit.clean_up_starts >|= fun _ -> `Termination); + (Lwt_stream.get state.block_stream >|= fun e -> `Block e); + ] + in + match b with | `Termination -> return_unit | `Block (None | Some (Error _)) -> (* exit when the node is unavailable *) - Events.(emit vdf_daemon_connection_lost) name >>= fun () -> - fail Baking_errors.Node_connection_lost + state.stream_stopper () ; + let*! () = Events.(emit vdf_daemon_connection_lost) name in + tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> - log_errors_and_continue ~name @@ process_new_block cctxt state bi - >>= fun () -> worker_loop () + let*! () = + log_errors_and_continue ~name @@ process_new_block cctxt state bi + in + worker_loop () in - Events.(emit vdf_daemon_start) name >>= fun () -> worker_loop () + let*! 
() = Events.(emit vdf_daemon_start) name in + worker_loop () diff --git a/src/proto_014_PtKathma/lib_delegate/baking_vdf.mli b/src/proto_014_PtKathma/lib_delegate/baking_vdf.mli index 460a4b13b890b0d1ef96ed5b613c4442ef19e909..84751f723af86d8ab942bcba6f8e211f217162d0 100644 --- a/src/proto_014_PtKathma/lib_delegate/baking_vdf.mli +++ b/src/proto_014_PtKathma/lib_delegate/baking_vdf.mli @@ -29,5 +29,5 @@ val start_vdf_worker : Protocol_client_context.full -> canceler:Lwt_canceler.t -> Constants.t -> - Client_baking_blocks.block_info tzresult Lwt_stream.t -> + Chain_services.chain -> unit tzresult Lwt.t diff --git a/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.ml b/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.ml index a67b9c14bde35b45c6ca31ebed9157a6caa9f514..df7b2d7f977d79eabdbca6e31cd0ee17d493c135 100644 --- a/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.ml +++ b/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.ml @@ -145,18 +145,20 @@ end let monitor_valid_blocks cctxt ?chains ?protocols ~next_protocols () = Monitor_services.valid_blocks cctxt ?chains ?protocols ?next_protocols () - >>=? fun (block_stream, _stop) -> + >>=? fun (block_stream, stop) -> return - (Lwt_stream.map_s - (fun ((chain, block), header) -> - Block_seen_event.(Event.emit (make block header (`Valid_blocks chain))) - >>=? fun () -> - raw_info - cctxt - ~chain:(`Hash chain) - block - header.Tezos_base.Block_header.shell) - block_stream) + ( Lwt_stream.map_s + (fun ((chain, block), header) -> + Block_seen_event.( + Event.emit (make block header (`Valid_blocks chain))) + >>=? 
fun () -> + raw_info + cctxt + ~chain:(`Hash chain) + block + header.Tezos_base.Block_header.shell) + block_stream, + stop ) let monitor_heads cctxt ~next_protocols chain = Monitor_services.heads cctxt ?next_protocols chain diff --git a/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.mli b/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.mli index fdf29d613ddba3997332dd94376e2d4d565b70e2..e5b561aeb5b94060673fa228df8b6f73d8c0e759 100644 --- a/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.mli +++ b/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.mli @@ -51,7 +51,7 @@ val monitor_valid_blocks : ?protocols:Protocol_hash.t list -> next_protocols:Protocol_hash.t list option -> unit -> - block_info tzresult Lwt_stream.t tzresult Lwt.t + (block_info tzresult Lwt_stream.t * RPC_context.stopper) tzresult Lwt.t val monitor_heads : #Protocol_client_context.rpc_context -> diff --git a/src/proto_014_PtKathma/lib_delegate/client_daemon.ml b/src/proto_014_PtKathma/lib_delegate/client_daemon.ml index 246556e881bdf78c77eadd457cd40a04642e7ad2..ba2cfdac03b3b0acbefcc66990a0cca6383e5a8b 100644 --- a/src/proto_014_PtKathma/lib_delegate/client_daemon.ml +++ b/src/proto_014_PtKathma/lib_delegate/client_daemon.ml @@ -129,7 +129,7 @@ module Accuser = struct cctxt ~chains:[chain] () - >>=? fun valid_blocks_stream -> + >>=? fun (valid_blocks_stream, _) -> let canceler = Lwt_canceler.create () in let _ = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> @@ -166,13 +166,6 @@ module VDF = struct let* constants = Protocol.Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let* block_stream = - Client_baking_blocks.monitor_valid_blocks - ~next_protocols:(Some [Protocol.hash]) - cctxt - ~chains:[chain] - () - in let canceler = Lwt_canceler.create () in let _ = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> @@ -180,7 +173,7 @@ module VDF = struct let*! 
_ = Lwt_canceler.cancel canceler in Lwt.return_unit) in - Baking_vdf.start_vdf_worker cctxt ~canceler constants block_stream + Baking_vdf.start_vdf_worker cctxt ~canceler constants chain in let* () = Client_confirmations.wait_for_bootstrapped diff --git a/src/proto_alpha/lib_delegate/baking_vdf.ml b/src/proto_alpha/lib_delegate/baking_vdf.ml index 287c9597404f8a312778c8af1e75bac2c7f8d627..2e17f2c4ea31df616d89c84c2f96a73e748b135a 100644 --- a/src/proto_alpha/lib_delegate/baking_vdf.ml +++ b/src/proto_alpha/lib_delegate/baking_vdf.ml @@ -25,32 +25,55 @@ open Protocol open Alpha_context +open Client_baking_blocks module Events = Baking_events.VDF module D_Events = Delegate_events.Denunciator -open Client_baking_blocks -type status = Not_started | Started | Finished +type vdf_solution = Environment.Vdf.result * Environment.Vdf.proof + +type status = Not_started | Started | Finished of vdf_solution | Injected type 'a state = { cctxt : Protocol_client_context.full; constants : Constants.t; - block_stream : (block_info, 'a) result Lwt_stream.t; + mutable block_stream : (block_info, 'a) result Lwt_stream.t; + mutable stream_stopper : RPC_context.stopper; mutable cycle : Cycle_repr.t option; mutable computation_status : status; } -let rec wait_for_first_block ~name stream = - Lwt_stream.get stream >>= function - | None | Some (Error _) -> - Delegate_events.Baking_scheduling.(emit cannot_fetch_event) name - >>= fun () -> - (* NOTE: this is not a tight loop because of Lwt_stream.get *) - wait_for_first_block ~name stream - | Some (Ok bi) -> Lwt.return bi +let init_block_stream_with_stopper cctxt chain = + Client_baking_blocks.monitor_valid_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () + +let restart_block_stream cctxt chain state = + let open Lwt_result_syntax in + state.stream_stopper () ; + let retries_on_failure = 10 in + let rec try_start_block_stream cctxt chain state retries_on_failure = + let*! 
p = init_block_stream_with_stopper cctxt chain in + match p with + | Ok (block_stream, stream_stopper) -> + state.block_stream <- block_stream ; + state.stream_stopper <- stream_stopper ; + return_unit + | Error e -> + if retries_on_failure > 0 then + let*! () = Lwt_unix.sleep 10. in + try_start_block_stream cctxt chain state (retries_on_failure - 1) + else fail e + in + let* () = try_start_block_stream cctxt chain state retries_on_failure in + return_unit let log_errors_and_continue ~name p = - p >>= function - | Ok () -> Lwt.return_unit + let open Lwt_syntax in + let* p = p in + match p with + | Ok () -> return_unit | Error errs -> Events.(emit vdf_daemon_error) (name, errs) let cycle_of_level state level = @@ -70,7 +93,7 @@ let is_in_nonce_revelation_period state level = let position_in_cycle = Int32.(sub level (mul current_cycle blocks_per_cycle)) in - Int32.compare position_in_cycle nonce_revelation_threshold < 0 + Int32.compare position_in_cycle nonce_revelation_threshold <= 0 let check_new_cycle state level = let current_cycle = Cycle_repr.of_int32_exn (cycle_of_level state level) in @@ -81,99 +104,135 @@ let check_new_cycle state level = state.cycle <- Some current_cycle ; state.computation_status <- Not_started) +let inject_vdf_revelation cctxt hash chain_id solution = + let open Lwt_result_syntax in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + let* bytes = + Plugin.RPC.Forge.vdf_revelation + cctxt + (chain, block) + ~branch:hash + ~solution + () + in + let bytes = Signature.concat bytes Signature.zero in + Shell_services.Injection.operation cctxt ~chain bytes + let process_new_block (cctxt : #Protocol_client_context.full) state {hash; chain_id; protocol; next_protocol; level; _} = let open Lwt_result_syntax in let level_str = Int32.to_string (Raw_level.to_int32 level) in check_new_cycle state level ; if Protocol_hash.(protocol <> next_protocol) then - D_Events.(emit protocol_change_detected) () >>= fun () -> return_unit + let*! 
() = D_Events.(emit protocol_change_detected) () in + return_unit else if is_in_nonce_revelation_period state level then - Events.(emit vdf_info) - ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") - >>= fun _ -> return_unit - else if state.computation_status = Finished then - Events.(emit vdf_info) - ("Skipping, computation finished (level " ^ level_str ^ ")") - >>= fun _ -> return_unit + let*! () = + Events.(emit vdf_info) + ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") + in + return_unit + (* enter main loop if we are not in the nonce revelation period and + the expected protocol has been activated *) else - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - Alpha_services.Seed_computation.get cctxt (chain, block) >>=? fun x -> - (match x with - | Vdf_revelation_stage {seed_discriminant; seed_challenge} -> - if state.computation_status = Started then + match state.computation_status with + | Started -> + let*! () = Events.(emit vdf_info) - ("Skipping, computation already started (level " ^ level_str ^ ")") - >>= fun () -> return_unit - else - Events.(emit vdf_info) - ("Started to compute VDF (level " ^ level_str ^ ")") - >>= fun () -> - state.computation_status <- Started ; - let discriminant, challenge = - Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge - in - let solution = - Environment.Vdf.prove - discriminant - challenge - state.constants.parametric.vdf_difficulty - in - let* bytes = - Plugin.RPC.Forge.vdf_revelation - cctxt - (chain, block) - ~branch:hash - ~solution - () - in - let bytes = Signature.concat bytes Signature.zero in - let* op_hash = - Shell_services.Injection.operation cctxt ~chain bytes - in + ("Already started VDF (level " ^ level_str ^ ")") + in + return_unit + | Not_started -> ( + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + let* seed_computation = + Alpha_services.Seed_computation.get cctxt (chain, block) + in + match seed_computation with + | 
Vdf_revelation_stage {seed_discriminant; seed_challenge} ->
+            let*! () =
+              Events.(emit vdf_info)
+                ("Started to compute VDF (level " ^ level_str ^ ")")
+            in
+            state.computation_status <- Started ;
+            let discriminant, challenge =
+              Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge
+            in
+            let solution =
+              Environment.Vdf.prove
+                discriminant
+                challenge
+                state.constants.parametric.vdf_difficulty
+            in
+            state.computation_status <- Finished solution ;
+
+            (* `Vdf.prove` is a long computation. We reset the block stream in
+             * order to not process all the blocks added to the chain during
+             * this time and skip straight to the current head. *)
+            restart_block_stream cctxt chain state
+        | Nonce_revelation_stage | Computation_finished ->
+            (* The node is not in the VDF revelation stage although no
+               computation was started here: skip this block, do not raise. *)
+            return_unit)
+    | Finished solution ->
+        let*! () =
+          Events.(emit vdf_info) ("Finished VDF (level " ^ level_str ^ ")")
+        in
+        let chain = `Hash chain_id in
+        let* op_hash = inject_vdf_revelation cctxt hash chain_id solution in
+        state.computation_status <- Injected ;
+        let*! () =
           Events.(emit vdf_revelation_injected)
             (cycle_of_level state level, Chain_services.to_string chain, op_hash)
-          >>= fun () -> return_unit
-    | Nonce_revelation_stage ->
-        (* this should never actually happen *)
-        Events.(emit vdf_info)
-          ("Nonce revelation stage (level " ^ level_str ^ ")")
-        >>= fun () -> return_unit
-    | Computation_finished ->
-        (* this should happen at most once per cycle *)
-        state.computation_status <- Finished ;
-        Events.(emit vdf_info) ("Computation finished (level " ^ level_str ^ ")")
-        >>= fun () -> return_unit)
-    >>= fun _ -> return_unit
+        in
+        return_unit
+    | Injected ->
+        let*!
() = + Events.(emit vdf_info) + ("Skipping, already injected VDF (level " ^ level_str ^ ")") + in + return_unit let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants - (block_stream : Client_baking_blocks.block_info tzresult Lwt_stream.t) = + chain = + let open Lwt_result_syntax in + let* block_stream, stream_stopper = + init_block_stream_with_stopper cctxt chain + in let state = { cctxt; constants; block_stream; + stream_stopper; cycle = None; computation_status = Not_started; } in - Lwt_canceler.on_cancel canceler (fun () -> Lwt.return_unit) ; - wait_for_first_block ~name state.block_stream >>= fun _first_event -> + Lwt_canceler.on_cancel canceler (fun () -> + state.stream_stopper () ; + Lwt.return_unit) ; let rec worker_loop () = - Lwt.choose - [ - (Lwt_exit.clean_up_starts >|= fun _ -> `Termination); - (Lwt_stream.get state.block_stream >|= fun e -> `Block e); - ] - >>= function + let*! b = + Lwt.choose + [ + (Lwt_exit.clean_up_starts >|= fun _ -> `Termination); + (Lwt_stream.get state.block_stream >|= fun e -> `Block e); + ] + in + match b with | `Termination -> return_unit | `Block (None | Some (Error _)) -> (* exit when the node is unavailable *) - Events.(emit vdf_daemon_connection_lost) name >>= fun () -> - fail Baking_errors.Node_connection_lost + state.stream_stopper () ; + let*! () = Events.(emit vdf_daemon_connection_lost) name in + tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> - log_errors_and_continue ~name @@ process_new_block cctxt state bi - >>= fun () -> worker_loop () + let*! () = + log_errors_and_continue ~name @@ process_new_block cctxt state bi + in + worker_loop () in - Events.(emit vdf_daemon_start) name >>= fun () -> worker_loop () + let*! 
() = Events.(emit vdf_daemon_start) name in + worker_loop () diff --git a/src/proto_alpha/lib_delegate/baking_vdf.mli b/src/proto_alpha/lib_delegate/baking_vdf.mli index 460a4b13b890b0d1ef96ed5b613c4442ef19e909..84751f723af86d8ab942bcba6f8e211f217162d0 100644 --- a/src/proto_alpha/lib_delegate/baking_vdf.mli +++ b/src/proto_alpha/lib_delegate/baking_vdf.mli @@ -29,5 +29,5 @@ val start_vdf_worker : Protocol_client_context.full -> canceler:Lwt_canceler.t -> Constants.t -> - Client_baking_blocks.block_info tzresult Lwt_stream.t -> + Chain_services.chain -> unit tzresult Lwt.t diff --git a/src/proto_alpha/lib_delegate/client_baking_blocks.ml b/src/proto_alpha/lib_delegate/client_baking_blocks.ml index a67b9c14bde35b45c6ca31ebed9157a6caa9f514..df7b2d7f977d79eabdbca6e31cd0ee17d493c135 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.ml +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.ml @@ -145,18 +145,20 @@ end let monitor_valid_blocks cctxt ?chains ?protocols ~next_protocols () = Monitor_services.valid_blocks cctxt ?chains ?protocols ?next_protocols () - >>=? fun (block_stream, _stop) -> + >>=? fun (block_stream, stop) -> return - (Lwt_stream.map_s - (fun ((chain, block), header) -> - Block_seen_event.(Event.emit (make block header (`Valid_blocks chain))) - >>=? fun () -> - raw_info - cctxt - ~chain:(`Hash chain) - block - header.Tezos_base.Block_header.shell) - block_stream) + ( Lwt_stream.map_s + (fun ((chain, block), header) -> + Block_seen_event.( + Event.emit (make block header (`Valid_blocks chain))) + >>=? 
fun () -> + raw_info + cctxt + ~chain:(`Hash chain) + block + header.Tezos_base.Block_header.shell) + block_stream, + stop ) let monitor_heads cctxt ~next_protocols chain = Monitor_services.heads cctxt ?next_protocols chain diff --git a/src/proto_alpha/lib_delegate/client_baking_blocks.mli b/src/proto_alpha/lib_delegate/client_baking_blocks.mli index fdf29d613ddba3997332dd94376e2d4d565b70e2..e5b561aeb5b94060673fa228df8b6f73d8c0e759 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.mli +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.mli @@ -51,7 +51,7 @@ val monitor_valid_blocks : ?protocols:Protocol_hash.t list -> next_protocols:Protocol_hash.t list option -> unit -> - block_info tzresult Lwt_stream.t tzresult Lwt.t + (block_info tzresult Lwt_stream.t * RPC_context.stopper) tzresult Lwt.t val monitor_heads : #Protocol_client_context.rpc_context -> diff --git a/src/proto_alpha/lib_delegate/client_daemon.ml b/src/proto_alpha/lib_delegate/client_daemon.ml index 246556e881bdf78c77eadd457cd40a04642e7ad2..ba2cfdac03b3b0acbefcc66990a0cca6383e5a8b 100644 --- a/src/proto_alpha/lib_delegate/client_daemon.ml +++ b/src/proto_alpha/lib_delegate/client_daemon.ml @@ -129,7 +129,7 @@ module Accuser = struct cctxt ~chains:[chain] () - >>=? fun valid_blocks_stream -> + >>=? fun (valid_blocks_stream, _) -> let canceler = Lwt_canceler.create () in let _ = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> @@ -166,13 +166,6 @@ module VDF = struct let* constants = Protocol.Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let* block_stream = - Client_baking_blocks.monitor_valid_blocks - ~next_protocols:(Some [Protocol.hash]) - cctxt - ~chains:[chain] - () - in let canceler = Lwt_canceler.create () in let _ = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> @@ -180,7 +173,7 @@ module VDF = struct let*! 
_ = Lwt_canceler.cancel canceler in Lwt.return_unit) in - Baking_vdf.start_vdf_worker cctxt ~canceler constants block_stream + Baking_vdf.start_vdf_worker cctxt ~canceler constants chain in let* () = Client_confirmations.wait_for_bootstrapped diff --git a/tezt/tests/vdf_test.ml b/tezt/tests/vdf_test.ml index bdb602836f1733f6fa7bc0e145ae1a455526033a..df7fce213ac72cce49c2d83f998d1539ca9c59ed 100644 --- a/tezt/tests/vdf_test.ml +++ b/tezt/tests/vdf_test.ml @@ -39,19 +39,27 @@ type seed_computation_status = | Vdf_revelation_stage | Computation_finished -let get_seed_computation_status client level = +let get_seed_computation_status ?(info = false) client level = let* seed_status = RPC.Seed.get_seed_status ~block:(string_of_int level) client in - return - (match List.map fst (JSON.as_object seed_status) with + let status = + match List.map fst (JSON.as_object seed_status) with | ["nonce_revelation_stage"] -> Nonce_revelation_stage | ["seed_discriminant"; "seed_challenge"] -> Vdf_revelation_stage | ["computation_finished"] -> Computation_finished - | _ -> assert false) + | _ -> assert false + in + let pp_status = function + | Nonce_revelation_stage -> "nonce revelation stage" + | Vdf_revelation_stage -> "vdf revelation stage" + | Computation_finished -> "computation finished" + in + if info then Log.info "At level %d we are in %s" level (pp_status status) ; + return status -let assert_computation_status client level status = - let* current_status = get_seed_computation_status client level in +let assert_computation_status ?(info = false) client level status = + let* current_status = get_seed_computation_status ~info client level in return @@ assert (current_status = status) let assert_level actual expected = @@ -60,6 +68,10 @@ let assert_level actual expected = assert false) else () +let init_vdf_event_listener vdf_baker injected = + Vdf.on_event vdf_baker (fun Vdf.{name; _} -> + if name = "vdf_revelation_injected.v0" then injected := true) + (* Bakes at most `max_level 
- min_starting_level + 1` blocks, starting from * a level not lower than `min_starting_level` and finishing exactly * at `max_level`. @@ -90,12 +102,10 @@ let bake_until min_starting_level max_level client node status_check = * sets an event handler for the VDF revelation operation, then bakes * the whole of the VDF revelation period. Checks that (at least one) * VDF revelation operation has been injected *) -let bake_vdf_revelation_period level max_level client node vdf_baker = - let injected = ref false in +let bake_vdf_revelation_period level max_level client node injected = + injected := false ; let* () = assert_computation_status client level Nonce_revelation_stage in - Vdf.on_event vdf_baker (fun Vdf.{name; _} -> - if name = "vdf_revelation_injected.v0" then injected := true) ; let* () = Client.bake_for client in let* level = Node.wait_for_level node (level + 1) in let* () = assert_computation_status client level Vdf_revelation_stage in @@ -104,7 +114,7 @@ let bake_vdf_revelation_period level max_level client node vdf_baker = return @@ assert !injected let check_cycle (blocks_per_cycle, nonce_revelation_threshold) starting_level - client node vdf_baker = + client node injected = (* Check that at the beginning of the cycle we are in the nonce revelation period *) let* level = Node.wait_for_level node starting_level in @@ -130,7 +140,7 @@ let check_cycle (blocks_per_cycle, nonce_revelation_threshold) starting_level (starting_level + blocks_per_cycle - 1) client node - vdf_baker + injected in let* level = Node.wait_for_level node (starting_level + blocks_per_cycle - 1) @@ -145,10 +155,10 @@ let check_cycle (blocks_per_cycle, nonce_revelation_threshold) starting_level assert_level level (starting_level + blocks_per_cycle) ; return level -let check_n_cycles n constants starting_level client node vdf_baker = +let check_n_cycles n constants starting_level client node injected = let rec loop n level = if n > 0 then - let* level = check_cycle constants level client 
node vdf_baker in + let* level = check_cycle constants level client node injected in loop (n - 1) level else return level in @@ -169,6 +179,12 @@ let test_vdf : Protocol.t list -> unit = let* () = Client.activate_protocol ~protocol client and* vdf_baker = Vdf.init ~protocol node in + (* Track whether a VDF revelation has been injected during the correct period. + * It is set to `false` at the beginning of [bake_vdf_revelation_period] and + * to `true` by a listener for `vdf_revelation_injected` events. *) + let injected = ref false in + init_vdf_event_listener vdf_baker injected ; + let* constants = RPC.get_constants client in let* blocks_per_cycle = return JSON.(constants |-> "blocks_per_cycle" |> as_int) @@ -189,7 +205,7 @@ let test_vdf : Protocol.t list -> unit = operation was injected during the VDF revelation period and that the computation status is set to finished at the end of the cycle *) let* () = - bake_vdf_revelation_period level blocks_per_cycle client node vdf_baker + bake_vdf_revelation_period level blocks_per_cycle client node injected in let* level = Node.wait_for_level node blocks_per_cycle in let* () = assert_computation_status client level Computation_finished in @@ -207,7 +223,7 @@ let test_vdf : Protocol.t list -> unit = level client node - vdf_baker + injected in (* Kill the VDF daemon and bake one cycle with no VDF submission. @@ -221,6 +237,8 @@ let test_vdf : Protocol.t list -> unit = (* Restart a VDF daemon and check correct behaviour after a RANDAO cycle *) let* vdf_baker = Vdf.init ~protocol node in + init_vdf_event_listener vdf_baker injected ; + let* _level = check_n_cycles n_cycles @@ -228,7 +246,7 @@ let test_vdf : Protocol.t list -> unit = level client node - vdf_baker + injected in Vdf.terminate vdf_baker