From 21c6a37826cf1e7a394bc6902e0bc5bc66cc0d55 Mon Sep 17 00:00:00 2001 From: Victor Dumitrescu Date: Thu, 21 Jul 2022 16:50:10 +0200 Subject: [PATCH 1/5] Tezt: small refactor for VDF test --- tezt/tests/vdf_test.ml | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/tezt/tests/vdf_test.ml b/tezt/tests/vdf_test.ml index bdb602836f17..a23f1277c5d0 100644 --- a/tezt/tests/vdf_test.ml +++ b/tezt/tests/vdf_test.ml @@ -60,6 +60,10 @@ let assert_level actual expected = assert false) else () +let init_vdf_event_listener vdf_baker injected = + Vdf.on_event vdf_baker (fun Vdf.{name; _} -> + if name = "vdf_revelation_injected.v0" then injected := true) + (* Bakes at most `max_level - min_starting_level + 1` blocks, starting from * a level not lower than `min_starting_level` and finishing exactly * at `max_level`. @@ -90,12 +94,10 @@ let bake_until min_starting_level max_level client node status_check = * sets an event handler for the VDF revelation operation, then bakes * the whole of the VDF revelation period. Checks that (at least one) * VDF revelation operation has been injected *) -let bake_vdf_revelation_period level max_level client node vdf_baker = - let injected = ref false in +let bake_vdf_revelation_period level max_level client node injected = + injected := false ; let* () = assert_computation_status client level Nonce_revelation_stage in - Vdf.on_event vdf_baker (fun Vdf.{name; _} -> - if name = "vdf_revelation_injected.v0" then injected := true) ; let* () = Client.bake_for client in let* level = Node.wait_for_level node (level + 1) in let* () = assert_computation_status client level Vdf_revelation_stage in @@ -104,7 +106,7 @@ let bake_vdf_revelation_period level max_level client node vdf_baker = return @@ assert !injected let check_cycle (blocks_per_cycle, nonce_revelation_threshold) starting_level - client node vdf_baker = + client node injected = (* Check that at the beginning of the cycle we are in the nonce revelation period *) let* level = Node.wait_for_level node starting_level in @@ -130,7 +132,7 @@ let check_cycle (blocks_per_cycle, nonce_revelation_threshold) starting_level (starting_level + blocks_per_cycle - 1) client node - vdf_baker + injected in let* level = Node.wait_for_level node (starting_level + blocks_per_cycle - 1) @@ -145,10 +147,10 @@ let check_cycle (blocks_per_cycle, nonce_revelation_threshold) starting_level assert_level level (starting_level + blocks_per_cycle) ; return level -let check_n_cycles n constants starting_level client node vdf_baker = +let check_n_cycles n constants starting_level client node injected = let rec loop n level = if n > 0 then - let* level = check_cycle constants level client node vdf_baker in + let* level = check_cycle constants level client node injected in loop (n - 1) level else return level in @@ -169,6 +171,12 @@ let test_vdf : Protocol.t list -> unit = let* () = Client.activate_protocol ~protocol client and* vdf_baker = Vdf.init ~protocol node in + (* Track whether a VDF revelation has been injected during the correct period. + * It is set to `false` at the beginning of [bake_vdf_revelation_period] and + * to `true` by a listener for `vdf_revelation_injected` events. *) + let injected = ref false in + init_vdf_event_listener vdf_baker injected ; + let* constants = RPC.get_constants client in let* blocks_per_cycle = return JSON.(constants |-> "blocks_per_cycle" |> as_int) @@ -189,7 +197,7 @@ let test_vdf : Protocol.t list -> unit = operation was injected during the VDF revelation period and that the computation status is set to finished at the end of the cycle *) let* () = - bake_vdf_revelation_period level blocks_per_cycle client node vdf_baker + bake_vdf_revelation_period level blocks_per_cycle client node injected in let* level = Node.wait_for_level node blocks_per_cycle in let* () = assert_computation_status client level Computation_finished in @@ -207,7 +215,7 @@ let test_vdf : Protocol.t list -> unit = level client node - vdf_baker + injected in (* Kill the VDF daemon and bake one cycle with no VDF submission. @@ -221,6 +229,8 @@ let test_vdf : Protocol.t list -> unit = (* Restart a VDF daemon and check correct behaviour after a RANDAO cycle *) let* vdf_baker = Vdf.init ~protocol node in + init_vdf_event_listener vdf_baker injected ; + let* _level = check_n_cycles n_cycles @@ -228,7 +238,7 @@ let test_vdf : Protocol.t list -> unit = level client node - vdf_baker + injected in Vdf.terminate vdf_baker -- GitLab From 739aab825ab74c460400563e8f1718dbd2d8be89 Mon Sep 17 00:00:00 2001 From: Victor Dumitrescu Date: Wed, 13 Jul 2022 14:40:48 +0200 Subject: [PATCH 2/5] Tezt: more logging for VDF test --- tezt/tests/vdf_test.ml | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/tezt/tests/vdf_test.ml b/tezt/tests/vdf_test.ml index a23f1277c5d0..df7fce213ac7 100644 --- a/tezt/tests/vdf_test.ml +++ b/tezt/tests/vdf_test.ml @@ -39,19 +39,27 @@ type seed_computation_status = | Vdf_revelation_stage | Computation_finished -let get_seed_computation_status client level = +let get_seed_computation_status ?(info = false) client level = let* seed_status = RPC.Seed.get_seed_status ~block:(string_of_int level) client in - return - (match List.map fst (JSON.as_object seed_status) with + let status = + match List.map fst (JSON.as_object seed_status) with | ["nonce_revelation_stage"] -> Nonce_revelation_stage | ["seed_discriminant"; "seed_challenge"] -> Vdf_revelation_stage | ["computation_finished"] -> Computation_finished - | _ -> assert false) + | _ -> assert false + in + let pp_status = function + | Nonce_revelation_stage -> "nonce revelation stage" + | Vdf_revelation_stage -> "vdf revelation stage" + | Computation_finished -> "computation finished" + in + if info then Log.info "At level %d we are in %s" level (pp_status status) ; + return status -let assert_computation_status client level status = - let* current_status = get_seed_computation_status client level in +let assert_computation_status ?(info = false) client level status = + let* current_status = get_seed_computation_status ~info client level in return @@ assert (current_status = status) let assert_level actual expected = -- GitLab From f32165beb49b1ea7388943b7897953f0f86419ea Mon Sep 17 00:00:00 2001 From: Victor Dumitrescu Date: Wed, 20 Jul 2022 10:03:16 +0200 Subject: [PATCH 3/5] Proto/lib_delegate: restructure VDF daemon --- .../lib_delegate/baking_vdf.ml | 113 +++++++++--------- src/proto_alpha/lib_delegate/baking_vdf.ml | 113 +++++++++--------- 2 files changed, 116 insertions(+), 110 deletions(-) diff --git a/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml b/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml index 287c9597404f..283a4799b7a4 100644 --- a/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml +++ b/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml @@ -29,7 +29,9 @@ module Events = Baking_events.VDF module D_Events = Delegate_events.Denunciator open Client_baking_blocks -type status = Not_started | Started | Finished +type vdf_solution = Environment.Vdf.result * Environment.Vdf.proof + +type status = Not_started | Started | Finished of vdf_solution | Injected type 'a state = { cctxt : Protocol_client_context.full; @@ -70,7 +72,7 @@ let is_in_nonce_revelation_period state level = let position_in_cycle = Int32.(sub level (mul current_cycle blocks_per_cycle)) in - Int32.compare position_in_cycle nonce_revelation_threshold < 0 + Int32.compare position_in_cycle nonce_revelation_threshold <= 0 let check_new_cycle state level = let current_cycle = Cycle_repr.of_int32_exn (cycle_of_level state level) in @@ -81,6 +83,21 @@ let check_new_cycle state level = state.cycle <- Some current_cycle ; state.computation_status <- Not_started) +let inject_vdf_revelation cctxt hash chain_id solution = + let open Lwt_result_syntax in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + let* bytes = + Plugin.RPC.Forge.vdf_revelation + cctxt + (chain, block) + ~branch:hash + ~solution + () + in + let bytes = Signature.concat bytes Signature.zero in + Shell_services.Injection.operation cctxt ~chain bytes + let process_new_block (cctxt : #Protocol_client_context.full) state {hash; chain_id; protocol; next_protocol; level; _} = let open Lwt_result_syntax in @@ -92,60 +109,46 @@ let process_new_block (cctxt : #Protocol_client_context.full) state Events.(emit vdf_info) ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") >>= fun _ -> return_unit - else if state.computation_status = Finished then - Events.(emit vdf_info) - ("Skipping, computation finished (level " ^ level_str ^ ")") - >>= fun _ -> return_unit + (* enter main loop if we are not in the nonce revelation period and + the expected protocol has been activated *) else - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - Alpha_services.Seed_computation.get cctxt (chain, block) >>=? fun x -> - (match x with - | Vdf_revelation_stage {seed_discriminant; seed_challenge} -> - if state.computation_status = Started then - Events.(emit vdf_info) - ("Skipping, computation already started (level " ^ level_str ^ ")") - >>= fun () -> return_unit - else - Events.(emit vdf_info) - ("Started to compute VDF (level " ^ level_str ^ ")") - >>= fun () -> - state.computation_status <- Started ; - let discriminant, challenge = - Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge - in - let solution = - Environment.Vdf.prove - discriminant - challenge - state.constants.parametric.vdf_difficulty - in - let* bytes = - Plugin.RPC.Forge.vdf_revelation - cctxt - (chain, block) - ~branch:hash - ~solution - () - in - let bytes = Signature.concat bytes Signature.zero in - let* op_hash = - Shell_services.Injection.operation cctxt ~chain bytes - in - Events.(emit vdf_revelation_injected) - (cycle_of_level state level, Chain_services.to_string chain, op_hash) - >>= fun () -> return_unit - | Nonce_revelation_stage -> - (* this should never actually happen *) - Events.(emit vdf_info) - ("Nonce revelation stage (level " ^ level_str ^ ")") - >>= fun () -> return_unit - | Computation_finished -> - (* this should happen at most once per cycle *) - state.computation_status <- Finished ; - Events.(emit vdf_info) ("Computation finished (level " ^ level_str ^ ")") - >>= fun () -> return_unit) - >>= fun _ -> return_unit + match state.computation_status with + | Started -> return_unit + | Not_started -> ( + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + Alpha_services.Seed_computation.get cctxt (chain, block) >>=? fun x -> + match x with + | Vdf_revelation_stage {seed_discriminant; seed_challenge} -> + Events.(emit vdf_info) + ("Started to compute VDF (level " ^ level_str ^ ")") + >>= fun () -> + state.computation_status <- Started ; + let discriminant, challenge = + Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge + in + let solution = + Environment.Vdf.prove + discriminant + challenge + state.constants.parametric.vdf_difficulty + in + state.computation_status <- Finished solution ; + Events.(emit vdf_info) + ("Finished to compute VDF (level " ^ level_str ^ ")") + >>= fun _ -> return_unit + | Nonce_revelation_stage | Computation_finished -> + (* this should never actually happen if computation + has not been started *) + assert false) + | Finished solution -> + let chain = `Hash chain_id in + let* op_hash = inject_vdf_revelation cctxt hash chain_id solution in + state.computation_status <- Injected ; + Events.(emit vdf_revelation_injected) + (cycle_of_level state level, Chain_services.to_string chain, op_hash) + >>= fun _ -> return_unit + | Injected -> return_unit let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants (block_stream : Client_baking_blocks.block_info tzresult Lwt_stream.t) = diff --git a/src/proto_alpha/lib_delegate/baking_vdf.ml b/src/proto_alpha/lib_delegate/baking_vdf.ml index 287c9597404f..283a4799b7a4 100644 --- a/src/proto_alpha/lib_delegate/baking_vdf.ml +++ b/src/proto_alpha/lib_delegate/baking_vdf.ml @@ -29,7 +29,9 @@ module Events = Baking_events.VDF module D_Events = Delegate_events.Denunciator open Client_baking_blocks -type status = Not_started | Started | Finished +type vdf_solution = Environment.Vdf.result * Environment.Vdf.proof + +type status = Not_started | Started | Finished of vdf_solution | Injected type 'a state = { cctxt : Protocol_client_context.full; @@ -70,7 +72,7 @@ let is_in_nonce_revelation_period state level = let position_in_cycle = Int32.(sub level (mul current_cycle blocks_per_cycle)) in - Int32.compare position_in_cycle nonce_revelation_threshold < 0 + Int32.compare position_in_cycle nonce_revelation_threshold <= 0 let check_new_cycle state level = let current_cycle = Cycle_repr.of_int32_exn (cycle_of_level state level) in @@ -81,6 +83,21 @@ let check_new_cycle state level = state.cycle <- Some current_cycle ; state.computation_status <- Not_started) +let inject_vdf_revelation cctxt hash chain_id solution = + let open Lwt_result_syntax in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + let* bytes = + Plugin.RPC.Forge.vdf_revelation + cctxt + (chain, block) + ~branch:hash + ~solution + () + in + let bytes = Signature.concat bytes Signature.zero in + Shell_services.Injection.operation cctxt ~chain bytes + let process_new_block (cctxt : #Protocol_client_context.full) state {hash; chain_id; protocol; next_protocol; level; _} = let open Lwt_result_syntax in @@ -92,60 +109,46 @@ let process_new_block (cctxt : #Protocol_client_context.full) state Events.(emit vdf_info) ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") >>= fun _ -> return_unit - else if state.computation_status = Finished then - Events.(emit vdf_info) - ("Skipping, computation finished (level " ^ level_str ^ ")") - >>= fun _ -> return_unit + (* enter main loop if we are not in the nonce revelation period and + the expected protocol has been activated *) else - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - Alpha_services.Seed_computation.get cctxt (chain, block) >>=? fun x -> - (match x with - | Vdf_revelation_stage {seed_discriminant; seed_challenge} -> - if state.computation_status = Started then - Events.(emit vdf_info) - ("Skipping, computation already started (level " ^ level_str ^ ")") - >>= fun () -> return_unit - else - Events.(emit vdf_info) - ("Started to compute VDF (level " ^ level_str ^ ")") - >>= fun () -> - state.computation_status <- Started ; - let discriminant, challenge = - Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge - in - let solution = - Environment.Vdf.prove - discriminant - challenge - state.constants.parametric.vdf_difficulty - in - let* bytes = - Plugin.RPC.Forge.vdf_revelation - cctxt - (chain, block) - ~branch:hash - ~solution - () - in - let bytes = Signature.concat bytes Signature.zero in - let* op_hash = - Shell_services.Injection.operation cctxt ~chain bytes - in - Events.(emit vdf_revelation_injected) - (cycle_of_level state level, Chain_services.to_string chain, op_hash) - >>= fun () -> return_unit - | Nonce_revelation_stage -> - (* this should never actually happen *) - Events.(emit vdf_info) - ("Nonce revelation stage (level " ^ level_str ^ ")") - >>= fun () -> return_unit - | Computation_finished -> - (* this should happen at most once per cycle *) - state.computation_status <- Finished ; - Events.(emit vdf_info) ("Computation finished (level " ^ level_str ^ ")") - >>= fun () -> return_unit) - >>= fun _ -> return_unit + match state.computation_status with + | Started -> return_unit + | Not_started -> ( + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + Alpha_services.Seed_computation.get cctxt (chain, block) >>=? fun x -> + match x with + | Vdf_revelation_stage {seed_discriminant; seed_challenge} -> + Events.(emit vdf_info) + ("Started to compute VDF (level " ^ level_str ^ ")") + >>= fun () -> + state.computation_status <- Started ; + let discriminant, challenge = + Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge + in + let solution = + Environment.Vdf.prove + discriminant + challenge + state.constants.parametric.vdf_difficulty + in + state.computation_status <- Finished solution ; + Events.(emit vdf_info) + ("Finished to compute VDF (level " ^ level_str ^ ")") + >>= fun _ -> return_unit + | Nonce_revelation_stage | Computation_finished -> + (* this should never actually happen if computation + has not been started *) + assert false) + | Finished solution -> + let chain = `Hash chain_id in + let* op_hash = inject_vdf_revelation cctxt hash chain_id solution in + state.computation_status <- Injected ; + Events.(emit vdf_revelation_injected) + (cycle_of_level state level, Chain_services.to_string chain, op_hash) + >>= fun _ -> return_unit + | Injected -> return_unit let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants (block_stream : Client_baking_blocks.block_info tzresult Lwt_stream.t) = -- GitLab From 205d60dfc441aa2e78c2fcfaeda57fa64b19dcbe Mon Sep 17 00:00:00 2001 From: Victor Dumitrescu Date: Thu, 21 Jul 2022 17:03:27 +0200 Subject: [PATCH 4/5] Proto/lib_delegate: VDF daemon injects operation at correct level Before, the main loop operated on the stream of blocks. The lengthy VDF computation blocks this loop and all the blocks added to the chain in that interval are consumed at once after it is finished. This resulted in the reveal operation being based on the block after the one where the computation started, which, for high difficulties, would be too old. The stream is now reinitialised after the VDF computation. --- .../lib_delegate/baking_vdf.ml | 71 ++++++++++++++----- .../lib_delegate/baking_vdf.mli | 2 +- .../lib_delegate/client_baking_blocks.ml | 24 ++++--- .../lib_delegate/client_baking_blocks.mli | 2 +- .../lib_delegate/client_daemon.ml | 11 +-- src/proto_alpha/lib_delegate/baking_vdf.ml | 71 ++++++++++++++----- src/proto_alpha/lib_delegate/baking_vdf.mli | 2 +- .../lib_delegate/client_baking_blocks.ml | 24 ++++--- .../lib_delegate/client_baking_blocks.mli | 2 +- src/proto_alpha/lib_delegate/client_daemon.ml | 11 +-- 10 files changed, 140 insertions(+), 80 deletions(-) diff --git a/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml b/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml index 283a4799b7a4..063eb7e5f601 100644 --- a/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml +++ b/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml @@ -36,19 +36,38 @@ type status = Not_started | Started | Finished of vdf_solution | Injected type 'a state = { cctxt : Protocol_client_context.full; constants : Constants.t; - block_stream : (block_info, 'a) result Lwt_stream.t; + mutable block_stream : (block_info, 'a) result Lwt_stream.t; + mutable stream_stopper : RPC_context.stopper; mutable cycle : Cycle_repr.t option; mutable computation_status : status; } -let rec wait_for_first_block ~name stream = - Lwt_stream.get stream >>= function - | None | Some (Error _) -> - Delegate_events.Baking_scheduling.(emit cannot_fetch_event) name - >>= fun () -> - (* NOTE: this is not a tight loop because of Lwt_stream.get *) - wait_for_first_block ~name stream - | Some (Ok bi) -> Lwt.return bi +let init_block_stream_with_stopper cctxt chain = + Client_baking_blocks.monitor_valid_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () + +let restart_block_stream cctxt chain state = + let open Lwt_result_syntax in + state.stream_stopper () ; + let retries_on_failure = 10 in + let rec try_start_block_stream cctxt chain state retries_on_failure = + let*! p = init_block_stream_with_stopper cctxt chain in + match p with + | Ok (block_stream, stream_stopper) -> + state.block_stream <- block_stream ; + state.stream_stopper <- stream_stopper ; + return_unit + | Error e -> + if retries_on_failure > 0 then + let*! () = Lwt_unix.sleep 10. in + try_start_block_stream cctxt chain state (retries_on_failure - 1) + else fail e + in + let* () = try_start_block_stream cctxt chain state retries_on_failure in + return_unit let log_errors_and_continue ~name p = p >>= function @@ -113,7 +132,9 @@ let process_new_block (cctxt : #Protocol_client_context.full) state the expected protocol has been activated *) else match state.computation_status with - | Started -> return_unit + | Started -> + Events.(emit vdf_info) ("Already started VDF (level " ^ level_str ^ ")") + >>= fun () -> return_unit | Not_started -> ( let chain = `Hash chain_id in let block = `Hash (hash, 0) in @@ -134,35 +155,48 @@ let process_new_block (cctxt : #Protocol_client_context.full) state state.constants.parametric.vdf_difficulty in state.computation_status <- Finished solution ; - Events.(emit vdf_info) - ("Finished to compute VDF (level " ^ level_str ^ ")") - >>= fun _ -> return_unit + + (* `Vdf.prove` is a long computation. We reset the block stream in + * order to not process all the blocks added to the chain during + * this time and skip straight to the current head. *) + restart_block_stream cctxt chain state | Nonce_revelation_stage | Computation_finished -> (* this should never actually happen if computation has not been started *) assert false) | Finished solution -> + Events.(emit vdf_info) ("Finished VDF (level " ^ level_str ^ ")") + >>= fun () -> let chain = `Hash chain_id in let* op_hash = inject_vdf_revelation cctxt hash chain_id solution in state.computation_status <- Injected ; Events.(emit vdf_revelation_injected) (cycle_of_level state level, Chain_services.to_string chain, op_hash) >>= fun _ -> return_unit - | Injected -> return_unit + | Injected -> + Events.(emit vdf_info) + ("Skipping, already injected VDF (level " ^ level_str ^ ")") + >>= fun () -> return_unit let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants - (block_stream : Client_baking_blocks.block_info tzresult Lwt_stream.t) = + chain = + let open Lwt_result_syntax in + let* block_stream, stream_stopper = + init_block_stream_with_stopper cctxt chain + in let state = { cctxt; constants; block_stream; + stream_stopper; cycle = None; computation_status = Not_started; } in - Lwt_canceler.on_cancel canceler (fun () -> Lwt.return_unit) ; - wait_for_first_block ~name state.block_stream >>= fun _first_event -> + Lwt_canceler.on_cancel canceler (fun () -> + state.stream_stopper () ; + Lwt.return_unit) ; let rec worker_loop () = Lwt.choose [ @@ -173,8 +207,9 @@ let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants | `Termination -> return_unit | `Block (None | Some (Error _)) -> (* exit when the node is unavailable *) + state.stream_stopper () ; Events.(emit vdf_daemon_connection_lost) name >>= fun () -> - fail Baking_errors.Node_connection_lost + tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> log_errors_and_continue ~name @@ process_new_block cctxt state bi >>= fun () -> worker_loop () diff --git a/src/proto_014_PtKathma/lib_delegate/baking_vdf.mli b/src/proto_014_PtKathma/lib_delegate/baking_vdf.mli index 460a4b13b890..84751f723af8 100644 --- a/src/proto_014_PtKathma/lib_delegate/baking_vdf.mli +++ b/src/proto_014_PtKathma/lib_delegate/baking_vdf.mli @@ -29,5 +29,5 @@ val start_vdf_worker : Protocol_client_context.full -> canceler:Lwt_canceler.t -> Constants.t -> - Client_baking_blocks.block_info tzresult Lwt_stream.t -> + Chain_services.chain -> unit tzresult Lwt.t diff --git a/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.ml b/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.ml index a67b9c14bde3..df7b2d7f977d 100644 --- a/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.ml +++ b/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.ml @@ -145,18 +145,20 @@ end let monitor_valid_blocks cctxt ?chains ?protocols ~next_protocols () = Monitor_services.valid_blocks cctxt ?chains ?protocols ?next_protocols () - >>=? fun (block_stream, _stop) -> + >>=? fun (block_stream, stop) -> return - (Lwt_stream.map_s - (fun ((chain, block), header) -> - Block_seen_event.(Event.emit (make block header (`Valid_blocks chain))) - >>=? fun () -> - raw_info - cctxt - ~chain:(`Hash chain) - block - header.Tezos_base.Block_header.shell) - block_stream) + ( Lwt_stream.map_s + (fun ((chain, block), header) -> + Block_seen_event.( + Event.emit (make block header (`Valid_blocks chain))) + >>=? fun () -> + raw_info + cctxt + ~chain:(`Hash chain) + block + header.Tezos_base.Block_header.shell) + block_stream, + stop ) let monitor_heads cctxt ~next_protocols chain = Monitor_services.heads cctxt ?next_protocols chain diff --git a/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.mli b/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.mli index fdf29d613ddb..e5b561aeb5b9 100644 --- a/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.mli +++ b/src/proto_014_PtKathma/lib_delegate/client_baking_blocks.mli @@ -51,7 +51,7 @@ val monitor_valid_blocks : ?protocols:Protocol_hash.t list -> next_protocols:Protocol_hash.t list option -> unit -> - block_info tzresult Lwt_stream.t tzresult Lwt.t + (block_info tzresult Lwt_stream.t * RPC_context.stopper) tzresult Lwt.t val monitor_heads : #Protocol_client_context.rpc_context -> diff --git a/src/proto_014_PtKathma/lib_delegate/client_daemon.ml b/src/proto_014_PtKathma/lib_delegate/client_daemon.ml index 246556e881bd..ba2cfdac03b3 100644 --- a/src/proto_014_PtKathma/lib_delegate/client_daemon.ml +++ b/src/proto_014_PtKathma/lib_delegate/client_daemon.ml @@ -129,7 +129,7 @@ module Accuser = struct cctxt ~chains:[chain] () - >>=? fun valid_blocks_stream -> + >>=? fun (valid_blocks_stream, _) -> let canceler = Lwt_canceler.create () in let _ = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> @@ -166,13 +166,6 @@ module VDF = struct let* constants = Protocol.Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let* block_stream = - Client_baking_blocks.monitor_valid_blocks - ~next_protocols:(Some [Protocol.hash]) - cctxt - ~chains:[chain] - () - in let canceler = Lwt_canceler.create () in let _ = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> @@ -180,7 +173,7 @@ module VDF = struct let*! _ = Lwt_canceler.cancel canceler in Lwt.return_unit) in - Baking_vdf.start_vdf_worker cctxt ~canceler constants block_stream + Baking_vdf.start_vdf_worker cctxt ~canceler constants chain in let* () = Client_confirmations.wait_for_bootstrapped diff --git a/src/proto_alpha/lib_delegate/baking_vdf.ml b/src/proto_alpha/lib_delegate/baking_vdf.ml index 283a4799b7a4..063eb7e5f601 100644 --- a/src/proto_alpha/lib_delegate/baking_vdf.ml +++ b/src/proto_alpha/lib_delegate/baking_vdf.ml @@ -36,19 +36,38 @@ type status = Not_started | Started | Finished of vdf_solution | Injected type 'a state = { cctxt : Protocol_client_context.full; constants : Constants.t; - block_stream : (block_info, 'a) result Lwt_stream.t; + mutable block_stream : (block_info, 'a) result Lwt_stream.t; + mutable stream_stopper : RPC_context.stopper; mutable cycle : Cycle_repr.t option; mutable computation_status : status; } -let rec wait_for_first_block ~name stream = - Lwt_stream.get stream >>= function - | None | Some (Error _) -> - Delegate_events.Baking_scheduling.(emit cannot_fetch_event) name - >>= fun () -> - (* NOTE: this is not a tight loop because of Lwt_stream.get *) - wait_for_first_block ~name stream - | Some (Ok bi) -> Lwt.return bi +let init_block_stream_with_stopper cctxt chain = + Client_baking_blocks.monitor_valid_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () + +let restart_block_stream cctxt chain state = + let open Lwt_result_syntax in + state.stream_stopper () ; + let retries_on_failure = 10 in + let rec try_start_block_stream cctxt chain state retries_on_failure = + let*! p = init_block_stream_with_stopper cctxt chain in + match p with + | Ok (block_stream, stream_stopper) -> + state.block_stream <- block_stream ; + state.stream_stopper <- stream_stopper ; + return_unit + | Error e -> + if retries_on_failure > 0 then + let*! () = Lwt_unix.sleep 10. in + try_start_block_stream cctxt chain state (retries_on_failure - 1) + else fail e + in + let* () = try_start_block_stream cctxt chain state retries_on_failure in + return_unit let log_errors_and_continue ~name p = p >>= function @@ -113,7 +132,9 @@ let process_new_block (cctxt : #Protocol_client_context.full) state the expected protocol has been activated *) else match state.computation_status with - | Started -> return_unit + | Started -> + Events.(emit vdf_info) ("Already started VDF (level " ^ level_str ^ ")") + >>= fun () -> return_unit | Not_started -> ( let chain = `Hash chain_id in let block = `Hash (hash, 0) in @@ -134,35 +155,48 @@ let process_new_block (cctxt : #Protocol_client_context.full) state state.constants.parametric.vdf_difficulty in state.computation_status <- Finished solution ; - Events.(emit vdf_info) - ("Finished to compute VDF (level " ^ level_str ^ ")") - >>= fun _ -> return_unit + + (* `Vdf.prove` is a long computation. We reset the block stream in + * order to not process all the blocks added to the chain during + * this time and skip straight to the current head. *) + restart_block_stream cctxt chain state | Nonce_revelation_stage | Computation_finished -> (* this should never actually happen if computation has not been started *) assert false) | Finished solution -> + Events.(emit vdf_info) ("Finished VDF (level " ^ level_str ^ ")") + >>= fun () -> let chain = `Hash chain_id in let* op_hash = inject_vdf_revelation cctxt hash chain_id solution in state.computation_status <- Injected ; Events.(emit vdf_revelation_injected) (cycle_of_level state level, Chain_services.to_string chain, op_hash) >>= fun _ -> return_unit - | Injected -> return_unit + | Injected -> + Events.(emit vdf_info) + ("Skipping, already injected VDF (level " ^ level_str ^ ")") + >>= fun () -> return_unit let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants - (block_stream : Client_baking_blocks.block_info tzresult Lwt_stream.t) = + chain = + let open Lwt_result_syntax in + let* block_stream, stream_stopper = + init_block_stream_with_stopper cctxt chain + in let state = { cctxt; constants; block_stream; + stream_stopper; cycle = None; computation_status = Not_started; } in - Lwt_canceler.on_cancel canceler (fun () -> Lwt.return_unit) ; - wait_for_first_block ~name state.block_stream >>= fun _first_event -> + Lwt_canceler.on_cancel canceler (fun () -> + state.stream_stopper () ; + Lwt.return_unit) ; let rec worker_loop () = Lwt.choose [ @@ -173,8 +207,9 @@ let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants | `Termination -> return_unit | `Block (None | Some (Error _)) -> (* exit when the node is unavailable *) + state.stream_stopper () ; Events.(emit vdf_daemon_connection_lost) name >>= fun () -> - fail Baking_errors.Node_connection_lost + tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> log_errors_and_continue ~name @@ process_new_block cctxt state bi >>= fun () -> worker_loop () diff --git a/src/proto_alpha/lib_delegate/baking_vdf.mli b/src/proto_alpha/lib_delegate/baking_vdf.mli index 460a4b13b890..84751f723af8 100644 --- a/src/proto_alpha/lib_delegate/baking_vdf.mli +++ b/src/proto_alpha/lib_delegate/baking_vdf.mli @@ -29,5 +29,5 @@ val start_vdf_worker : Protocol_client_context.full -> canceler:Lwt_canceler.t -> Constants.t -> - Client_baking_blocks.block_info tzresult Lwt_stream.t -> + Chain_services.chain -> unit tzresult Lwt.t diff --git a/src/proto_alpha/lib_delegate/client_baking_blocks.ml b/src/proto_alpha/lib_delegate/client_baking_blocks.ml index a67b9c14bde3..df7b2d7f977d 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.ml +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.ml @@ -145,18 +145,20 @@ end let monitor_valid_blocks cctxt ?chains ?protocols ~next_protocols () = Monitor_services.valid_blocks cctxt ?chains ?protocols ?next_protocols () - >>=? fun (block_stream, _stop) -> + >>=? fun (block_stream, stop) -> return - (Lwt_stream.map_s - (fun ((chain, block), header) -> - Block_seen_event.(Event.emit (make block header (`Valid_blocks chain))) - >>=? fun () -> - raw_info - cctxt - ~chain:(`Hash chain) - block - header.Tezos_base.Block_header.shell) - block_stream) + ( Lwt_stream.map_s + (fun ((chain, block), header) -> + Block_seen_event.( + Event.emit (make block header (`Valid_blocks chain))) + >>=? fun () -> + raw_info + cctxt + ~chain:(`Hash chain) + block + header.Tezos_base.Block_header.shell) + block_stream, + stop ) let monitor_heads cctxt ~next_protocols chain = Monitor_services.heads cctxt ?next_protocols chain diff --git a/src/proto_alpha/lib_delegate/client_baking_blocks.mli b/src/proto_alpha/lib_delegate/client_baking_blocks.mli index fdf29d613ddb..e5b561aeb5b9 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.mli +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.mli @@ -51,7 +51,7 @@ val monitor_valid_blocks : ?protocols:Protocol_hash.t list -> next_protocols:Protocol_hash.t list option -> unit -> - block_info tzresult Lwt_stream.t tzresult Lwt.t + (block_info tzresult Lwt_stream.t * RPC_context.stopper) tzresult Lwt.t val monitor_heads : #Protocol_client_context.rpc_context -> diff --git a/src/proto_alpha/lib_delegate/client_daemon.ml b/src/proto_alpha/lib_delegate/client_daemon.ml index 246556e881bd..ba2cfdac03b3 100644 --- a/src/proto_alpha/lib_delegate/client_daemon.ml +++ b/src/proto_alpha/lib_delegate/client_daemon.ml @@ -129,7 +129,7 @@ module Accuser = struct cctxt ~chains:[chain] () - >>=? fun valid_blocks_stream -> + >>=? fun (valid_blocks_stream, _) -> let canceler = Lwt_canceler.create () in let _ = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> @@ -166,13 +166,6 @@ module VDF = struct let* constants = Protocol.Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let* block_stream = - Client_baking_blocks.monitor_valid_blocks - ~next_protocols:(Some [Protocol.hash]) - cctxt - ~chains:[chain] - () - in let canceler = Lwt_canceler.create () in let _ = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> @@ -180,7 +173,7 @@ module VDF = struct let*! _ = Lwt_canceler.cancel canceler in Lwt.return_unit) in - Baking_vdf.start_vdf_worker cctxt ~canceler constants block_stream + Baking_vdf.start_vdf_worker cctxt ~canceler constants chain in let* () = Client_confirmations.wait_for_bootstrapped -- GitLab From da7b3e75c0aa26d70c652af9b1d0aa502e04ff26 Mon Sep 17 00:00:00 2001 From: Victor Dumitrescu Date: Fri, 22 Jul 2022 10:52:17 +0200 Subject: [PATCH 5/5] Proto/lib_delegate: use Lwt_result_syntax throughout VDF daemon --- .../lib_delegate/baking_vdf.ml | 85 ++++++++++++------- src/proto_alpha/lib_delegate/baking_vdf.ml | 85 ++++++++++++------- 2 files changed, 106 insertions(+), 64 deletions(-) diff --git a/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml b/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml index 063eb7e5f601..2e17f2c4ea31 100644 --- a/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml +++ b/src/proto_014_PtKathma/lib_delegate/baking_vdf.ml @@ -25,9 +25,9 @@ open Protocol open Alpha_context +open Client_baking_blocks module Events = Baking_events.VDF module D_Events = Delegate_events.Denunciator -open Client_baking_blocks type vdf_solution = Environment.Vdf.result * Environment.Vdf.proof @@ -70,8 +70,10 @@ let restart_block_stream cctxt chain state = return_unit let log_errors_and_continue ~name p = - p >>= function - | Ok () -> Lwt.return_unit + let open Lwt_syntax in + let* p = p in + match p with + | Ok () -> return_unit | Error errs -> Events.(emit vdf_daemon_error) (name, errs) let cycle_of_level state level = @@ -123,27 +125,36 @@ let process_new_block (cctxt : #Protocol_client_context.full) state let level_str = Int32.to_string (Raw_level.to_int32 level) in check_new_cycle state level ; if Protocol_hash.(protocol <> next_protocol) then - D_Events.(emit protocol_change_detected) () >>= fun () -> return_unit + let*! () = D_Events.(emit protocol_change_detected) () in + return_unit else if is_in_nonce_revelation_period state level then - Events.(emit vdf_info) - ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") - >>= fun _ -> return_unit + let*! () = + Events.(emit vdf_info) + ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") + in + return_unit (* enter main loop if we are not in the nonce revelation period and the expected protocol has been activated *) else match state.computation_status with | Started -> - Events.(emit vdf_info) ("Already started VDF (level " ^ level_str ^ ")") - >>= fun () -> return_unit + let*! () = + Events.(emit vdf_info) + ("Already started VDF (level " ^ level_str ^ ")") + in + return_unit | Not_started -> ( let chain = `Hash chain_id in let block = `Hash (hash, 0) in - Alpha_services.Seed_computation.get cctxt (chain, block) >>=? fun x -> - match x with + let* seed_computation = + Alpha_services.Seed_computation.get cctxt (chain, block) + in + match seed_computation with | Vdf_revelation_stage {seed_discriminant; seed_challenge} -> - Events.(emit vdf_info) - ("Started to compute VDF (level " ^ level_str ^ ")") - >>= fun () -> + let*! () = + Events.(emit vdf_info) + ("Started to compute VDF (level " ^ level_str ^ ")") + in state.computation_status <- Started ; let discriminant, challenge = Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge @@ -165,18 +176,23 @@ let process_new_block (cctxt : #Protocol_client_context.full) state has not been started *) assert false) | Finished solution -> - Events.(emit vdf_info) ("Finished VDF (level " ^ level_str ^ ")") - >>= fun () -> + let*! () = + Events.(emit vdf_info) ("Finished VDF (level " ^ level_str ^ ")") + in let chain = `Hash chain_id in let* op_hash = inject_vdf_revelation cctxt hash chain_id solution in state.computation_status <- Injected ; - Events.(emit vdf_revelation_injected) - (cycle_of_level state level, Chain_services.to_string chain, op_hash) - >>= fun _ -> return_unit + let*! () = + Events.(emit vdf_revelation_injected) + (cycle_of_level state level, Chain_services.to_string chain, op_hash) + in + return_unit | Injected -> - Events.(emit vdf_info) - ("Skipping, already injected VDF (level " ^ level_str ^ ")") - >>= fun () -> return_unit + let*! () = + Events.(emit vdf_info) + ("Skipping, already injected VDF (level " ^ level_str ^ ")") + in + return_unit let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants chain = @@ -198,20 +214,25 @@ let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants state.stream_stopper () ; Lwt.return_unit) ; let rec worker_loop () = - Lwt.choose - [ - (Lwt_exit.clean_up_starts >|= fun _ -> `Termination); - (Lwt_stream.get state.block_stream >|= fun e -> `Block e); - ] - >>= function + let*! b = + Lwt.choose + [ + (Lwt_exit.clean_up_starts >|= fun _ -> `Termination); + (Lwt_stream.get state.block_stream >|= fun e -> `Block e); + ] + in + match b with | `Termination -> return_unit | `Block (None | Some (Error _)) -> (* exit when the node is unavailable *) state.stream_stopper () ; - Events.(emit vdf_daemon_connection_lost) name >>= fun () -> + let*! () = Events.(emit vdf_daemon_connection_lost) name in tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> - log_errors_and_continue ~name @@ process_new_block cctxt state bi - >>= fun () -> worker_loop () + let*! () = + log_errors_and_continue ~name @@ process_new_block cctxt state bi + in + worker_loop () in - Events.(emit vdf_daemon_start) name >>= fun () -> worker_loop () + let*! () = Events.(emit vdf_daemon_start) name in + worker_loop () diff --git a/src/proto_alpha/lib_delegate/baking_vdf.ml b/src/proto_alpha/lib_delegate/baking_vdf.ml index 063eb7e5f601..2e17f2c4ea31 100644 --- a/src/proto_alpha/lib_delegate/baking_vdf.ml +++ b/src/proto_alpha/lib_delegate/baking_vdf.ml @@ -25,9 +25,9 @@ open Protocol open Alpha_context +open Client_baking_blocks module Events = Baking_events.VDF module D_Events = Delegate_events.Denunciator -open Client_baking_blocks type vdf_solution = Environment.Vdf.result * Environment.Vdf.proof @@ -70,8 +70,10 @@ let restart_block_stream cctxt chain state = return_unit let log_errors_and_continue ~name p = - p >>= function - | Ok () -> Lwt.return_unit + let open Lwt_syntax in + let* p = p in + match p with + | Ok () -> return_unit | Error errs -> Events.(emit vdf_daemon_error) (name, errs) let cycle_of_level state level = @@ -123,27 +125,36 @@ let process_new_block (cctxt : #Protocol_client_context.full) state let level_str = Int32.to_string (Raw_level.to_int32 level) in check_new_cycle state level ; if Protocol_hash.(protocol <> next_protocol) then - D_Events.(emit protocol_change_detected) () >>= fun () -> return_unit + let*! () = D_Events.(emit protocol_change_detected) () in + return_unit else if is_in_nonce_revelation_period state level then - Events.(emit vdf_info) - ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") - >>= fun _ -> return_unit + let*! () = + Events.(emit vdf_info) + ("Skipping, still in nonce revelation period (level " ^ level_str ^ ")") + in + return_unit (* enter main loop if we are not in the nonce revelation period and the expected protocol has been activated *) else match state.computation_status with | Started -> - Events.(emit vdf_info) ("Already started VDF (level " ^ level_str ^ ")") - >>= fun () -> return_unit + let*! () = + Events.(emit vdf_info) + ("Already started VDF (level " ^ level_str ^ ")") + in + return_unit | Not_started -> ( let chain = `Hash chain_id in let block = `Hash (hash, 0) in - Alpha_services.Seed_computation.get cctxt (chain, block) >>=? fun x -> - match x with + let* seed_computation = + Alpha_services.Seed_computation.get cctxt (chain, block) + in + match seed_computation with | Vdf_revelation_stage {seed_discriminant; seed_challenge} -> - Events.(emit vdf_info) - ("Started to compute VDF (level " ^ level_str ^ ")") - >>= fun () -> + let*! () = + Events.(emit vdf_info) + ("Started to compute VDF (level " ^ level_str ^ ")") + in state.computation_status <- Started ; let discriminant, challenge = Seed.generate_vdf_setup ~seed_discriminant ~seed_challenge @@ -165,18 +176,23 @@ let process_new_block (cctxt : #Protocol_client_context.full) state has not been started *) assert false) | Finished solution -> - Events.(emit vdf_info) ("Finished VDF (level " ^ level_str ^ ")") - >>= fun () -> + let*! () = + Events.(emit vdf_info) ("Finished VDF (level " ^ level_str ^ ")") + in let chain = `Hash chain_id in let* op_hash = inject_vdf_revelation cctxt hash chain_id solution in state.computation_status <- Injected ; - Events.(emit vdf_revelation_injected) - (cycle_of_level state level, Chain_services.to_string chain, op_hash) - >>= fun _ -> return_unit + let*! () = + Events.(emit vdf_revelation_injected) + (cycle_of_level state level, Chain_services.to_string chain, op_hash) + in + return_unit | Injected -> - Events.(emit vdf_info) - ("Skipping, already injected VDF (level " ^ level_str ^ ")") - >>= fun () -> return_unit + let*! () = + Events.(emit vdf_info) + ("Skipping, already injected VDF (level " ^ level_str ^ ")") + in + return_unit let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants chain = @@ -198,20 +214,25 @@ let start_vdf_worker (cctxt : Protocol_client_context.full) ~canceler constants state.stream_stopper () ; Lwt.return_unit) ; let rec worker_loop () = - Lwt.choose - [ - (Lwt_exit.clean_up_starts >|= fun _ -> `Termination); - (Lwt_stream.get state.block_stream >|= fun e -> `Block e); - ] - >>= function + let*! b = + Lwt.choose + [ + (Lwt_exit.clean_up_starts >|= fun _ -> `Termination); + (Lwt_stream.get state.block_stream >|= fun e -> `Block e); + ] + in + match b with | `Termination -> return_unit | `Block (None | Some (Error _)) -> (* exit when the node is unavailable *) state.stream_stopper () ; - Events.(emit vdf_daemon_connection_lost) name >>= fun () -> + let*! () = Events.(emit vdf_daemon_connection_lost) name in tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> - log_errors_and_continue ~name @@ process_new_block cctxt state bi - >>= fun () -> worker_loop () + let*! () = + log_errors_and_continue ~name @@ process_new_block cctxt state bi + in + worker_loop () in - Events.(emit vdf_daemon_start) name >>= fun () -> worker_loop () + let*! () = Events.(emit vdf_daemon_start) name in + worker_loop () -- GitLab