From cb85eede6556f2aaf60eb3aaa2fe761a008bc46a Mon Sep 17 00:00:00 2001 From: vbot Date: Wed, 25 Jan 2023 17:53:09 +0100 Subject: [PATCH 01/14] Alpha/Baker: remove unused voting power field --- src/proto_alpha/lib_delegate/baking_lib.ml | 5 +-- .../lib_delegate/baking_scheduling.ml | 14 +++--- src/proto_alpha/lib_delegate/baking_state.ml | 44 +++++++------------ src/proto_alpha/lib_delegate/baking_state.mli | 10 +---- .../lib_delegate/operation_worker.ml | 9 +--- .../lib_delegate/operation_worker.mli | 7 +-- .../lib_delegate/state_transitions.ml | 7 ++- 7 files changed, 32 insertions(+), 64 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index 8ce23bda6549..82c54511f347 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -301,7 +301,7 @@ let propose (cctxt : Protocol_client_context.full) ?minimal_fees | Some _ -> propose_at_next_level ~minimal_timestamp state | None -> ( match endorsement_quorum state with - | Some (voting_power, endorsement_qc) -> + | Some (_voting_power, endorsement_qc) -> let state = { state with @@ -323,8 +323,7 @@ let propose (cctxt : Protocol_client_context.full) ?minimal_fees let* state = State_transitions.step state - (Baking_state.Quorum_reached - (candidate, voting_power, endorsement_qc)) + (Baking_state.Quorum_reached (candidate, endorsement_qc)) >>= do_action (* this will register the elected block *) in diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index afa3a58e9d7b..437c290c1965 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -187,18 +187,14 @@ let rec wait_next_event ~timeout loop_state = loop_state.last_future_block_event <- None ; return_some (New_proposal proposal) | `QC_reached - (Some - (Operation_worker.Prequorum_reached - (candidate, voting_power, 
preendorsement_qc))) -> + (Some (Operation_worker.Prequorum_reached (candidate, preendorsement_qc))) + -> loop_state.last_get_qc_event <- None ; - return_some - (Prequorum_reached (candidate, voting_power, preendorsement_qc)) + return_some (Prequorum_reached (candidate, preendorsement_qc)) | `QC_reached - (Some - (Operation_worker.Quorum_reached - (candidate, voting_power, endorsement_qc))) -> + (Some (Operation_worker.Quorum_reached (candidate, endorsement_qc))) -> loop_state.last_get_qc_event <- None ; - return_some (Quorum_reached (candidate, voting_power, endorsement_qc)) + return_some (Quorum_reached (candidate, endorsement_qc)) | `Timeout e -> return_some (Timeout e) (** From the current [state], the function returns an optional diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index 2626d60cb325..b1dc70cc67a7 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -377,18 +377,12 @@ let timeout_kind_encoding = (fun at_round -> Time_to_bake_next_level {at_round}); ] -type voting_power = int - type event = | New_proposal of proposal | Prequorum_reached of - Operation_worker.candidate - * voting_power - * Kind.preendorsement operation list + Operation_worker.candidate * Kind.preendorsement operation list | Quorum_reached of - Operation_worker.candidate - * voting_power - * Kind.endorsement operation list + Operation_worker.candidate * Kind.endorsement operation list | Timeout of timeout_kind let event_encoding = @@ -404,31 +398,28 @@ let event_encoding = case (Tag 1) ~title:"Prequorum_reached" - (tup3 + (tup2 Operation_worker.candidate_encoding - Data_encoding.int31 (Data_encoding.list (dynamic_size Operation.encoding))) (function - | Prequorum_reached (candidate, voting_power, ops) -> - Some (candidate, voting_power, List.map Operation.pack ops) + | Prequorum_reached (candidate, ops) -> + Some (candidate, List.map Operation.pack ops) | _ -> None) - 
(fun (candidate, voting_power, ops) -> + (fun (candidate, ops) -> Prequorum_reached - (candidate, voting_power, Operation_pool.filter_preendorsements ops)); + (candidate, Operation_pool.filter_preendorsements ops)); case (Tag 2) ~title:"Quorum_reached" - (tup3 + (tup2 Operation_worker.candidate_encoding - Data_encoding.int31 (Data_encoding.list (dynamic_size Operation.encoding))) (function - | Quorum_reached (candidate, voting_power, ops) -> - Some (candidate, voting_power, List.map Operation.pack ops) + | Quorum_reached (candidate, ops) -> + Some (candidate, List.map Operation.pack ops) | _ -> None) - (fun (candidate, voting_power, ops) -> - Quorum_reached - (candidate, voting_power, Operation_pool.filter_endorsements ops)); + (fun (candidate, ops) -> + Quorum_reached (candidate, Operation_pool.filter_endorsements ops)); case (Tag 3) ~title:"Timeout" @@ -885,23 +876,20 @@ let pp_event fmt = function "new proposal received: %a" pp_block_info proposal.block - | Prequorum_reached (candidate, voting_power, preendos) -> + | Prequorum_reached (candidate, preendos) -> Format.fprintf fmt - "pre-quorum reached with %d preendorsements (power: %d) for %a at \ - round %a" + "prequorum reached with %d preendorsements for %a at round %a" (List.length preendos) - voting_power Block_hash.pp candidate.Operation_worker.hash Round.pp candidate.round_watched - | Quorum_reached (candidate, voting_power, endos) -> + | Quorum_reached (candidate, endos) -> Format.fprintf fmt - "quorum reached with %d endorsements (power: %d) for %a at round %a" + "quorum reached with %d endorsements for %a at round %a" (List.length endos) - voting_power Block_hash.pp candidate.Operation_worker.hash Round.pp diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index 97acd240588e..de951b78d22d 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -155,18 +155,12 @@ type timeout_kind = val 
timeout_kind_encoding : timeout_kind Data_encoding.t -type voting_power = int - type event = | New_proposal of proposal | Prequorum_reached of - Operation_worker.candidate - * voting_power - * Kind.preendorsement operation list + Operation_worker.candidate * Kind.preendorsement operation list | Quorum_reached of - Operation_worker.candidate - * voting_power - * Kind.endorsement operation list + Operation_worker.candidate * Kind.endorsement operation list | Timeout of timeout_kind val event_encoding : event Data_encoding.t diff --git a/src/proto_alpha/lib_delegate/operation_worker.ml b/src/proto_alpha/lib_delegate/operation_worker.ml index 66cec5723476..1e20efa00156 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.ml +++ b/src/proto_alpha/lib_delegate/operation_worker.ml @@ -162,12 +162,9 @@ let candidate_encoding = (req "round_watched" Round.encoding) (req "payload_hash_watched" Block_payload_hash.encoding)) -type voting_power = int - type event = - | Prequorum_reached of - candidate * voting_power * Kind.preendorsement operation list - | Quorum_reached of candidate * voting_power * Kind.endorsement operation list + | Prequorum_reached of candidate * Kind.preendorsement operation list + | Quorum_reached of candidate * Kind.endorsement operation list type pqc_watched = { candidate_watched : candidate; @@ -310,7 +307,6 @@ let update_monitoring ?(should_lock = true) state ops = (Some (Prequorum_reached ( candidate_watched, - proposal_watched.current_voting_power, List.rev proposal_watched.preendorsements_received ))) ; (* Once the event has been emitted, we cancel the monitoring *) cancel_monitoring state ; @@ -370,7 +366,6 @@ let update_monitoring ?(should_lock = true) state ops = (Some (Quorum_reached ( candidate_watched, - proposal_watched.current_voting_power, List.rev proposal_watched.endorsements_received ))) ; (* Once the event has been emitted, we cancel the monitoring *) cancel_monitoring state ; diff --git 
a/src/proto_alpha/lib_delegate/operation_worker.mli b/src/proto_alpha/lib_delegate/operation_worker.mli index eecbc990f2d1..1c12e7356c37 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.mli +++ b/src/proto_alpha/lib_delegate/operation_worker.mli @@ -41,12 +41,9 @@ type candidate = { val candidate_encoding : candidate Data_encoding.t -type voting_power = int - type event = - | Prequorum_reached of - candidate * voting_power * Kind.preendorsement operation list - | Quorum_reached of candidate * voting_power * Kind.endorsement operation list + | Prequorum_reached of candidate * Kind.preendorsement operation list + | Quorum_reached of candidate * Kind.endorsement operation list (** {1 Constructors}*) diff --git a/src/proto_alpha/lib_delegate/state_transitions.ml b/src/proto_alpha/lib_delegate/state_transitions.ml index 5cc55044263d..7d6fe18f2cba 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.ml +++ b/src/proto_alpha/lib_delegate/state_transitions.ml @@ -719,14 +719,13 @@ let step (state : Baking_state.t) (event : Baking_state.event) : >>= fun () -> Events.(emit new_head_while_waiting_for_qc ()) >>= fun () -> handle_new_proposal state block_info - | ( Awaiting_preendorsements, - Prequorum_reached (candidate, _voting_power, preendorsement_qc) ) -> + | Awaiting_preendorsements, Prequorum_reached (candidate, preendorsement_qc) + -> prequorum_reached_when_awaiting_preendorsements state candidate preendorsement_qc - | ( Awaiting_endorsements, - Quorum_reached (candidate, _voting_power, endorsement_qc) ) -> + | Awaiting_endorsements, Quorum_reached (candidate, endorsement_qc) -> quorum_reached_when_waiting_endorsements state candidate endorsement_qc (* Unreachable cases *) | Idle, (Prequorum_reached _ | Quorum_reached _) -- GitLab From 27e9ec255a58149397beb6a1f489255e0dbda782 Mon Sep 17 00:00:00 2001 From: vbot Date: Thu, 19 Jan 2023 16:36:15 +0100 Subject: [PATCH 02/14] Alpha/Baker: rework RPC monitoring and expose valid blocks watch --- 
.../lib_delegate/baking_actions.ml | 15 + src/proto_alpha/lib_delegate/baking_events.ml | 16 +- src/proto_alpha/lib_delegate/baking_lib.ml | 16 +- src/proto_alpha/lib_delegate/baking_nonces.ml | 2 +- .../lib_delegate/baking_scheduling.ml | 12 +- .../lib_delegate/baking_simulator.ml | 25 +- .../lib_delegate/baking_simulator.mli | 1 + src/proto_alpha/lib_delegate/baking_state.ml | 90 ++--- src/proto_alpha/lib_delegate/baking_state.mli | 15 +- src/proto_alpha/lib_delegate/block_forge.ml | 47 +-- src/proto_alpha/lib_delegate/block_forge.mli | 2 + src/proto_alpha/lib_delegate/node_rpc.ml | 321 +++++++++++------- src/proto_alpha/lib_delegate/node_rpc.mli | 20 +- .../lib_delegate/state_transitions.ml | 7 +- 14 files changed, 323 insertions(+), 266 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index 3b78c068e85d..a1ed13063f1d 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -215,6 +215,7 @@ let sign_block_header state proposer unsigned_block_header = return {Block_header.shell; protocol_data = {contents; signature}} let inject_block ~state_recorder state block_to_bake ~updated_state = + let open Lwt_result_syntax in let { predecessor; round; @@ -308,10 +309,24 @@ let inject_block ~state_recorder state block_to_bake ~updated_state = in Events.(emit vote_for_liquidity_baking_toggle) liquidity_baking_toggle_vote >>= fun () -> + let chain = `Hash state.global_state.chain_id in + let pred_block = `Hash (predecessor.hash, 0) in + let* pred_resulting_context_hash = + Shell_services.Blocks.resulting_context_hash + cctxt + ~chain + ~block:pred_block + () + in + let* pred_live_blocks = + Chain_services.Blocks.live_blocks cctxt ~chain ~block:pred_block () + in Block_forge.forge cctxt ~chain_id ~pred_info:predecessor + ~pred_live_blocks + ~pred_resulting_context_hash ~timestamp ~round ~seed_nonce_hash diff --git 
a/src/proto_alpha/lib_delegate/baking_events.ml b/src/proto_alpha/lib_delegate/baking_events.ml index 4983b02ac7d4..b8ee3b7afbb9 100644 --- a/src/proto_alpha/lib_delegate/baking_events.ml +++ b/src/proto_alpha/lib_delegate/baking_events.ml @@ -315,16 +315,14 @@ module Node_rpc = struct ~pp1:Error_monad.pp_print_trace ("trace", Error_monad.trace_encoding) - let raw_info = - declare_2 + let error_while_monitoring_valid_proposals = + declare_1 ~section - ~name:"raw_info" - ~level:Debug - ~msg:"raw info for {block_hash} at level {level}" - ~pp1:Block_hash.pp - ("block_hash", Block_hash.encoding) - ~pp2:pp_int32 - ("level", Data_encoding.int32) + ~name:"error_while_monitoring_valid_proposals" + ~level:Error + ~msg:"error while monitoring valid proposals {trace}" + ~pp1:Error_monad.pp_print_trace + ("trace", Error_monad.trace_encoding) end module Scheduling = struct diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index 82c54511f347..55252076bd6b 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -44,10 +44,10 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config ~current_proposal delegates -let get_current_proposal cctxt = +let get_current_proposal cctxt ?cache () = let open Lwt_result_syntax in let* block_stream, _block_stream_stopper = - Node_rpc.monitor_proposals cctxt ~chain:cctxt#chain () + Node_rpc.monitor_heads cctxt ?cache ~chain:cctxt#chain () in Lwt_stream.peek block_stream >>= function | Some current_head -> return (block_stream, current_head) @@ -59,7 +59,8 @@ let preendorse (cctxt : Protocol_client_context.full) ?(force = false) delegates = let open State_transitions in let open Lwt_result_syntax in - let* _, current_proposal = get_current_proposal cctxt in + let cache = Baking_cache.Block_cache.create 10 in + let* _, current_proposal = get_current_proposal cctxt ~cache () in let config = Baking_configuration.make ~force () in let* 
state = create_state cctxt ~config ~current_proposal delegates in let proposal = state.level_state.latest_proposal in @@ -91,7 +92,8 @@ let preendorse (cctxt : Protocol_client_context.full) ?(force = false) delegates let endorse (cctxt : Protocol_client_context.full) ?(force = false) delegates = let open State_transitions in let open Lwt_result_syntax in - let* _, current_proposal = get_current_proposal cctxt in + let cache = Baking_cache.Block_cache.create 10 in + let* _, current_proposal = get_current_proposal cctxt ~cache () in let config = Baking_configuration.make ~force () in create_state cctxt ~config ~current_proposal delegates >>=? fun state -> let proposal = state.level_state.latest_proposal in @@ -283,7 +285,8 @@ let propose (cctxt : Protocol_client_context.full) ?minimal_fees ?minimal_nanotez_per_gas_unit ?minimal_nanotez_per_byte ?force_apply ?force ?(minimal_timestamp = false) ?extra_operations ?context_path delegates = let open Lwt_result_syntax in - let* _block_stream, current_proposal = get_current_proposal cctxt in + let cache = Baking_cache.Block_cache.create 10 in + let* _block_stream, current_proposal = get_current_proposal cctxt ~cache () in let config = Baking_configuration.make ?minimal_fees @@ -520,7 +523,8 @@ let bake (cctxt : Protocol_client_context.full) ?minimal_fees ?dal_node_endpoint () in - let* block_stream, current_proposal = get_current_proposal cctxt in + let cache = Baking_cache.Block_cache.create 10 in + let* block_stream, current_proposal = get_current_proposal cctxt ~cache () in let* state = create_state cctxt diff --git a/src/proto_alpha/lib_delegate/baking_nonces.ml b/src/proto_alpha/lib_delegate/baking_nonces.ml index fc1c9ada635a..c0c3d4d8a15d 100644 --- a/src/proto_alpha/lib_delegate/baking_nonces.ml +++ b/src/proto_alpha/lib_delegate/baking_nonces.ml @@ -255,7 +255,7 @@ let reveal_potential_nonces state new_proposal = let new_predecessor_hash = new_proposal.Baking_state.predecessor.hash in if 
Block_hash.(last_predecessor <> new_predecessor_hash) - && Protocol_hash.(new_proposal.predecessor.protocol = Protocol.hash) + && not (Baking_state.is_first_block_in_protocol new_proposal) then ( (* only try revealing nonces when the proposal's predecessor is a new one *) state.last_predecessor <- new_predecessor_hash ; diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 437c290c1965..e3219f9c8bac 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -207,8 +207,7 @@ let compute_next_round_time state = | None -> state.level_state.latest_proposal | Some {proposal; _} -> proposal in - if Protocol_hash.(proposal.predecessor.next_protocol <> Protocol.hash) then - None + if Baking_state.is_first_block_in_protocol proposal then None else match state.level_state.next_level_proposed_round with | Some _proposed_round -> @@ -625,11 +624,7 @@ let create_initial_state cctxt ?(synchronize = true) ~chain config ~chain >>=? fun next_level_delegate_slots -> let elected_block = - if - Protocol_hash.( - current_proposal.block.protocol <> Protocol.hash - && current_proposal.block.next_protocol = Protocol.hash) - then + if Baking_state.is_first_block_in_protocol current_proposal then (* If the last block is a protocol transition, we admit it as a final block *) Some {proposal = current_proposal; endorsement_qc = []} @@ -731,7 +726,8 @@ let run cctxt ?canceler ?(stop_on_event = fun _ -> false) ?(on_error = fun _ -> return_unit) ~chain config delegates = Shell_services.Chain.chain_id cctxt ~chain () >>=? fun chain_id -> perform_sanity_check cctxt ~chain_id >>=? fun () -> - Node_rpc.monitor_proposals cctxt ~chain () + let cache = Baking_cache.Block_cache.create 10 in + Node_rpc.monitor_heads cctxt ~cache ~chain () >>=? 
fun (block_stream, _block_stream_stopper) -> (Lwt_stream.get block_stream >>= function | Some current_head -> return current_head diff --git a/src/proto_alpha/lib_delegate/baking_simulator.ml b/src/proto_alpha/lib_delegate/baking_simulator.ml index 74f2b7519cfb..000d7cc3c575 100644 --- a/src/proto_alpha/lib_delegate/baking_simulator.ml +++ b/src/proto_alpha/lib_delegate/baking_simulator.ml @@ -77,17 +77,11 @@ let check_context_consistency (abstract_index : Abstract_context_index.t) | false -> fail Invalid_context)) let begin_construction ~timestamp ~protocol_data ~force_apply - (abstract_index : Abstract_context_index.t) predecessor chain_id = + ~pred_resulting_context_hash (abstract_index : Abstract_context_index.t) + pred_block chain_id = protect (fun () -> - let { - Baking_state.shell = pred_shell; - hash = pred_hash; - resulting_context_hash; - _; - } = - predecessor - in - abstract_index.checkout_fun resulting_context_hash >>= function + let {Baking_state.shell = pred_shell; hash = pred_hash; _} = pred_block in + abstract_index.checkout_fun pred_resulting_context_hash >>= function | None -> fail Failed_to_checkout_context | Some context -> let header : Tezos_base.Block_header.shell_header = @@ -107,7 +101,7 @@ let begin_construction ~timestamp ~protocol_data ~force_apply let mode = Lifted_protocol.Construction { - predecessor_hash = predecessor.hash; + predecessor_hash = pred_hash; timestamp; block_header_data = protocol_data; } @@ -130,7 +124,14 @@ let begin_construction ~timestamp ~protocol_data ~force_apply else return_none) >>=? 
fun application_state -> let state = (validation_state, application_state) in - return {predecessor; context; state; rev_operations = []; header}) + return + { + predecessor = pred_block; + context; + state; + rev_operations = []; + header; + }) let ( let** ) x k = let open Lwt_result_syntax in diff --git a/src/proto_alpha/lib_delegate/baking_simulator.mli b/src/proto_alpha/lib_delegate/baking_simulator.mli index 959546fe6d21..c5155ac45ee0 100644 --- a/src/proto_alpha/lib_delegate/baking_simulator.mli +++ b/src/proto_alpha/lib_delegate/baking_simulator.mli @@ -52,6 +52,7 @@ val begin_construction : timestamp:Time.Protocol.t -> protocol_data:block_header_data -> force_apply:bool -> + pred_resulting_context_hash:Context_hash.t -> Abstract_context_index.t -> Baking_state.block_info -> Chain_id.t -> diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index b1dc70cc67a7..164807e2375e 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -103,16 +103,12 @@ type prequorum = { type block_info = { hash : Block_hash.t; shell : Block_header.shell_header; - resulting_context_hash : Context_hash.t; payload_hash : Block_payload_hash.t; payload_round : Round.t; round : Round.t; - protocol : Protocol_hash.t; - next_protocol : Protocol_hash.t; prequorum : prequorum option; quorum : Kind.endorsement operation list; payload : Operation_pool.payload; - live_blocks : Block_hash.Set.t; } type cache = { @@ -168,68 +164,48 @@ let block_info_encoding = (fun { hash; shell; - resulting_context_hash; payload_hash; payload_round; round; - protocol; - next_protocol; prequorum; quorum; payload; - live_blocks; } -> - ( ( hash, - shell, - resulting_context_hash, - payload_hash, - payload_round, - round, - protocol, - next_protocol, - prequorum, - List.map Operation.pack quorum ), - (payload, live_blocks) )) - (fun ( ( hash, - shell, - resulting_context_hash, - payload_hash, - 
payload_round, - round, - protocol, - next_protocol, - prequorum, - quorum ), - (payload, live_blocks) ) -> + ( hash, + shell, + payload_hash, + payload_round, + round, + prequorum, + List.map Operation.pack quorum, + payload )) + (fun ( hash, + shell, + payload_hash, + payload_round, + round, + prequorum, + quorum, + payload ) -> { hash; shell; - resulting_context_hash; payload_hash; payload_round; round; - protocol; - next_protocol; prequorum; quorum = List.filter_map Operation_pool.unpack_endorsement quorum; payload; - live_blocks; }) - (merge_objs - (obj10 - (req "hash" Block_hash.encoding) - (req "shell" Block_header.shell_header_encoding) - (req "resulting_context_hash" Context_hash.encoding) - (req "payload_hash" Block_payload_hash.encoding) - (req "payload_round" Round.encoding) - (req "round" Round.encoding) - (req "protocol" Protocol_hash.encoding) - (req "next_protocol" Protocol_hash.encoding) - (req "prequorum" (option prequorum_encoding)) - (req "quorum" (list (dynamic_size Operation.encoding)))) - (obj2 - (req "payload" Operation_pool.payload_encoding) - (req "live_blocks" Block_hash.Set.encoding))) + (obj8 + (req "hash" Block_hash.encoding) + (req "shell" Block_header.shell_header_encoding) + (req "payload_hash" Block_payload_hash.encoding) + (req "payload_round" Round.encoding) + (req "round" Round.encoding) + (req "prequorum" (option prequorum_encoding)) + (req "quorum" (list (dynamic_size Operation.encoding))) + (req "payload" Operation_pool.payload_encoding)) let round_of_shell_header shell_header = Environment.wrap_tzresult @@ -267,6 +243,9 @@ let proposal_encoding = (req "block" block_info_encoding) (req "predecessor" block_info_encoding)) +let is_first_block_in_protocol {block; predecessor; _} = + Compare.Int.(block.shell.proto_level <> predecessor.shell.proto_level) + type locked_round = {payload_hash : Block_payload_hash.t; round : Round.t} let locked_round_encoding = @@ -722,18 +701,15 @@ let pp_block_info fmt shell; payload_hash; round; - 
protocol; - next_protocol; prequorum; quorum; payload; - _; + payload_round; } = Format.fprintf fmt "@[Block:@ hash: %a@ payload_hash: %a@ level: %ld@ round: %a@ \ - protocol: %a@ next protocol: %a@ prequorum: %a@ quorum: %d endorsements@ \ - payload: %a@]" + prequorum: %a@ quorum: %d endorsements@ payload: %a@ payload round: %a@]" Block_hash.pp hash Block_payload_hash.pp_short @@ -741,15 +717,13 @@ let pp_block_info fmt shell.level Round.pp round - Protocol_hash.pp_short - protocol - Protocol_hash.pp_short - next_protocol (pp_option pp_prequorum) prequorum (List.length quorum) Operation_pool.pp_payload payload + Round.pp + payload_round let pp_proposal fmt {block; _} = pp_block_info fmt block diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index de951b78d22d..e75a0668b318 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -57,18 +57,12 @@ type prequorum = { type block_info = { hash : Block_hash.t; shell : Block_header.shell_header; - resulting_context_hash : Context_hash.t; payload_hash : Block_payload_hash.t; payload_round : Round.t; round : Round.t; - protocol : Protocol_hash.t; - next_protocol : Protocol_hash.t; prequorum : prequorum option; quorum : Kind.endorsement operation list; payload : Operation_pool.payload; - live_blocks : Block_hash.Set.t; - (** Set of live blocks for this block that is used to filter - old or too recent operations. *) } type cache = { @@ -109,6 +103,15 @@ type proposal = {block : block_info; predecessor : block_info} val proposal_encoding : proposal Data_encoding.t +(** Identify the first block of the protocol, ie. the block that + activates the current protocol. + + This block should be baked by the baker of the previous protocol + (that's why this same block is also referred to as the last block + of the previous protocol). 
It is always considered final and + therefore is not endorsed.*) +val is_first_block_in_protocol : proposal -> bool + type locked_round = {payload_hash : Block_payload_hash.t; round : Round.t} val locked_round_encoding : locked_round Data_encoding.t diff --git a/src/proto_alpha/lib_delegate/block_forge.ml b/src/proto_alpha/lib_delegate/block_forge.ml index d3be206ac6b6..01a06159e467 100644 --- a/src/proto_alpha/lib_delegate/block_forge.ml +++ b/src/proto_alpha/lib_delegate/block_forge.ml @@ -69,17 +69,19 @@ let convert_operation (op : packed_operation) : Tezos_base.Operation.t = op.protocol_data; } -(* [finalize_block_header ~shell_header ~validation_result ~operations_hash - ~pred_info ~round ~locked_round] updates the [shell_header] that was created - with dummy fields at the beginning of the block construction. It increments - the [level] and sets the actual [operations_hash], [fitness], - [validation_passes], and [context] (the predecessor resulting context hash). +(* [finalize_block_header] updates the [shell_header] that was created + with dummy fields at the beginning of the block construction. It + increments the [level] and sets the actual [operations_hash], + [fitness], [validation_passes], and [context] (the predecessor + resulting context hash). - When the operations from the block have been applied, the [fitness] is simply - retrieved from the [validation_result]. Otherwise, the [fitness] is computed - from the [round] and [locked_round] arguments. *) + When the operations from the block have been applied, the [fitness] + is simply retrieved from the [validation_result]. Otherwise, the + [fitness] is computed from the [round] and [locked_round] + arguments. 
*) let finalize_block_header ~shell_header ~validation_result ~operations_hash - ~(pred_info : Baking_state.block_info) ~round ~locked_round = + ~(pred_info : Baking_state.block_info) ~pred_resulting_context_hash ~round + ~locked_round = let open Lwt_result_syntax in let* fitness = match validation_result with @@ -108,7 +110,7 @@ let finalize_block_header ~shell_header ~validation_result ~operations_hash validation_passes; operations_hash; fitness; - context = pred_info.resulting_context_hash; + context = pred_resulting_context_hash; } in return header @@ -203,14 +205,15 @@ let filter_via_node ~chain_id ~fees_config ~hard_gas_limit_per_block [filter_via_node] is called to return these values. *) let filter_with_context ~chain_id ~fees_config ~hard_gas_limit_per_block ~faked_protocol_data ~user_activated_upgrades ~timestamp - ~(pred_info : Baking_state.block_info) ~force_apply ~round ~context_index - ~payload_round ~operation_pool cctxt = + ~(pred_info : Baking_state.block_info) ~pred_resulting_context_hash + ~force_apply ~round ~context_index ~payload_round ~operation_pool cctxt = let open Lwt_result_syntax in let* incremental = Baking_simulator.begin_construction ~timestamp ~protocol_data:faked_protocol_data ~force_apply + ~pred_resulting_context_hash context_index pred_info chain_id @@ -248,6 +251,7 @@ let filter_with_context ~chain_id ~fees_config ~hard_gas_limit_per_block ~validation_result ~operations_hash ~pred_info + ~pred_resulting_context_hash ~round ~locked_round:None in @@ -288,14 +292,16 @@ let apply_via_node ~chain_id ~faked_protocol_data ~timestamp consensus operations only from an [ordered_pool] via {!Operation_selection.filter_consensus_operations_only}. 
*) let apply_with_context ~chain_id ~faked_protocol_data ~user_activated_upgrades - ~timestamp ~(pred_info : Baking_state.block_info) ~force_apply ~round - ~ordered_pool ~context_index ~payload_hash cctxt = + ~timestamp ~(pred_info : Baking_state.block_info) + ~pred_resulting_context_hash ~force_apply ~round ~ordered_pool + ~context_index ~payload_hash cctxt = let open Lwt_result_syntax in let* incremental = Baking_simulator.begin_construction ~timestamp ~protocol_data:faked_protocol_data ~force_apply + ~pred_resulting_context_hash context_index pred_info chain_id @@ -358,6 +364,7 @@ let apply_with_context ~chain_id ~faked_protocol_data ~user_activated_upgrades ~validation_result ~operations_hash ~pred_info + ~pred_resulting_context_hash ~round ~locked_round:locked_round_when_no_validation_result in @@ -367,10 +374,10 @@ let apply_with_context ~chain_id ~faked_protocol_data ~user_activated_upgrades (* [forge] a new [unsigned_block] in accordance with [simulation_kind] and [simulation_mode] *) let forge (cctxt : #Protocol_client_context.full) ~chain_id - ~(pred_info : Baking_state.block_info) ~timestamp ~round - ~liquidity_baking_toggle_vote ~user_activated_upgrades fees_config - ~force_apply ~seed_nonce_hash ~payload_round simulation_mode simulation_kind - constants = + ~(pred_info : Baking_state.block_info) ~pred_resulting_context_hash + ~pred_live_blocks ~timestamp ~round ~liquidity_baking_toggle_vote + ~user_activated_upgrades fees_config ~force_apply ~seed_nonce_hash + ~payload_round simulation_mode simulation_kind constants = let open Lwt_result_syntax in let hard_gas_limit_per_block = constants.Constants.Parametric.hard_gas_limit_per_block @@ -382,7 +389,7 @@ let forge (cctxt : #Protocol_client_context.full) ~chain_id to our predecessor otherwise the node would reject the block. 
*) let filtered_pool = retain_live_operations_only - ~live_blocks:pred_info.live_blocks + ~live_blocks:pred_live_blocks operation_pool in Filter filtered_pool @@ -441,6 +448,7 @@ let forge (cctxt : #Protocol_client_context.full) ~chain_id ~user_activated_upgrades ~timestamp ~pred_info + ~pred_resulting_context_hash ~force_apply ~round ~context_index @@ -462,6 +470,7 @@ let forge (cctxt : #Protocol_client_context.full) ~chain_id ~user_activated_upgrades ~timestamp ~pred_info + ~pred_resulting_context_hash ~force_apply ~round ~ordered_pool diff --git a/src/proto_alpha/lib_delegate/block_forge.mli b/src/proto_alpha/lib_delegate/block_forge.mli index 77fb619a192f..7afc2ce9791f 100644 --- a/src/proto_alpha/lib_delegate/block_forge.mli +++ b/src/proto_alpha/lib_delegate/block_forge.mli @@ -44,6 +44,8 @@ val forge : #Protocol_client_context.full -> chain_id:Chain_id.t -> pred_info:Baking_state.block_info -> + pred_resulting_context_hash:Context_hash.t -> + pred_live_blocks:Block_hash.Set.t -> timestamp:Time.Protocol.t -> round:Round.t -> liquidity_baking_toggle_vote:Liquidity_baking.liquidity_baking_toggle_vote -> diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index 96ce7edebae0..7e493fc8e3b2 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -25,6 +25,8 @@ open Protocol open Alpha_context +open Baking_cache +open Baking_state module Block_services = Block_services.Make (Protocol) (Protocol) module Events = Baking_events.Node_rpc @@ -50,164 +52,219 @@ let preapply_block cctxt ~chain ~head ~timestamp ~protocol_data operations = let extract_prequorum preendorsements = match preendorsements with - | h :: _ as l -> + | h :: _ -> let ({protocol_data = {contents = Single (Preendorsement content); _}; _}) = (h : Kind.preendorsement Operation.t) in Some { - Baking_state.level = Raw_level.to_int32 content.level; + level = Raw_level.to_int32 content.level; round = content.round; 
block_payload_hash = content.block_payload_hash; - preendorsements = l; + preendorsements; } | _ -> None -let raw_info cctxt ~chain ~block_hash shell resulting_context_hash payload_hash - payload_round current_protocol next_protocol live_blocks = - Events.(emit raw_info (block_hash, shell.Tezos_base.Block_header.level)) - >>= fun () -> - let open Protocol_client_context in - let block = `Hash (block_hash, 0) in - let is_in_protocol = Protocol_hash.(current_protocol = Protocol.hash) in - (if is_in_protocol then - Alpha_block_services.Operations.operations cctxt ~chain ~block () - >>=? fun operations -> - let operations = - List.map - (fun l -> - List.map - (fun {Alpha_block_services.shell; protocol_data; _} -> - {Alpha_context.shell; protocol_data}) - l) - operations - in - match Operation_pool.extract_operations_of_list_list operations with - | None -> failwith "Unexpected operation list size" - | Some operations -> return operations - else - (* If we are not in the current protocol, do no consider operations *) - return (None, [], Operation_pool.empty_payload)) - >>=? fun (preendorsements, quorum, payload) -> - (match Baking_state.round_of_shell_header shell with - | Ok round -> ok round - | _ -> - (* this can occur if the protocol has just changed and the - previous protocol does not have a concept of round - (e.g. Genesis) *) - ok Round.zero) - >>?= fun round -> - let prequorum = Option.bind preendorsements extract_prequorum in +let info_of_header_and_ops ~in_protocol block_hash block_header operations = + let open Result_syntax in + let shell = block_header.Tezos_base.Block_header.shell in + let dummy_payload_hash = Block_payload_hash.zero in + let* round = + Environment.wrap_tzresult @@ Fitness.round_from_raw shell.fitness + in + let payload_hash, payload_round, prequorum, quorum, payload = + if not in_protocol then + (* The first block in the protocol is baked using the previous + protocol, the encodings might change. 
The baker's logic is to + consider final the first block of a new protocol and not + endorse it. Therefore, we do not need to have the correct + values here. *) + (dummy_payload_hash, Round.zero, None, [], Operation_pool.empty_payload) + else + let payload_hash, payload_round = + match + Data_encoding.Binary.of_bytes_opt + Protocol.block_header_data_encoding + block_header.protocol_data + with + | Some {contents = {payload_hash; payload_round; _}; _} -> + (payload_hash, payload_round) + | None -> assert false + in + let preendorsements, quorum, payload = + WithExceptions.Option.get + ~loc:__LOC__ + (Operation_pool.extract_operations_of_list_list operations) + in + let prequorum = Option.bind preendorsements extract_prequorum in + (payload_hash, payload_round, prequorum, quorum, payload) + in return { - Baking_state.hash = block_hash; + hash = block_hash; shell; - resulting_context_hash; payload_hash; payload_round; round; - protocol = current_protocol; - next_protocol; prequorum; quorum; payload; - live_blocks; } -let dummy_payload_hash = Block_payload_hash.zero +let compute_block_info cctxt ~in_protocol ?operations ~chain block_hash + block_header = + let open Lwt_result_syntax in + let* operations = + match operations with + | None when not in_protocol -> return_nil + | None -> + let open Protocol_client_context in + let* operations = + Alpha_block_services.Operations.operations + cctxt + ~chain + ~block:(`Hash (block_hash, 0)) + () + in + let packed_operations = + List.map + (fun l -> + List.map + (fun {Alpha_block_services.shell; protocol_data; _} -> + {Alpha_context.shell; protocol_data}) + l) + operations + in + return packed_operations + | Some operations -> + let parse_op (raw_op : Tezos_base.Operation.t) = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Operation.protocol_data_encoding + raw_op.proto + in + {shell = raw_op.shell; protocol_data} + in + protect @@ fun () -> return (List.map (List.map parse_op) operations) + in + let*? 
block_info = + info_of_header_and_ops ~in_protocol block_hash block_header operations + in + return block_info -let info cctxt ~chain ~block () = - let open Protocol_client_context in - (* Fails if the block's protocol is not the current one *) - Shell_services.Blocks.protocols cctxt ~chain ~block () - >>=? fun {current_protocol; next_protocol} -> - Shell_services.Blocks.resulting_context_hash cctxt ~chain ~block () - >>=? fun resulting_context_hash -> - (if Protocol_hash.(current_protocol <> Protocol.hash) then - Block_services.Header.shell_header cctxt ~chain ~block () >>=? fun shell -> - Chain_services.Blocks.Header.raw_protocol_data cctxt ~chain ~block () - >>=? fun protocol_data -> - let hash = - Tezos_base.Block_header.hash {Tezos_base.Block_header.shell; protocol_data} - in - (* /!\ We decode [protocol_data] with the current protocol's - encoding, while we should use the previous protocol's - [protocol_data] encoding. For now, this works because the - encoding has not changed. *) - let payload_hash, payload_round = - match - Data_encoding.Binary.of_bytes_opt - Protocol.block_header_data_encoding - protocol_data - with - | Some {contents = {payload_hash; payload_round; _}; _} -> - (payload_hash, payload_round) - | None -> (dummy_payload_hash, Round.zero) - in - return (hash, shell, resulting_context_hash, payload_hash, payload_round) - else - Alpha_block_services.header cctxt ~chain ~block () - >>=? fun {hash; shell; protocol_data; _} -> - return - ( hash, - shell, - resulting_context_hash, - protocol_data.contents.payload_hash, - protocol_data.contents.payload_round )) - >>=? 
fun (hash, shell, resulting_context_hash, payload_hash, payload_round) -> - (Chain_services.Blocks.live_blocks cctxt ~chain ~block () >>= function - | Error _ -> - (* The RPC might fail when a block's metadata is not available *) - Lwt.return Block_hash.Set.empty - | Ok live_blocks -> Lwt.return live_blocks) - >>= fun live_blocks -> - raw_info - cctxt - ~chain - ~block_hash:hash - shell - resulting_context_hash - payload_hash - payload_round - current_protocol - next_protocol - live_blocks +let proposal cctxt ?(cache : block_info Block_cache.t option) ?operations ~chain + block_hash (block_header : Tezos_base.Block_header.t) = + let open Lwt_result_syntax in + let predecessor_hash = block_header.shell.predecessor in + let pred_block = `Hash (predecessor_hash, 0) in + let predecessor_opt = + Option.bind cache (fun cache -> Block_cache.find_opt cache predecessor_hash) + in + let* is_proposal_in_protocol, predecessor = + match predecessor_opt with + | Some predecessor -> + return + ( predecessor.shell.proto_level = block_header.shell.proto_level, + predecessor ) + | None -> + let* { + current_protocol = pred_current_protocol; + next_protocol = pred_next_protocol; + } = + Shell_services.Blocks.protocols cctxt ~chain ~block:pred_block () + in + let is_proposal_in_protocol = + Protocol_hash.(pred_next_protocol = Protocol.hash) + in + let* predecessor = + let in_protocol = + Protocol_hash.(pred_current_protocol = Protocol.hash) + in + let* raw_header_b = + Shell_services.Blocks.raw_header cctxt ~chain ~block:pred_block () + in + let predecessor_header = + Data_encoding.Binary.of_bytes_exn + Tezos_base.Block_header.encoding + raw_header_b + in + compute_block_info + cctxt + ~in_protocol + ~chain + predecessor_hash + predecessor_header + in + Option.iter + (fun cache -> Block_cache.replace cache predecessor_hash predecessor) + cache ; + return (is_proposal_in_protocol, predecessor) + in + let block_opt = + Option.bind cache (fun cache -> Block_cache.find_opt cache 
block_hash) + in + let* block = + match block_opt with + | Some pi -> return pi + | None -> + let* pi = + compute_block_info + cctxt + ~in_protocol:is_proposal_in_protocol + ?operations + ~chain + block_hash + block_header + in + Option.iter (fun cache -> Block_cache.replace cache block_hash pi) cache ; + return pi + in + return {block; predecessor} -let find_in_cache_or_fetch cctxt ?cache ~chain block_hash = - let open Baking_cache in - let fetch () = info cctxt ~chain ~block:(`Hash (block_hash, 0)) () in - match cache with - | None -> fetch () - | Some block_cache -> ( - match Block_cache.find_opt block_cache block_hash with - | Some block_info -> return block_info - | None -> - fetch () >>=? fun block_info -> - Block_cache.replace block_cache block_hash block_info ; - return block_info) +let proposal cctxt ?cache ?operations ~chain block_hash block_header = + protect @@ fun () -> + proposal cctxt ?cache ?operations ~chain block_hash block_header -let proposal cctxt ?cache ~chain block_hash = - find_in_cache_or_fetch cctxt ~chain ?cache block_hash >>=? fun block -> - let predecessor_hash = block.shell.predecessor in - find_in_cache_or_fetch cctxt ~chain ?cache predecessor_hash - >>=? fun predecessor -> return {Baking_state.block; predecessor} +let monitor_valid_proposals cctxt ~chain ?cache () = + let open Lwt_result_syntax in + let next_protocols = [Protocol.hash] in + let* block_stream, stopper = + Monitor_services.validated_blocks cctxt ~chains:[chain] ~next_protocols () + in + let stream = + let map (_chain_id, block_hash, block_header, operations) = + let*! map_result = + proposal cctxt ?cache ~operations ~chain block_hash block_header + in + match map_result with + | Ok proposal -> Lwt.return_some proposal + | Error err -> + let*! 
() = Events.(emit error_while_monitoring_valid_proposals err) in + Lwt.return_none + in + Lwt_stream.filter_map_s map block_stream + in + return (stream, stopper) -let monitor_proposals cctxt ~chain () = - let cache = Baking_cache.Block_cache.create 100 in - Monitor_services.heads cctxt ~next_protocols:[Protocol.hash] chain - >>=? fun (block_stream, stopper) -> - return - ( Lwt_stream.filter_map_s - (fun (block_hash, _) -> - protect (fun () -> proposal cctxt ~cache ~chain block_hash) - >>= function - | Ok proposal -> Lwt.return_some proposal - | Error err -> - Events.(emit error_while_monitoring_heads err) >>= fun () -> - Lwt.return_none) - block_stream, - stopper ) +let monitor_heads cctxt ~chain ?cache () = + let open Lwt_result_syntax in + let next_protocols = [Protocol.hash] in + let* block_stream, stopper = + Monitor_services.heads cctxt ~next_protocols chain + in + let stream, stopper = + let map (block_hash, block_header) = + let*! map_result = proposal cctxt ?cache ~chain block_hash block_header in + match map_result with + | Ok proposal -> Lwt.return_some proposal + | Error err -> + let*! () = Events.(emit error_while_monitoring_heads err) in + Lwt.return_none + in + (Lwt_stream.filter_map_s map block_stream, stopper) + in + return (stream, stopper) let await_protocol_activation cctxt ~chain () = Monitor_services.heads cctxt ~next_protocols:[Protocol.hash] chain diff --git a/src/proto_alpha/lib_delegate/node_rpc.mli b/src/proto_alpha/lib_delegate/node_rpc.mli index 3813841be4db..e77ae97d4142 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.mli +++ b/src/proto_alpha/lib_delegate/node_rpc.mli @@ -50,21 +50,19 @@ val preapply_block : (Tezos_base.Block_header.shell_header * error Preapply_result.t list) tzresult Lwt.t -(** Fetch a proposal from the node. - - @param cache is unset by default -*) -val proposal : - #Tezos_rpc.Context.simple -> - ?cache:Baking_state.block_info Baking_cache.Block_cache.t -> +(** Monitor validated blocks/proposals from the node. 
*) +val monitor_valid_proposals : + #Protocol_client_context.rpc_context -> chain:Shell_services.chain -> - Block_hash.t -> - Baking_state.proposal tzresult Lwt.t + ?cache:Baking_state.block_info Baking_cache.Block_cache.t -> + unit -> + (Baking_state.proposal Lwt_stream.t * (unit -> unit)) tzresult Lwt.t -(** Monitor proposals from the node.*) -val monitor_proposals : +(** Monitor heads from the node. *) +val monitor_heads : #Protocol_client_context.rpc_context -> chain:Shell_services.chain -> + ?cache:Baking_state.block_info Baking_cache.Block_cache.t -> unit -> (Baking_state.proposal Lwt_stream.t * (unit -> unit)) tzresult Lwt.t diff --git a/src/proto_alpha/lib_delegate/state_transitions.ml b/src/proto_alpha/lib_delegate/state_transitions.ml index 7d6fe18f2cba..43d07f528eb2 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.ml +++ b/src/proto_alpha/lib_delegate/state_transitions.ml @@ -111,8 +111,7 @@ let may_update_proposal state (proposal : proposal) = else Lwt.return state let preendorse state proposal = - if Protocol_hash.(proposal.block.protocol <> proposal.block.next_protocol) - then + if Baking_state.is_first_block_in_protocol proposal then (* We do not preendorse the first transition block *) let new_state = update_current_phase state Idle in Lwt.return (new_state, Do_nothing) @@ -537,8 +536,8 @@ let end_of_round state current_round = let new_state = update_current_phase new_state Idle in do_nothing new_state | Some (delegate, _) -> - let last_proposal = state.level_state.latest_proposal.block in - if Protocol_hash.(last_proposal.protocol <> Protocol.hash) then + let latest_proposal = state.level_state.latest_proposal in + if Baking_state.is_first_block_in_protocol latest_proposal then (* Do not inject a block for the previous protocol! (Let the baker of the previous protocol do it.) 
*) do_nothing new_state -- GitLab From c3464b623b831d5b27f68d95d0d22e66337582b1 Mon Sep 17 00:00:00 2001 From: vbot Date: Wed, 25 Jan 2023 14:16:43 +0100 Subject: [PATCH 03/14] Alpha/Baker: monitor valid blocks and trigger an associated event --- src/proto_alpha/lib_delegate/baking_lib.ml | 8 +- .../lib_delegate/baking_scheduling.ml | 116 ++++++++++++++---- .../lib_delegate/baking_scheduling.mli | 5 +- src/proto_alpha/lib_delegate/baking_state.ml | 29 +++-- src/proto_alpha/lib_delegate/baking_state.mli | 3 +- src/proto_alpha/lib_delegate/node_rpc.ml | 4 +- .../lib_delegate/state_transitions.ml | 9 +- .../lib_delegate/test/test_scenario.ml | 16 +-- 8 files changed, 140 insertions(+), 50 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index 55252076bd6b..070bf264c8f8 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -371,18 +371,18 @@ let propose (cctxt : Protocol_client_context.full) ?minimal_fees in return_unit -let bake_using_automaton config state block_stream = +let bake_using_automaton config state heads_stream = let open Lwt_result_syntax in let cctxt = state.global_state.cctxt in let* initial_event = first_automaton_event state in let current_level = state.level_state.latest_proposal.block.shell.level in let loop_state = Baking_scheduling.create_loop_state - block_stream + ~heads_stream state.global_state.operation_worker in let stop_on_next_level_block = function - | New_proposal proposal -> + | New_head_proposal proposal -> Compare.Int32.(proposal.block.shell.level >= Int32.succ current_level) | _ -> false in @@ -394,7 +394,7 @@ let bake_using_automaton config state block_stream = state initial_event >>=? function - | Some (New_proposal proposal) -> + | Some (New_head_proposal proposal) -> let*! 
() = cctxt#message "Block %a (%ld) injected" diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index e3219f9c8bac..eeaa258deb07 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -28,33 +28,52 @@ module Events = Baking_events.Scheduling open Baking_state type loop_state = { - block_stream : Baking_state.proposal Lwt_stream.t; + heads_stream : Baking_state.proposal Lwt_stream.t; + get_valid_blocks_stream : Baking_state.proposal Lwt_stream.t Lwt.t; qc_stream : Operation_worker.event Lwt_stream.t; - future_block_stream : proposal Lwt_stream.t; - push_future_block : proposal -> unit; - mutable last_get_head_event : [`New_proposal of proposal option] Lwt.t option; + future_block_stream : + [`New_future_head of proposal | `New_future_valid_proposal of proposal] + Lwt_stream.t; + push_future_block : + [`New_future_head of proposal | `New_future_valid_proposal of proposal] -> + unit; + mutable last_get_head_event : + [`New_head_proposal of proposal option] Lwt.t option; + mutable last_get_valid_block_event : + [`New_valid_proposal of proposal option] Lwt.t option; mutable last_future_block_event : - [`New_future_block of Baking_state.proposal] Lwt.t option; + [`New_future_head of proposal | `New_future_valid_proposal of proposal] + Lwt.t + option; mutable last_get_qc_event : [`QC_reached of Operation_worker.event option] Lwt.t option; } type events = - [ `New_future_block of proposal - | `New_proposal of proposal option + [ `New_future_head of proposal + | `New_future_valid_proposal of proposal + | `New_valid_proposal of proposal option + | `New_head_proposal of proposal option | `QC_reached of Operation_worker.event option | `Termination | `Timeout of timeout_kind ] Lwt.t -let create_loop_state block_stream operation_worker = +let create_loop_state ?get_valid_blocks_stream ~heads_stream operation_worker = let future_block_stream, 
push_future_block = Lwt_stream.create () in + let get_valid_blocks_stream = + match get_valid_blocks_stream with + | None -> Lwt.return (Lwt_stream.create () |> fst) + | Some vbs_t -> vbs_t + in { - block_stream; + heads_stream; + get_valid_blocks_stream; qc_stream = Operation_worker.get_quorum_event_stream operation_worker; future_block_stream; push_future_block = (fun x -> push_future_block (Some x)); last_get_head_event = None; + last_get_valid_block_event = None; last_future_block_event = None; last_get_qc_event = None; } @@ -111,12 +130,24 @@ let rec wait_next_event ~timeout loop_state = match loop_state.last_get_head_event with | None -> let t = - Lwt_stream.get loop_state.block_stream >|= fun e -> `New_proposal e + Lwt_stream.get loop_state.heads_stream >|= fun e -> + `New_head_proposal e in loop_state.last_get_head_event <- Some t ; t | Some t -> t in + let get_valid_block_event () = + match loop_state.last_get_valid_block_event with + | None -> + let t = + loop_state.get_valid_blocks_stream >>= fun valid_blocks_stream -> + Lwt_stream.get valid_blocks_stream >|= fun e -> `New_valid_proposal e + in + loop_state.last_get_valid_block_event <- Some t ; + t + | Some t -> t + in let get_future_block_event () = (* n.b. we should also consume the available elements in the block_stream before starting baking. 
*) @@ -127,7 +158,7 @@ let rec wait_next_event ~timeout loop_state = | None -> (* unreachable, we never close the stream *) assert false - | Some proposal -> `New_future_block proposal + | Some future_proposal -> future_proposal in loop_state.last_future_block_event <- Some t ; t @@ -149,6 +180,7 @@ let rec wait_next_event ~timeout loop_state = [ terminated; (get_head_event () :> events); + (get_valid_block_event () :> events); (get_future_block_event () :> events); (get_qc_event () :> events); (timeout :> events); @@ -158,7 +190,11 @@ let rec wait_next_event ~timeout loop_state = | `Termination -> (* Exit the loop *) return_none - | `New_proposal None -> + | `New_valid_proposal None -> + (* Node connection lost *) + loop_state.last_get_valid_block_event <- None ; + fail Baking_errors.Node_connection_lost + | `New_head_proposal None -> (* Node connection lost *) loop_state.last_get_head_event <- None ; fail Baking_errors.Node_connection_lost @@ -166,7 +202,22 @@ let rec wait_next_event ~timeout loop_state = (* Not supposed to happen: exit the loop *) loop_state.last_get_qc_event <- None ; return_none - | `New_proposal (Some proposal) -> ( + | `New_valid_proposal (Some proposal) -> ( + loop_state.last_get_valid_block_event <- None ; + (* Is the block in the future? *) + match sleep_until proposal.block.shell.timestamp with + | Some waiter -> + (* If so, wait until its timestamp is reached before advertising it *) + Events.(emit proposal_in_the_future proposal.block.hash) >>= fun () -> + Lwt.dont_wait + (fun () -> + waiter >>= fun () -> + loop_state.push_future_block (`New_future_valid_proposal proposal) ; + Lwt.return_unit) + (fun _exn -> ()) ; + wait_next_event ~timeout loop_state + | None -> return_some (New_valid_proposal proposal)) + | `New_head_proposal (Some proposal) -> ( loop_state.last_get_head_event <- None ; (* Is the block in the future? 
*) match sleep_until proposal.block.shell.timestamp with @@ -176,16 +227,21 @@ let rec wait_next_event ~timeout loop_state = Lwt.dont_wait (fun () -> waiter >>= fun () -> - loop_state.push_future_block proposal ; + loop_state.push_future_block (`New_future_head proposal) ; Lwt.return_unit) (fun _exn -> ()) ; wait_next_event ~timeout loop_state - | None -> return_some (New_proposal proposal)) - | `New_future_block proposal -> + | None -> return_some (New_head_proposal proposal)) + | `New_future_head proposal -> + Events.(emit process_proposal_in_the_future proposal.block.hash) + >>= fun () -> + loop_state.last_future_block_event <- None ; + return_some (New_head_proposal proposal) + | `New_future_valid_proposal proposal -> Events.(emit process_proposal_in_the_future proposal.block.hash) >>= fun () -> loop_state.last_future_block_event <- None ; - return_some (New_proposal proposal) + return_some (New_valid_proposal proposal) | `QC_reached (Some (Operation_worker.Prequorum_reached (candidate, preendorsement_qc))) -> @@ -662,7 +718,7 @@ let compute_bootstrap_event state = = state.round_state.current_round) then (* If so, then trigger the new proposal event to possibly preendorse *) - ok @@ Baking_state.New_proposal state.level_state.latest_proposal + ok @@ Baking_state.New_head_proposal state.level_state.latest_proposal else (* Otherwise, trigger the end of round to check whether we need to propose at this level or not *) @@ -724,12 +780,13 @@ let perform_sanity_check cctxt ~chain_id = let run cctxt ?canceler ?(stop_on_event = fun _ -> false) ?(on_error = fun _ -> return_unit) ~chain config delegates = + let open Lwt_result_syntax in Shell_services.Chain.chain_id cctxt ~chain () >>=? fun chain_id -> perform_sanity_check cctxt ~chain_id >>=? fun () -> let cache = Baking_cache.Block_cache.create 10 in Node_rpc.monitor_heads cctxt ~cache ~chain () - >>=? fun (block_stream, _block_stream_stopper) -> - (Lwt_stream.get block_stream >>= function + >>=? 
fun (heads_stream, _block_stream_stopper) -> + (Lwt_stream.get heads_stream >>= function | Some current_head -> return current_head | None -> failwith "head stream unexpectedly ended") >>=? fun current_proposal -> @@ -748,7 +805,7 @@ let run cctxt ?canceler ?(stop_on_event = fun _ -> false) ~current_proposal delegates >>=? fun initial_state -> - let cloned_block_stream = Lwt_stream.clone block_stream in + let cloned_block_stream = Lwt_stream.clone heads_stream in Baking_nonces.start_revelation_worker cctxt initial_state.global_state.config.nonce @@ -762,9 +819,22 @@ let run cctxt ?canceler ?(stop_on_event = fun _ -> false) Lwt_canceler.cancel revelation_worker_canceler >>= fun _ -> Lwt.return_unit)) canceler ; - + (* FIXME: currently, the client streamed RPC call will hold until at + least one element is present in the stream. This is fixed by: + https://gitlab.com/nomadic-labs/resto/-/merge_requests/50. Until + then, we await the promise completion of the RPC call later + on. *) + let get_valid_blocks_stream = + let*! 
vbs = Node_rpc.monitor_valid_proposals cctxt ~cache ~chain () in + match vbs with + | Error _ -> Stdlib.failwith "Failed to get the validated blocks stream" + | Ok (vbs, _) -> Lwt.return vbs + in let loop_state = - create_loop_state block_stream initial_state.global_state.operation_worker + create_loop_state + ~get_valid_blocks_stream + ~heads_stream + initial_state.global_state.operation_worker in let on_error err = Events.(emit error_while_baking err) >>= fun () -> diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.mli b/src/proto_alpha/lib_delegate/baking_scheduling.mli index 83e167cfcf34..ed09245e450d 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.mli +++ b/src/proto_alpha/lib_delegate/baking_scheduling.mli @@ -29,7 +29,10 @@ open Protocol.Alpha_context type loop_state val create_loop_state : - proposal Lwt_stream.t -> Operation_worker.t -> loop_state + ?get_valid_blocks_stream:proposal Lwt_stream.t Lwt.t -> + heads_stream:proposal Lwt_stream.t -> + Operation_worker.t -> + loop_state val sleep_until : Time.Protocol.t -> unit Lwt.t option diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index 164807e2375e..dcf2f0ea2f66 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -357,7 +357,8 @@ let timeout_kind_encoding = ] type event = - | New_proposal of proposal + | New_valid_proposal of proposal + | New_head_proposal of proposal | Prequorum_reached of Operation_worker.candidate * Kind.preendorsement operation list | Quorum_reached of @@ -370,12 +371,18 @@ let event_encoding = [ case (Tag 0) - ~title:"New_proposal" + ~title:"New_valid_proposal" proposal_encoding - (function New_proposal p -> Some p | _ -> None) - (fun p -> New_proposal p); + (function New_valid_proposal p -> Some p | _ -> None) + (fun p -> New_valid_proposal p); case (Tag 1) + ~title:"New_head_proposal" + proposal_encoding + (function New_head_proposal p -> Some p 
| _ -> None) + (fun p -> New_head_proposal p); + case + (Tag 2) ~title:"Prequorum_reached" (tup2 Operation_worker.candidate_encoding @@ -388,7 +395,7 @@ let event_encoding = Prequorum_reached (candidate, Operation_pool.filter_preendorsements ops)); case - (Tag 2) + (Tag 3) ~title:"Quorum_reached" (tup2 Operation_worker.candidate_encoding @@ -400,7 +407,7 @@ let event_encoding = (fun (candidate, ops) -> Quorum_reached (candidate, Operation_pool.filter_endorsements ops)); case - (Tag 3) + (Tag 4) ~title:"Timeout" timeout_kind_encoding (function Timeout tk -> Some tk | _ -> None) @@ -844,10 +851,16 @@ let pp_timeout_kind fmt = function Format.fprintf fmt "time to bake next level at round %a" Round.pp at_round let pp_event fmt = function - | New_proposal proposal -> + | New_valid_proposal proposal -> + Format.fprintf + fmt + "new valid proposal received: %a" + pp_block_info + proposal.block + | New_head_proposal proposal -> Format.fprintf fmt - "new proposal received: %a" + "new applied proposal received: %a" pp_block_info proposal.block | Prequorum_reached (candidate, preendos) -> diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index e75a0668b318..5f1973c4e60a 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -159,7 +159,8 @@ type timeout_kind = val timeout_kind_encoding : timeout_kind Data_encoding.t type event = - | New_proposal of proposal + | New_valid_proposal of proposal + | New_head_proposal of proposal | Prequorum_reached of Operation_worker.candidate * Kind.preendorsement operation list | Quorum_reached of diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index 7e493fc8e3b2..3fcf46abba59 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -253,7 +253,7 @@ let monitor_heads cctxt ~chain ?cache () = let* block_stream, stopper = 
Monitor_services.heads cctxt ~next_protocols chain in - let stream, stopper = + let stream = let map (block_hash, block_header) = let*! map_result = proposal cctxt ?cache ~chain block_hash block_header in match map_result with @@ -262,7 +262,7 @@ let monitor_heads cctxt ~chain ?cache () = let*! () = Events.(emit error_while_monitoring_heads err) in Lwt.return_none in - (Lwt_stream.filter_map_s map block_stream, stopper) + Lwt_stream.filter_map_s map block_stream in return (stream, stopper) diff --git a/src/proto_alpha/lib_delegate/state_transitions.ml b/src/proto_alpha/lib_delegate/state_transitions.ml index 43d07f528eb2..63559a80e202 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.ml +++ b/src/proto_alpha/lib_delegate/state_transitions.ml @@ -699,7 +699,7 @@ let step (state : Baking_state.t) (event : Baking_state.event) : (* If it is time to bake the next level, stop everything currently going on and propose the next level block *) time_to_bake state at_round - | Idle, New_proposal block_info -> + | Idle, New_head_proposal block_info -> Events.( emit new_head @@ -707,8 +707,8 @@ let step (state : Baking_state.t) (event : Baking_state.event) : block_info.block.shell.level, block_info.block.round )) >>= fun () -> handle_new_proposal state block_info - | Awaiting_endorsements, New_proposal block_info - | Awaiting_preendorsements, New_proposal block_info -> + | Awaiting_endorsements, New_head_proposal block_info + | Awaiting_preendorsements, New_head_proposal block_info -> Events.( emit new_head @@ -732,3 +732,6 @@ let step (state : Baking_state.t) (event : Baking_state.event) : | Awaiting_endorsements, Prequorum_reached _ -> (* This cannot/should not happen *) do_nothing state + | _, New_valid_proposal _p -> + (* TODO: actually do something *) + do_nothing state diff --git a/src/proto_alpha/lib_delegate/test/test_scenario.ml b/src/proto_alpha/lib_delegate/test/test_scenario.ml index cb34cc8d3d63..86516ab4bb4f 100644 --- 
a/src/proto_alpha/lib_delegate/test/test_scenario.ml +++ b/src/proto_alpha/lib_delegate/test/test_scenario.ml @@ -24,7 +24,7 @@ let test_level_5 () = include Default_hooks let stop_on_event = function - | Baking_state.New_proposal {block; _} -> + | Baking_state.New_head_proposal {block; _} -> (* Stop the node as soon as we receive a proposal with a level higher than [level_to_reach]. *) block.shell.level > level_to_reach @@ -649,7 +649,7 @@ let test_scenario_m1 () = return (op_hash, op, propagation_vector) let stop_on_event = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 4l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 4l | _ -> false end in let config = {default_config with timeout = 60} in @@ -679,7 +679,7 @@ let test_scenario_m2 () = include Default_hooks let stop_on_event = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 5l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 5l | _ -> false end in let module Missing_node : Hooks = struct @@ -742,7 +742,7 @@ Scenario M3 let test_scenario_m3 () = let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> + | Baking_state.New_head_proposal {block; _} -> block.shell.level = 1l && Protocol.Alpha_context.Round.to_int32 block.round = 6l | _ -> false @@ -917,7 +917,7 @@ Scenario M5 let test_scenario_m5 () = let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> block.shell.level >= 2l + | Baking_state.New_head_proposal {block; _} -> block.shell.level >= 2l | _ -> false in let module Node_a_hooks : Hooks = struct @@ -1004,7 +1004,7 @@ Scenario M6 let test_scenario_m6 () = let b_proposal_2_1 = ref None in let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 4l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 4l | _ -> false in let module Node_a_hooks : Hooks = struct @@ -1134,7 +1134,7 @@ let test_scenario_m7 () = let c_received_2_1 = 
ref false in let d_received_2_1 = ref false in let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 4l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 4l | _ -> false in let check_chain_on_success0 node_label ~chain = @@ -1338,7 +1338,7 @@ Scenario M8 let test_scenario_m8 () = let b_proposal_2_0 = ref None in let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 4l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 4l | _ -> false in let on_inject_operation0 ~op_hash ~op = -- GitLab From 070515788a1eb8306b9c39b4ec1121750d15985d Mon Sep 17 00:00:00 2001 From: vbot Date: Thu, 26 Jan 2023 16:34:20 +0100 Subject: [PATCH 04/14] Alpha/Baker: start preendorsing as soon as a valid block arrives --- src/proto_alpha/lib_delegate/baking_events.ml | 78 +++++++ .../lib_delegate/baking_scheduling.ml | 3 + src/proto_alpha/lib_delegate/baking_state.ml | 32 ++- src/proto_alpha/lib_delegate/baking_state.mli | 9 +- .../lib_delegate/operation_worker.ml | 9 +- .../lib_delegate/state_transitions.ml | 211 ++++++++++++++---- .../lib_delegate/state_transitions.mli | 6 +- .../lib_delegate/test/test_scenario.ml | 2 +- 8 files changed, 290 insertions(+), 60 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_events.ml b/src/proto_alpha/lib_delegate/baking_events.ml index b8ee3b7afbb9..c1e32c25b2bc 100644 --- a/src/proto_alpha/lib_delegate/baking_events.ml +++ b/src/proto_alpha/lib_delegate/baking_events.ml @@ -37,6 +37,19 @@ module State_transitions = struct let section = section @ ["transitions"] + let new_valid_proposal = + declare_3 + ~section + ~name:"new_valid_proposal" + ~level:Notice + ~msg:"received new proposal {block} at level {level}, round {round}" + ~pp1:Block_hash.pp + ("block", Block_hash.encoding) + ~pp2:pp_int32 + ("level", Data_encoding.int32) + ~pp3:Round.pp + ("round", Round.encoding) + let new_head = declare_3 ~section @@ -98,6 +111,63 @@ 
module State_transitions = struct ~msg:"received new head while waiting for a quorum" () + let applied_expected_proposal_received = + declare_1 + ~section + ~name:"applied_expected_proposal_received" + ~level:Info + ~msg:"received the expected application notice for {proposal}" + ~pp1:Block_hash.pp + ("proposal", Block_hash.encoding) + + let unexpected_new_head_while_waiting_for_application = + declare_0 + ~section + ~name:"unexpected_new_head_while_waiting_for_application" + ~level:Info + ~msg:"received new head while waiting for another proposal's application" + () + + let new_valid_proposal_while_waiting_for_qc = + declare_0 + ~section + ~name:"new_valid_proposal_while_waiting_for_qc" + ~level:Info + ~msg:"received new valid proposal while waiting for a quorum" + () + + let valid_proposal_received_after_application = + declare_0 + ~section + ~name:"valid_proposal_received_after_application" + ~level:Info + ~msg:"received valid proposal for a block already applied" + () + + let unexpected_pqc_while_waiting_for_application = + declare_2 + ~section + ~name:"unexpected_pqc_while_waiting_for_application" + ~level:Info + ~msg: + "received an unexpected prequorum for {prequorum} while waiting for \ + the proposal's {proposal} application" + ~pp1:Block_hash.pp + ("prequorum", Block_hash.encoding) + ~pp2:Block_hash.pp + ("proposal", Block_hash.encoding) + + let pqc_while_waiting_for_application = + declare_1 + ~section + ~name:"pqc_while_waiting_for_application" + ~level:Info + ~msg: + "received expected prequorum for {prequorum} while waiting for the \ + proposal's application" + ~pp1:Block_hash.pp + ("prequorum", Block_hash.encoding) + let unexpected_proposal_round = declare_2 ~section @@ -289,6 +359,14 @@ module State_transitions = struct ~pp2:Block_hash.pp ("expected_hash", Block_hash.encoding) + let handling_prequorum_on_non_applied_proposal = + declare_0 + ~section + ~name:"handling_prequorum_on_non_applied_proposal" + ~level:Error + ~msg:"Handling prequorum on a 
non-applied proposal" + () + let step_current_phase = declare_2 ~section diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index eeaa258deb07..6613d073dac2 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -690,6 +690,9 @@ let create_initial_state cctxt ?(synchronize = true) ~chain config { current_level = current_proposal.block.shell.level; latest_proposal = current_proposal; + is_latest_proposal_applied = + true (* this proposal is expected to be the current head *); + delayed_prequorum = None; locked_round = None; endorsable_payload = None; elected_block; diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index dcf2f0ea2f66..0976fcef75ec 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -280,6 +280,9 @@ type elected_block = { type level_state = { current_level : int32; latest_proposal : proposal; + is_latest_proposal_applied : bool; + delayed_prequorum : + (Operation_worker.candidate * Kind.preendorsement operation list) option; (* Last proposal received where we injected an endorsement (thus we have seen 2f+1 preendorsements) *) locked_round : locked_round option; @@ -292,7 +295,11 @@ type level_state = { next_level_proposed_round : Round.t option; } -type phase = Idle | Awaiting_preendorsements | Awaiting_endorsements +type phase = + | Idle + | Awaiting_preendorsements + | Awaiting_application + | Awaiting_endorsements let phase_encoding = let open Data_encoding in @@ -312,9 +319,15 @@ let phase_encoding = (function Awaiting_preendorsements -> Some () | _ -> None) (fun () -> Awaiting_preendorsements); case - ~title:"Awaiting_endorsements" + ~title:"Awaiting_application" (Tag 2) unit + (function Awaiting_application -> Some () | _ -> None) + (fun () -> Awaiting_application); + case + 
~title:"Awaiting_endorsements" + (Tag 3) + unit (function Awaiting_endorsements -> Some () | _ -> None) (fun () -> Awaiting_endorsements); ] @@ -790,6 +803,8 @@ let pp_level_state fmt { current_level; latest_proposal; + is_latest_proposal_applied; + delayed_prequorum; locked_round; endorsable_payload; elected_block; @@ -799,11 +814,13 @@ let pp_level_state fmt } = Format.fprintf fmt - "@[Level state:@ current level: %ld@ @[proposal:@ %a@]@ locked \ - round: %a@ endorsable payload: %a@ elected block: %a@ @[own delegate \ - slots:@ %a@]@ @[next level own delegate slots:@ %a@]@ next level \ - proposed round: %a@]" + "@[Level state:@ current level: %ld@ @[proposal (applied:%b, \ + delayed prequorum:%b):@ %a@]@ locked round: %a@ endorsable payload: %a@ \ + elected block: %a@ @[own delegate slots:@ %a@]@ @[next level \ + own delegate slots:@ %a@]@ next level proposed round: %a@]" current_level + is_latest_proposal_applied + (Option.is_some delayed_prequorum) pp_proposal latest_proposal (pp_option pp_locked_round) @@ -822,6 +839,7 @@ let pp_level_state fmt let pp_phase fmt = function | Idle -> Format.fprintf fmt "idle" | Awaiting_preendorsements -> Format.fprintf fmt "awaiting preendorsements" + | Awaiting_application -> Format.fprintf fmt "awaiting application" | Awaiting_endorsements -> Format.fprintf fmt "awaiting endorsements" let pp_round_state fmt {current_round; current_phase} = @@ -860,7 +878,7 @@ let pp_event fmt = function | New_head_proposal proposal -> Format.fprintf fmt - "new applied proposal received: %a" + "new head proposal received: %a" pp_block_info proposal.block | Prequorum_reached (candidate, preendos) -> diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index 5f1973c4e60a..f8e1d779d6c2 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -128,6 +128,9 @@ type elected_block = { type level_state = { current_level : int32; 
latest_proposal : proposal; + is_latest_proposal_applied : bool; + delayed_prequorum : + (Operation_worker.candidate * Kind.preendorsement operation list) option; locked_round : locked_round option; endorsable_payload : endorsable_payload option; elected_block : elected_block option; @@ -136,7 +139,11 @@ type level_state = { next_level_proposed_round : Round.t option; } -type phase = Idle | Awaiting_preendorsements | Awaiting_endorsements +type phase = + | Idle + | Awaiting_preendorsements + | Awaiting_application + | Awaiting_endorsements val phase_encoding : phase Data_encoding.t diff --git a/src/proto_alpha/lib_delegate/operation_worker.ml b/src/proto_alpha/lib_delegate/operation_worker.ml index 1e20efa00156..9f54d826da64 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.ml +++ b/src/proto_alpha/lib_delegate/operation_worker.ml @@ -23,13 +23,6 @@ (* *) (*****************************************************************************) -(* TODO: - add events + - running state introspection to recover/restart on failure - - Do we need a mutex ? 
-*) - open Protocol_client_context open Protocol open Alpha_context @@ -64,7 +57,7 @@ module Events = struct ~name:"pqc_reached" ~level:Debug ~msg: - "pre-quorum reached (voting power: {voting_power}, {preendorsements} \ + "prequorum reached (voting power: {voting_power}, {preendorsements} \ preendorsements)" ~pp1:pp_int ("voting_power", Data_encoding.int31) diff --git a/src/proto_alpha/lib_delegate/state_transitions.ml b/src/proto_alpha/lib_delegate/state_transitions.ml index 63559a80e202..79640027e695 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.ml +++ b/src/proto_alpha/lib_delegate/state_transitions.ml @@ -95,19 +95,33 @@ let make_preendorse_action state proposal = in Inject_preendorsements {preendorsements} -let update_proposal state proposal = +let update_proposal ~is_proposal_applied state proposal = Events.(emit updating_latest_proposal proposal.block.hash) >>= fun () -> - let new_level_state = {state.level_state with latest_proposal = proposal} in + let prev_proposal = state.level_state.latest_proposal in + let is_latest_proposal_applied = + (* mark as applied if it is indeed applied or if this specific proposal was + already marked as applied *) + is_proposal_applied + || prev_proposal.block.hash = proposal.block.hash + && state.level_state.is_latest_proposal_applied + in + let new_level_state = + { + state.level_state with + is_latest_proposal_applied; + latest_proposal = proposal; + } + in Lwt.return {state with level_state = new_level_state} -let may_update_proposal state (proposal : proposal) = +let may_update_proposal ~is_proposal_applied state (proposal : proposal) = assert ( Compare.Int32.( state.level_state.latest_proposal.block.shell.level = proposal.block.shell.level)) ; if Round.(state.level_state.latest_proposal.block.round < proposal.block.round) - then update_proposal state proposal + then update_proposal ~is_proposal_applied state proposal else Lwt.return state let preendorse state proposal = @@ -118,7 +132,13 @@ let 
preendorse state proposal = else Events.(emit attempting_preendorse_proposal proposal.block.hash) >>= fun () -> - let new_state = update_current_phase state Awaiting_preendorsements in + let new_state = + (* We await for the block to be applied before updating its + locked values. *) + if state.level_state.is_latest_proposal_applied then + update_current_phase state Awaiting_preendorsements + else update_current_phase state Awaiting_application + in Lwt.return (new_state, make_preendorse_action state proposal) let extract_pqc state (new_proposal : proposal) = @@ -178,9 +198,35 @@ let may_update_endorsable_payload_with_internal_pqc state in {state with level_state = new_level_state} -let rec handle_new_proposal state (new_proposal : proposal) = +let may_update_is_latest_proposal_applied ~is_proposal_applied state + new_proposal = + let current_proposal = state.level_state.latest_proposal in + if + is_proposal_applied + && Block_hash.(current_proposal.block.hash = new_proposal.block.hash) + then + let new_level_state = + {state.level_state with is_latest_proposal_applied = true} + in + let new_state = {state with level_state = new_level_state} in + new_state + else state + +let has_already_been_handled state new_proposal = + let current_proposal = state.level_state.latest_proposal in + Block_hash.(current_proposal.block.hash = new_proposal.block.hash) + && state.level_state.is_latest_proposal_applied + +let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = let current_level = state.level_state.current_level in let new_proposal_level = new_proposal.block.shell.level in + let current_proposal = state.level_state.latest_proposal in + let state = + may_update_is_latest_proposal_applied + ~is_proposal_applied + state + new_proposal + in if Compare.Int32.(current_level > new_proposal_level) then (* The baker is ahead, a reorg may have happened. Do nothing: wait for the node to send us the branch's head. 
This new head @@ -188,12 +234,11 @@ let rec handle_new_proposal state (new_proposal : proposal) = proposal and thus, its level should be at least the same as our current proposal's level. *) Events.(emit baker_is_ahead_of_node (current_level, new_proposal_level)) - >>= fun () -> Lwt.return (state, Do_nothing) + >>= fun () -> do_nothing state else if Compare.Int32.(current_level = new_proposal_level) then - (* The received head is a new proposal for the current level: - let's check if it's a valid one for us. *) - let current_proposal = state.level_state.latest_proposal in if + (* The received head is a new proposal for the current level: + let's check if it's a valid one for us. *) Block_hash.( current_proposal.predecessor.hash <> new_proposal.predecessor.hash) then @@ -201,7 +246,7 @@ let rec handle_new_proposal state (new_proposal : proposal) = emit new_proposal_is_on_another_branch (current_proposal.predecessor.hash, new_proposal.predecessor.hash)) - >>= fun () -> may_switch_branch state new_proposal + >>= fun () -> may_switch_branch ~is_proposal_applied state new_proposal else is_acceptable_proposal_for_current_level state new_proposal >>= function | Invalid -> @@ -216,15 +261,16 @@ let rec handle_new_proposal state (new_proposal : proposal) = (* The proposal is outdated: we update to be able to extract its included endorsements but we do not endorse it *) Events.(emit outdated_proposal new_proposal.block.hash) >>= fun () -> - may_update_proposal state new_proposal >>= fun state -> - do_nothing state + may_update_proposal ~is_proposal_applied state new_proposal + >>= fun state -> do_nothing state | Valid_proposal -> ( (* Valid_proposal => proposal.round = current_round *) (* Check whether we need to update our endorsable payload *) let new_state = may_update_endorsable_payload_with_internal_pqc state new_proposal in - may_update_proposal new_state new_proposal >>= fun new_state -> + may_update_proposal ~is_proposal_applied new_state new_proposal + >>= fun 
new_state -> (* The proposal is valid but maybe we already locked on a payload *) match new_state.level_state.locked_round with | Some locked_round -> ( @@ -242,18 +288,22 @@ let rec handle_new_proposal state (new_proposal : proposal) = (* This PQC is above our locked_round, we can preendorse it *) preendorse new_state new_proposal | _ -> - (* We shouldn't preendorse this proposal, but we should at - least watch (pre)quorums events on it *) - let new_state = - update_current_phase new_state Awaiting_preendorsements - in - Lwt.return (new_state, Watch_proposal)) + (* We shouldn't preendorse this proposal, but we + should at least watch (pre)quorums events on it + but only when it is applied otherwise we await + for the proposal to be applied. *) + if is_proposal_applied then + let new_state = + update_current_phase new_state Awaiting_preendorsements + in + Lwt.return (new_state, Watch_proposal) + else do_nothing new_state) | None -> (* Otherwise, we did not lock on any payload, thus we can preendorse it *) preendorse new_state new_proposal) else - (* new_proposal.level > current_level *) + (* Last case: new_proposal_level > current_level *) (* Possible scenarios: - we received a block for a next level - we received our own block @@ -267,6 +317,8 @@ let rec handle_new_proposal state (new_proposal : proposal) = { current_level = new_level; latest_proposal = new_proposal; + is_latest_proposal_applied = is_proposal_applied; + delayed_prequorum = None; (* Unlock values *) locked_round = None; endorsable_payload = None; @@ -278,19 +330,22 @@ let rec handle_new_proposal state (new_proposal : proposal) = in (* recursive call with the up-to-date state to handle the new level proposals *) - handle_new_proposal {state with level_state; round_state} new_proposal + handle_proposal + ~is_proposal_applied + {state with level_state; round_state} + new_proposal in let action = Update_to_level {new_level_proposal = new_proposal; compute_new_state} in Lwt.return (state, action) -and 
may_switch_branch state new_proposal = +and may_switch_branch ~is_proposal_applied state new_proposal = let switch_branch state = Events.(emit switching_branch ()) >>= fun () -> (* If we are on a different branch, we also need to update our [round_state] accordingly. - The recursive call to [handle_new_proposal] cannot end up + The recursive call to [handle_proposal] cannot end up with an invalid proposal as it's on a different branch, thus there is no need to backtrack to the former state as the new proposal must end up being the new [latest_proposal]. That's @@ -298,10 +353,11 @@ and may_switch_branch state new_proposal = let round_update = { Baking_actions.new_round_proposal = new_proposal; - handle_proposal = (fun state -> handle_new_proposal state new_proposal); + handle_proposal = + (fun state -> handle_proposal ~is_proposal_applied state new_proposal); } in - update_proposal state new_proposal >>= fun new_state -> + update_proposal ~is_proposal_applied state new_proposal >>= fun new_state -> (* TODO if the branch proposal is outdated, we should trigger an [End_of_round] to participate *) Lwt.return (new_state, Synchronize_round round_update) @@ -337,6 +393,25 @@ and may_switch_branch state new_proposal = Events.(emit branch_proposal_has_same_prequorum ()) >>= fun () -> do_nothing state +let may_register_early_prequorum state ((candidate, _) as received_prequorum) = + if + Block_hash.( + candidate.Operation_worker.hash + <> state.level_state.latest_proposal.block.hash) + then + Events.( + emit + unexpected_pqc_while_waiting_for_application + (candidate.hash, state.level_state.latest_proposal.block.hash)) + >>= fun () -> do_nothing state + else + Events.(emit pqc_while_waiting_for_application candidate.hash) >>= fun () -> + let new_level_state = + {state.level_state with delayed_prequorum = Some received_prequorum} + in + let new_state = {state with level_state = new_level_state} in + do_nothing new_state + (** In the association map [delegate_slots], the 
function returns an optional pair ([delegate], [endorsing_slot]) if for the current [round], the validator [delegate] has a endorsing slot. *) @@ -606,6 +681,9 @@ let prequorum_reached_when_awaiting_preendorsements state candidate unexpected_prequorum_received (candidate.hash, latest_proposal.block.hash)) >>= fun () -> do_nothing state + else if not state.level_state.is_latest_proposal_applied then + Events.(emit handling_prequorum_on_non_applied_proposal ()) >>= fun () -> + do_nothing state else let prequorum = { @@ -672,6 +750,24 @@ let quorum_reached_when_waiting_endorsements state candidate endorsement_qc = in do_nothing new_state +let handle_expected_applied_proposal (state : Baking_state.t) = + let new_level_state = + {state.level_state with is_latest_proposal_applied = true} + in + let new_state = {state with level_state = new_level_state} in + match new_state.level_state.delayed_prequorum with + | None -> + (* The application arrived before the prequorum: just wait for the prequorum. *) + let new_state = update_current_phase new_state Awaiting_preendorsements in + do_nothing new_state + | Some (candidate, preendorsement_qc) -> + (* The application arrived after the prequorum: handle the + prequorum received earlier. 
*) + prequorum_reached_when_awaiting_preendorsements + new_state + candidate + preendorsement_qc + (* Hypothesis: - The state is not to be modified outside this module (NB: there are exceptions in Baking_actions: the corner cases @@ -699,25 +795,60 @@ let step (state : Baking_state.t) (event : Baking_state.event) : (* If it is time to bake the next level, stop everything currently going on and propose the next level block *) time_to_bake state at_round - | Idle, New_head_proposal block_info -> + | Idle, New_head_proposal proposal -> Events.( emit new_head - ( block_info.block.hash, - block_info.block.shell.level, - block_info.block.round )) - >>= fun () -> handle_new_proposal state block_info - | Awaiting_endorsements, New_head_proposal block_info - | Awaiting_preendorsements, New_head_proposal block_info -> + (proposal.block.hash, proposal.block.shell.level, proposal.block.round)) + >>= fun () -> handle_proposal ~is_proposal_applied:true state proposal + | Awaiting_application, New_head_proposal proposal -> + if + Block_hash.( + state.level_state.latest_proposal.block.hash <> proposal.block.hash) + then + Events.( + emit + new_head + ( proposal.block.hash, + proposal.block.shell.level, + proposal.block.round )) + >>= fun () -> + Events.(emit unexpected_new_head_while_waiting_for_application ()) + >>= fun () -> handle_proposal ~is_proposal_applied:true state proposal + else + Events.(emit applied_expected_proposal_received proposal.block.hash) + >>= fun () -> handle_expected_applied_proposal state + | Awaiting_endorsements, New_head_proposal proposal + | Awaiting_preendorsements, New_head_proposal proposal -> Events.( emit new_head - ( block_info.block.hash, - block_info.block.shell.level, - block_info.block.round )) + (proposal.block.hash, proposal.block.shell.level, proposal.block.round)) >>= fun () -> Events.(emit new_head_while_waiting_for_qc ()) >>= fun () -> - handle_new_proposal state block_info + handle_proposal ~is_proposal_applied:true state proposal + | 
Idle, New_valid_proposal proposal -> + Events.( + emit + new_valid_proposal + (proposal.block.hash, proposal.block.shell.level, proposal.block.round)) + >>= fun () -> handle_proposal ~is_proposal_applied:false state proposal + | Awaiting_application, New_valid_proposal proposal + | Awaiting_endorsements, New_valid_proposal proposal + | Awaiting_preendorsements, New_valid_proposal proposal -> + Events.( + emit + new_valid_proposal + (proposal.block.hash, proposal.block.shell.level, proposal.block.round)) + >>= fun () -> + if has_already_been_handled state proposal then + Events.(emit valid_proposal_received_after_application ()) >>= fun () -> + do_nothing state + else + Events.(emit new_valid_proposal_while_waiting_for_qc ()) >>= fun () -> + handle_proposal ~is_proposal_applied:false state proposal + | Awaiting_application, Prequorum_reached (candidate, preendorsement_qc) -> + may_register_early_prequorum state (candidate, preendorsement_qc) | Awaiting_preendorsements, Prequorum_reached (candidate, preendorsement_qc) -> prequorum_reached_when_awaiting_preendorsements @@ -729,9 +860,7 @@ let step (state : Baking_state.t) (event : Baking_state.event) : (* Unreachable cases *) | Idle, (Prequorum_reached _ | Quorum_reached _) | Awaiting_preendorsements, Quorum_reached _ - | Awaiting_endorsements, Prequorum_reached _ -> + | Awaiting_endorsements, Prequorum_reached _ + | Awaiting_application, Quorum_reached _ -> (* This cannot/should not happen *) do_nothing state - | _, New_valid_proposal _p -> - (* TODO: actually do something *) - do_nothing state diff --git a/src/proto_alpha/lib_delegate/state_transitions.mli b/src/proto_alpha/lib_delegate/state_transitions.mli index adc584312329..596efad2100f 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.mli +++ b/src/proto_alpha/lib_delegate/state_transitions.mli @@ -43,14 +43,16 @@ val make_consensus_list : val make_preendorse_action : state -> proposal -> action -val may_update_proposal : state -> proposal -> state 
Lwt.t +val may_update_proposal : + is_proposal_applied:bool -> state -> proposal -> state Lwt.t val preendorse : state -> proposal -> (state * action) Lwt.t val extract_pqc : state -> proposal -> (Kind.preendorsement operation list * Round.t) option -val handle_new_proposal : state -> proposal -> (state * action) Lwt.t +val handle_proposal : + is_proposal_applied:bool -> state -> proposal -> (state * action) Lwt.t val round_proposer : state -> diff --git a/src/proto_alpha/lib_delegate/test/test_scenario.ml b/src/proto_alpha/lib_delegate/test/test_scenario.ml index 86516ab4bb4f..afc642bf9457 100644 --- a/src/proto_alpha/lib_delegate/test/test_scenario.ml +++ b/src/proto_alpha/lib_delegate/test/test_scenario.ml @@ -734,7 +734,7 @@ Scenario M3 from other nodes only go to A. 3. The chain should not make progress. Since we have both bootstrap1 and bootstrap2 in delegate selection they have equal voting power. Therefore - it is necessary to have 2 votes for pre-quorums (which is achieved when A + it is necessary to have 2 votes for prequorums (which is achieved when A is proposing) and 2 votes for quorums (impossible because B has no way to obtain PQC and thus cannot send endorsements). 
-- GitLab From fd4bbdcd8321acdb6f8ec68a15f043c11c003518 Mon Sep 17 00:00:00 2001 From: vbot Date: Wed, 1 Feb 2023 11:53:03 +0100 Subject: [PATCH 05/14] Mockup: add newly introduced RPCs --- src/lib_mockup/local_services.ml | 40 ++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/lib_mockup/local_services.ml b/src/lib_mockup/local_services.ml index 9a6889d19a0c..bfb1e1765a4f 100644 --- a/src/lib_mockup/local_services.ml +++ b/src/lib_mockup/local_services.ml @@ -855,6 +855,26 @@ module Make (E : MENV) = struct write_context_callback operation_bytes) + let monitor_validated_blocks () = + Directory.register + Directory.empty + Tezos_shell_services.Monitor_services.S.validated_blocks + (fun () q () -> + let chain = + match List.hd q#chains with None -> `Main | Some chain -> chain + in + with_chain ~caller_name:"monitor validated blocks" chain (fun () -> + let block_header = + Block_header. + { + shell = E.rpc_context.block_header; + protocol_data = E.protocol_data; + } + in + let block_hash = E.rpc_context.block_hash in + Tezos_rpc.Answer.return + (E.chain_id, block_hash, block_header, [[]; []; []; []]))) + let monitor_heads () = Directory.register Directory.empty @@ -871,6 +891,24 @@ module Make (E : MENV) = struct let block_hash = E.rpc_context.block_hash in Tezos_rpc.Answer.return (block_hash, block_header))) + let raw_header () = + Directory.prefix + (Tezos_rpc.Path.prefix Chain_services.path Block_services.path) + (Directory.register + Directory.empty + E.Block_services.S.raw_header + (fun (((), chain), _block) () () -> + with_chain ~caller_name:"raw header" chain (fun () -> + let block_header_b = + Data_encoding.Binary.to_bytes_exn + Tezos_base.Block_header.encoding + { + shell = E.rpc_context.block_header; + protocol_data = E.protocol_data; + } + in + Tezos_rpc.Answer.return block_header_b))) + let header () = Directory.prefix (Tezos_rpc.Path.prefix Chain_services.path Block_services.path) @@ -962,7 +1000,9 @@ module Make (E : 
MENV) = struct |> merge (inject_block write_context_callback) |> merge (live_blocks ()) |> merge (preapply_block ()) + |> merge (monitor_validated_blocks ()) |> merge (monitor_heads ()) + |> merge (raw_header ()) |> merge (header ()) |> merge (operations ()) |> merge (resulting_context_hash ()) -- GitLab From ef499b0182de933ace90fd7ff8828be1dbb82cee Mon Sep 17 00:00:00 2001 From: vbot Date: Tue, 31 Jan 2023 17:58:07 +0100 Subject: [PATCH 06/14] Alpha/Mockup: add the new necessary RPCs in the mockup baker --- .../test/mockup_simulator/faked_services.ml | 61 ++++++++++++++++--- .../test/mockup_simulator/mockup_simulator.ml | 58 +++++++++++++++++- .../mockup_simulator/mockup_simulator.mli | 9 +++ 3 files changed, 116 insertions(+), 12 deletions(-) diff --git a/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml b/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml index 624565b40889..e99d009738bf 100644 --- a/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml +++ b/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml @@ -8,7 +8,14 @@ module type Mocked_services_hooks = sig type mempool = Mockup.M.Block_services.Mempool.t (** The baker and endorser rely on this stream to be notified of new - blocks. *) + valid blocks. *) + val monitor_validated_blocks : + unit -> + (Chain_id.t * Block_hash.t * Block_header.t * Operation.t list list) + Tezos_rpc.Answer.stream + + (** The baker and endorser rely on this stream to be notified of new + heads. *) val monitor_heads : unit -> (Block_hash.t * Block_header.t) Tezos_rpc.Answer.stream @@ -16,6 +23,10 @@ module type Mocked_services_hooks = sig val protocols : Block_services.block -> Block_services.protocols tzresult Lwt.t + (** [raw_header] returns the byte encoded block header of the block + associated to the given block specification. 
*) + val raw_header : Block_services.block -> bytes tzresult Lwt.t + (** [header] returns the block header of the block associated to the given block specification. *) val header : @@ -112,6 +123,13 @@ end type hooks = (module Mocked_services_hooks) module Make (Hooks : Mocked_services_hooks) = struct + let monitor_validated_blocks = + Directory.gen_register0 + Directory.empty + Monitor_services.S.validated_blocks + (fun _next_protocol _ -> + Tezos_rpc.Answer.return_stream (Hooks.monitor_validated_blocks ())) + let monitor_heads = Directory.gen_register1 Directory.empty @@ -137,6 +155,14 @@ module Make (Hooks : Mocked_services_hooks) = struct Directory.register Directory.empty service (fun (_, block) () () -> Hooks.protocols block) + let raw_header = + Directory.prefix + (Tezos_rpc.Path.prefix Chain_services.path Block_services.path) + @@ Directory.register + Directory.empty + Mockup.M.Block_services.S.raw_header + (fun (((), _chain), block) _ _ -> Hooks.raw_header block) + let header = Directory.prefix (Tezos_rpc.Path.prefix Chain_services.path Block_services.path) @@ -281,15 +307,30 @@ module Make (Hooks : Mocked_services_hooks) = struct (fun (_, block) () () -> Hooks.raw_protocol_data block) let shell_directory chain_id = - let merge = Directory.merge in - Directory.empty |> merge monitor_heads |> merge protocols |> merge header - |> merge operations |> merge hash |> merge shell_header - |> merge resulting_context_hash - |> merge (chain chain_id) - |> merge inject_block |> merge inject_operation |> merge monitor_operations - |> merge list_blocks |> merge live_blocks |> merge raw_protocol_data - |> merge broadcast_block |> merge broadcast_operation - |> merge monitor_bootstrapped + List.fold_left + Directory.merge + Directory.empty + [ + monitor_validated_blocks; + monitor_heads; + protocols; + raw_header; + header; + operations; + hash; + shell_header; + resulting_context_hash; + chain chain_id; + inject_block; + inject_operation; + monitor_operations; + 
list_blocks; + live_blocks; + raw_protocol_data; + broadcast_block; + broadcast_operation; + monitor_bootstrapped; + ] let directory chain_id = let proto_directory = diff --git a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml index 346cea3ee712..2c4d8230c19f 100644 --- a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml +++ b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml @@ -60,8 +60,13 @@ type state = { that functionality. *) ctxt_table : Tezos_protocol_environment.rpc_context Context_hash.Table.t; (** The context table allows us to look up rpc_context by its hash. *) + validated_blocks_pipe : + (Block_hash.t * Block_header.t * Operation.t list list) Lwt_pipe.Unbounded.t; + (** [validated_blocks_pipe] is used to implement the + [monitor_validated_blocks] RPC. *) heads_pipe : (Block_hash.t * Block_header.t) Lwt_pipe.Unbounded.t; - (** [heads_pipe] is used to implement the [monitor_heads] RPC. *) + (** [heads_pipe] is used to implement the [monitor_heads] + RPC. *) operations_pipe : (Operation_hash.t * Mockup.M.Protocol.operation) option Lwt_pipe.Unbounded.t; (** [operations_pipe] is used to implement the [operations_pipe] RPC. 
*) @@ -108,6 +113,12 @@ module type Hooks = sig tzresult Lwt.t + val on_new_validated_block : + block_hash:Block_hash.t -> + block_header:Block_header.t -> + operations:Operation.t list list -> + (Block_hash.t * Block_header.t * Operation.t list list) option Lwt.t + val on_new_head : block_hash:Block_hash.t -> block_header:Block_header.t -> @@ -238,11 +249,32 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : let module Impl : Faked_services.Mocked_services_hooks = struct type mempool = Mockup.M.Block_services.Mempool.t + let monitor_validated_blocks () = + let next () = + let rec pop_until_ok () = + Lwt_pipe.Unbounded.pop state.validated_blocks_pipe + >>= fun (block_hash, block_header, operations) -> + User_hooks.on_new_validated_block + ~block_hash + ~block_header + ~operations + >>= function + | None -> pop_until_ok () + | Some (hash, head, operations) -> + Lwt.return_some (chain_id, hash, head, operations) + in + pop_until_ok () + in + let shutdown () = () in + Tezos_rpc.Answer.{next; shutdown} + let monitor_heads () = let next () = let rec pop_until_ok () = Lwt_pipe.Unbounded.pop state.heads_pipe >>= fun (block_hash, block_header) -> + (* Sleep a 0.1s to simulate a block application delay *) + Lwt_unix.sleep 0.1 >>= fun () -> User_hooks.on_new_head ~block_hash ~block_header >>= function | None -> pop_until_ok () | Some head -> Lwt.return_some head @@ -295,6 +327,19 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : else Protocol.hash); } + let raw_header (block : Tezos_shell_services.Block_services.block) : + bytes tzresult Lwt.t = + locate_block state block >>=? 
fun x -> + let protocol_data = + Data_encoding.Binary.to_bytes_exn + Protocol.block_header_data_encoding + x.protocol_data + in + return + (Data_encoding.Binary.to_bytes_exn + Tezos_base.Block_header.encoding + {shell = x.rpc_context.block_header; protocol_data}) + let header (block : Tezos_shell_services.Block_services.block) : Mockup.M.Block_services.block_header tzresult Lwt.t = locate_block state block >>=? fun x -> @@ -716,6 +761,9 @@ let rec listener ~(user_hooks : (module Hooks)) ~state ~broadcast_pipe = process_block state block_hash block_header operations >>=? fun () -> User_hooks.check_chain_after_processing ~level ~round ~chain:state.chain >>=? fun () -> + Lwt_pipe.Unbounded.push + state.validated_blocks_pipe + (block_hash, block_header, operations) ; Lwt_pipe.Unbounded.push state.heads_pipe (block_hash, block_header) ; listener ~user_hooks ~state ~broadcast_pipe @@ -735,6 +783,7 @@ let create_fake_node_state ~i ~live_depth } in let chain0 = [genesis0] in + let validated_blocks_pipe = Lwt_pipe.Unbounded.create () in let heads_pipe = Lwt_pipe.Unbounded.create () in let operations_pipe = Lwt_pipe.Unbounded.create () in let genesis_block_true_hash = @@ -744,6 +793,8 @@ let create_fake_node_state ~i ~live_depth protocol_data = block_header0.protocol_data; } in + (* Only push genesis block as a new head, not a valid block: it is + the shell's semantics to not advertise "transition" blocks. 
*) Lwt_pipe.Unbounded.push heads_pipe (rpc_context0.block_hash, block_header0) ; return { @@ -768,6 +819,7 @@ let create_fake_node_state ~i ~live_depth .Block_header.context, rpc_context0 ); ]); + validated_blocks_pipe; heads_pipe; operations_pipe; streaming_operations = false; @@ -1026,7 +1078,6 @@ let make_genesis_context ~delegate_selection ~initial_seed ~round0 ~round1 in return (block_header, rpc_context) in - let level0_round0_duration = Protocol.Alpha_context.Round.round_duration round_durations @@ -1052,6 +1103,9 @@ module Default_hooks : Hooks = struct let on_inject_operation ~op_hash ~op = return (op_hash, op, default_propagation_vector) + let on_new_validated_block ~block_hash ~block_header ~operations = + Lwt.return (Some (block_hash, block_header, operations)) + let on_new_head ~block_hash ~block_header = Lwt.return (Some (block_hash, block_header)) diff --git a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.mli b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.mli index c01782653bdf..d9da9a19074f 100644 --- a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.mli +++ b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.mli @@ -74,6 +74,15 @@ module type Hooks = sig tzresult Lwt.t + (** This is called when a new validated block is going to be sent as + the response to a "monitor validated blocks" RPC call. Returning + [None] here terminates the process for the baker. *) + val on_new_validated_block : + block_hash:Block_hash.t -> + block_header:Block_header.t -> + operations:Operation.t list list -> + (Block_hash.t * Block_header.t * Operation.t list list) option Lwt.t + (** This is called when a new head is going to be sent as the response to a "monitor heads" RPC call. Returning [None] here terminates the process for the baker. 
*) -- GitLab From b286736dc8e2251d940498e8717894814101db10 Mon Sep 17 00:00:00 2001 From: vbot Date: Tue, 31 Jan 2023 17:59:33 +0100 Subject: [PATCH 07/14] Alpha/Mockup: lie on the genesis' proto level in mockup simulator --- src/proto_alpha/lib_client/mockup.ml | 11 ++++++- .../test/mockup_simulator/mockup_simulator.ml | 32 +++++++++++++++++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/proto_alpha/lib_client/mockup.ml b/src/proto_alpha/lib_client/mockup.ml index 62ed51f59076..31f3685e652c 100644 --- a/src/proto_alpha/lib_client/mockup.ml +++ b/src/proto_alpha/lib_client/mockup.ml @@ -250,6 +250,15 @@ module Forge = struct Bytes.create Constants.proof_of_work_nonce_size let make_shell ~level ~predecessor ~timestamp ~fitness ~operations_hash = + (* We initialize the [proto_level] at 1 in order to be able to + mimick a transition block in the baker. The baker distinguishes + the first block of a protocol by comparing a block and its + predecessor's proto level. If there is a difference, it must + mean that the block is a transition one. If we start at 0, we + cannot "hack" a transition block by decrementing the genesis + predecessor's protocol level because protocol levels are + encoded as uint8. *) + let proto_level = 1 in Tezos_base.Block_header. 
{ level; @@ -257,7 +266,7 @@ module Forge = struct timestamp; fitness; operations_hash; - proto_level = 0; + proto_level; validation_passes = 0; context = Context_hash.zero; } diff --git a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml index 2c4d8230c19f..82ae1d67c295 100644 --- a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml +++ b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml @@ -327,9 +327,36 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : else Protocol.hash); } + let may_lie_on_proto_level block x = + (* As for ../protocols, the baker distinguishes activation + blocks from "normal" blocks by comparing the [proto_level] of + the shell header and its predecessor. If the predecessor's + one is different, it must mean that we are considering an + activation block and must not endorse. Here, we do a bit of + hacking in order to return a different proto_level for the + predecessor of the genesis block which is considered as the + current protocol activation block. To perfectly mimic what is + supposed to happen, the first mocked up block created should + be made in the genesis protocol, however, it is not what's + done in the mockup mode. *) + let is_predecessor_of_genesis = + match block with + | `Hash (requested_hash, rel) -> + Int.equal rel 0 + && Block_hash.equal requested_hash genesis_predecessor_block_hash + | _ -> false + in + if is_predecessor_of_genesis then + { + x.rpc_context.block_header with + proto_level = pred x.rpc_context.block_header.proto_level; + } + else x.rpc_context.block_header + let raw_header (block : Tezos_shell_services.Block_services.block) : bytes tzresult Lwt.t = locate_block state block >>=? 
fun x -> + let shell = may_lie_on_proto_level block x in let protocol_data = Data_encoding.Binary.to_bytes_exn Protocol.block_header_data_encoding @@ -338,16 +365,17 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : return (Data_encoding.Binary.to_bytes_exn Tezos_base.Block_header.encoding - {shell = x.rpc_context.block_header; protocol_data}) + {shell; protocol_data}) let header (block : Tezos_shell_services.Block_services.block) : Mockup.M.Block_services.block_header tzresult Lwt.t = locate_block state block >>=? fun x -> + let shell = may_lie_on_proto_level block x in return { Mockup.M.Block_services.hash = x.rpc_context.block_hash; chain_id; - shell = x.rpc_context.block_header; + shell; protocol_data = x.protocol_data; } -- GitLab From 5ad750b795231cb36c54bbea904398e6fe9a34da Mon Sep 17 00:00:00 2001 From: vbot Date: Tue, 31 Jan 2023 18:28:22 +0100 Subject: [PATCH 08/14] Alpha/Baker: add preendorsement on valid block test --- .../lib_delegate/test/test_scenario.ml | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/proto_alpha/lib_delegate/test/test_scenario.ml b/src/proto_alpha/lib_delegate/test/test_scenario.ml index afc642bf9457..693b84bdda01 100644 --- a/src/proto_alpha/lib_delegate/test/test_scenario.ml +++ b/src/proto_alpha/lib_delegate/test/test_scenario.ml @@ -54,6 +54,65 @@ let test_level_5 () = in run ~config [(3, (module Hooks)); (2, (module Hooks))] +let test_preendorse_on_valid () = + let level_to_reach = 2l in + let round_to_reach = 1l in + let module Hooks : Hooks = struct + include Default_hooks + + let on_new_head ~block_hash ~block_header = + (* Stop notifying heads on the level to reach, only notify that + it has been validated *) + if block_header.Block_header.shell.level < level_to_reach then + Lwt.return_some (block_hash, block_header) + else Lwt.return_none + + let seen_candidate = ref None + + let pqc_noticed = ref false + + let stop_on_event = function + | 
Baking_state.Prequorum_reached (candidate, _) -> + (* Register the PQC notice. *) + (match !seen_candidate with + | Some seen_candidate + when Block_hash.(candidate.hash = seen_candidate) -> + pqc_noticed := true + | _ -> ()) ; + false + | Baking_state.Quorum_reached (candidate, _) -> + (* Ensure that we never see a QC on the seen candidate. *) + (match !seen_candidate with + | Some seen_candidate + when Block_hash.(candidate.hash = seen_candidate) -> + Stdlib.failwith "Quorum occured on the seen candidate" + | _ -> ()) ; + false + | New_head_proposal {block; _} -> + (* Ensure that we never notice a new head at the level where + we are not supposed to. *) + if block.shell.level = level_to_reach then + Stdlib.failwith "Unexpected new head event" + else false + | New_valid_proposal {block; _} -> + (* Register the seen valid proposal candidate. *) + if + block.shell.level = level_to_reach + && Protocol.Alpha_context.Round.to_int32 block.round = 0l + then seen_candidate := Some block.hash ; + (* Stop the node when we reach level 2 / round 2. 
*) + block.shell.level = level_to_reach + && Protocol.Alpha_context.Round.to_int32 block.round >= round_to_reach + | _ -> false + + let check_chain_on_success ~chain:_ = + assert (!seen_candidate <> None) ; + assert !pqc_noticed ; + return_unit + end in + let config = {default_config with timeout = 10} in + run ~config [(1, (module Hooks))] + (* Scenario T1 @@ -1470,6 +1529,7 @@ let tests = let open Tezos_base_test_helpers.Tztest in [ tztest "reaches level 5" `Quick test_level_5; + tztest "cannot progress without new head" `Quick test_preendorse_on_valid; tztest "scenario t1" `Quick test_scenario_t1; tztest "scenario t2" `Quick test_scenario_t2; tztest "scenario t3" `Quick test_scenario_t3; -- GitLab From 4c60cb376378de2be36d25927f4637946e8a7063 Mon Sep 17 00:00:00 2001 From: vbot Date: Thu, 2 Feb 2023 17:10:36 +0100 Subject: [PATCH 09/14] Alpha/Baker: refactor operations injection --- .../lib_delegate/baking_actions.ml | 29 +++++++------------ src/proto_alpha/lib_delegate/baking_events.ml | 11 +++++++ src/proto_alpha/lib_delegate/node_rpc.ml | 6 ++++ src/proto_alpha/lib_delegate/node_rpc.mli | 10 +++++++ 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index a1ed13063f1d..b81bd7198d14 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -430,18 +430,12 @@ let inject_preendorsements state ~preendorsements = (* TODO: add a RPC to inject multiple operations *) List.iter_ep (fun (delegate, operation) -> - let encoded_op = - Data_encoding.Binary.to_bytes_exn Operation.encoding operation - in protect ~on_error:(fun err -> Events.(emit failed_to_inject_preendorsement (delegate, err)) >>= fun () -> return_unit) (fun () -> - Shell_services.Injection.operation - cctxt - ~chain:(`Hash chain_id) - encoded_op + Node_rpc.inject_operation cctxt ~chain:(`Hash chain_id) operation >>=? 
fun oph -> Events.(emit preendorsement_injected (oph, delegate)) >>= fun () -> return_unit)) @@ -558,17 +552,16 @@ let inject_endorsements state ~endorsements = sign_endorsements state endorsements >>=? fun signed_operations -> (* TODO: add a RPC to inject multiple operations *) List.iter_ep - (fun (delegate, signed_operation) -> - let encoded_op = - Data_encoding.Binary.to_bytes_exn Operation.encoding signed_operation - in - Shell_services.Injection.operation - cctxt - ~chain:(`Hash chain_id) - encoded_op - >>=? fun oph -> - Events.(emit endorsement_injected (oph, delegate)) >>= fun () -> - return_unit) + (fun (delegate, operation) -> + protect + ~on_error:(fun err -> + Events.(emit failed_to_inject_endorsement (delegate, err)) + >>= fun () -> return_unit) + (fun () -> + Node_rpc.inject_operation cctxt ~chain:(`Hash chain_id) operation + >>=? fun oph -> + Events.(emit endorsement_injected (oph, delegate)) >>= fun () -> + return_unit)) signed_operations let inject_dal_attestations state attestations = diff --git a/src/proto_alpha/lib_delegate/baking_events.ml b/src/proto_alpha/lib_delegate/baking_events.ml index c1e32c25b2bc..70e6ccead56d 100644 --- a/src/proto_alpha/lib_delegate/baking_events.ml +++ b/src/proto_alpha/lib_delegate/baking_events.ml @@ -613,6 +613,17 @@ module Actions = struct ~pp2:Error_monad.pp_print_trace ("trace", Error_monad.trace_encoding) + let failed_to_inject_endorsement = + declare_2 + ~section + ~name:"failed_to_inject_endorsement" + ~level:Error + ~msg:"failed to inject endorsement for {delegate} -- {trace}" + ~pp1:Baking_state.pp_consensus_key_and_delegate + ("delegate", Baking_state.consensus_key_and_delegate_encoding) + ~pp2:Error_monad.pp_print_trace + ("trace", Error_monad.trace_encoding) + let potential_double_baking = declare_2 ~section diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index 3fcf46abba59..c3d83ee70557 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ 
b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -41,6 +41,12 @@ let inject_block cctxt ?(force = false) ~chain signed_block_header operations = signed_shell_header_bytes operations +let inject_operation cctxt ~chain operation = + let encoded_op = + Data_encoding.Binary.to_bytes_exn Operation.encoding operation + in + Shell_services.Injection.operation cctxt ~chain encoded_op + let preapply_block cctxt ~chain ~head ~timestamp ~protocol_data operations = Block_services.Helpers.Preapply.block cctxt diff --git a/src/proto_alpha/lib_delegate/node_rpc.mli b/src/proto_alpha/lib_delegate/node_rpc.mli index e77ae97d4142..973fcf5322b2 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.mli +++ b/src/proto_alpha/lib_delegate/node_rpc.mli @@ -39,6 +39,16 @@ val inject_block : Tezos_base.Operation.t list list -> Block_hash.t tzresult Lwt.t +(** Inject an operation. + + @return operation hash of the newly injected operation +*) +val inject_operation : + #Protocol_client_context.full -> + chain:Shell_services.chain -> + packed_operation -> + Operation_hash.t tzresult Lwt.t + (** Preapply a block using the node validation mechanism.*) val preapply_block : #Protocol_client_context.full -> -- GitLab From a5cf3170e5c6d46a3d18d577f11b170d161bb91f Mon Sep 17 00:00:00 2001 From: vbot Date: Fri, 3 Feb 2023 10:21:41 +0100 Subject: [PATCH 10/14] Alpha/Baker: make operation and block injection async and forcing --- src/proto_alpha/lib_delegate/node_rpc.ml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/proto_alpha/lib_delegate/node_rpc.ml b/src/proto_alpha/lib_delegate/node_rpc.ml index c3d83ee70557..3aead5b36d4c 100644 --- a/src/proto_alpha/lib_delegate/node_rpc.ml +++ b/src/proto_alpha/lib_delegate/node_rpc.ml @@ -35,6 +35,7 @@ let inject_block cctxt ?(force = false) ~chain signed_block_header operations = Data_encoding.Binary.to_bytes_exn Block_header.encoding signed_block_header in Shell_services.Injection.block + ~async:true cctxt ~chain ~force @@ -45,7 +46,10 
@@ let inject_operation cctxt ~chain operation = let encoded_op = Data_encoding.Binary.to_bytes_exn Operation.encoding operation in - Shell_services.Injection.operation cctxt ~chain encoded_op + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4875 + `Shell_services.Injection.operation` should be used instead once + the needed changes in the protocol are in place. *) + Shell_services.Injection.private_operation cctxt ~async:true ~chain encoded_op let preapply_block cctxt ~chain ~head ~timestamp ~protocol_data operations = Block_services.Helpers.Preapply.block -- GitLab From fb6d50b736b9ee0869aef548319fae446b602d47 Mon Sep 17 00:00:00 2001 From: vbot Date: Fri, 3 Feb 2023 11:55:02 +0100 Subject: [PATCH 11/14] Alpha/Baker: fix a concurrency bug canceling the PQC watch --- .../lib_delegate/operation_worker.ml | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/proto_alpha/lib_delegate/operation_worker.ml b/src/proto_alpha/lib_delegate/operation_worker.ml index 9f54d826da64..1568bd5bb901 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.ml +++ b/src/proto_alpha/lib_delegate/operation_worker.ml @@ -247,6 +247,21 @@ let is_valid_consensus_content (candidate : candidate) consensus_content = let cancel_monitoring state = state.proposal_watched <- None +let reset_monitoring state = + Lwt_mutex.with_lock state.lock @@ fun () -> + match state.proposal_watched with + | None -> Lwt.return_unit + | Some (Pqc_watch pqc_watched) -> + pqc_watched.current_voting_power <- 0 ; + pqc_watched.preendorsements_count <- 0 ; + pqc_watched.preendorsements_received <- [] ; + Lwt.return_unit + | Some (Qc_watch qc_watched) -> + qc_watched.current_voting_power <- 0 ; + qc_watched.endorsements_count <- 0 ; + qc_watched.endorsements_received <- [] ; + Lwt.return_unit + let update_monitoring ?(should_lock = true) state ops = (if should_lock then Lwt_mutex.with_lock state.lock else fun f -> f ()) @@ fun () -> @@ -484,11 +499,10 @@ let create 
?(monitor_node_operations = true) Lwt_stream.get operation_stream >>= function | None -> (* When the stream closes, it means a new head has been set, - we cancel the monitoring and flush current operations *) + we reset the monitoring and flush current operations *) Events.(emit end_of_stream ()) >>= fun () -> op_stream_stopper () ; - cancel_monitoring state ; - worker_loop () + reset_monitoring state >>= fun () -> worker_loop () | Some ops -> state.operation_pool <- Operation_pool.add_operations state.operation_pool ops ; -- GitLab From efb032186d9b510d72f54ba176587c72f31eaff2 Mon Sep 17 00:00:00 2001 From: vbot Date: Tue, 14 Feb 2023 14:50:41 +0100 Subject: [PATCH 12/14] Alpha/Baker: register preendorsements and reinject after application --- .../lib_delegate/baking_actions.ml | 28 +++++++++++++++++-- .../lib_delegate/baking_actions.mli | 3 +- src/proto_alpha/lib_delegate/baking_lib.ml | 1 + .../lib_delegate/baking_scheduling.ml | 1 + src/proto_alpha/lib_delegate/baking_state.ml | 10 +++++-- src/proto_alpha/lib_delegate/baking_state.mli | 1 + .../lib_delegate/state_transitions.ml | 21 ++++++++++++-- .../lib_delegate/state_transitions.mli | 2 -- 8 files changed, 56 insertions(+), 11 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index b81bd7198d14..07cefe422396 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -128,6 +128,7 @@ type action = | Inject_preendorsements of { preendorsements : (consensus_key_and_delegate * consensus_content) list; } + | Reinject_preendorsements of {preendorsements : packed_operation list} | Inject_endorsements of { endorsements : (consensus_key_and_delegate * consensus_content) list; } @@ -159,6 +160,7 @@ let pp_action fmt = function | Update_to_level _ -> Format.fprintf fmt "update to level" | Synchronize_round _ -> Format.fprintf fmt "synchronize round" | Watch_proposal -> Format.fprintf fmt 
"watch proposal" + | Reinject_preendorsements _ -> Format.fprintf fmt "reinject preendorsements" let generate_seed_nonce_hash config delegate level = if level.Level.expected_commitment then @@ -440,6 +442,16 @@ let inject_preendorsements state ~preendorsements = Events.(emit preendorsement_injected (oph, delegate)) >>= fun () -> return_unit)) signed_operations + >>=? fun () -> + (* Hackish way of registering injected preendorsements *) + let endorsements = List.map snd signed_operations in + if endorsements = [] then return state + else + let new_level_state = + {state.level_state with injected_preendorsements = Some endorsements} + in + let new_state = {state with level_state = new_level_state} in + return new_state let sign_endorsements state endorsements = let cctxt = state.global_state.cctxt in @@ -738,6 +750,16 @@ let update_to_level state level_update = compute_new_state ~current_round ~delegate_slots ~next_level_delegate_slots >>= return +let reinject_preendorsements state preendorsements = + let cctxt = state.global_state.cctxt in + let chain = `Hash state.global_state.chain_id in + List.iter_p + (fun operation -> + Node_rpc.inject_operation cctxt ~chain operation >>= fun _res -> + (* Ignore errors for now *) + Lwt.return_unit) + preendorsements + let synchronize_round state {new_round_proposal; handle_proposal} = Events.(emit synchronizing_round new_round_proposal.predecessor.hash) >>= fun () -> @@ -770,8 +792,8 @@ let rec perform_action ~state_recorder state (action : action) = | Inject_block {block_to_bake; updated_state} -> inject_block state ~state_recorder block_to_bake ~updated_state | Inject_preendorsements {preendorsements} -> - inject_preendorsements state ~preendorsements >>=? fun () -> - perform_action ~state_recorder state Watch_proposal + inject_preendorsements state ~preendorsements >>=? fun new_state -> + perform_action ~state_recorder new_state Watch_proposal | Inject_endorsements {endorsements} -> state_recorder ~new_state:state >>=? 
fun () -> inject_endorsements state ~endorsements >>=? fun () -> @@ -793,3 +815,5 @@ let rec perform_action ~state_recorder state (action : action) = (* We wait for preendorsements to trigger the [Prequorum_reached] event *) start_waiting_for_preendorsement_quorum state >>= fun () -> return state + | Reinject_preendorsements {preendorsements} -> + reinject_preendorsements state preendorsements >>= fun () -> return state diff --git a/src/proto_alpha/lib_delegate/baking_actions.mli b/src/proto_alpha/lib_delegate/baking_actions.mli index 360f1f761e27..e348430f39cd 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.mli +++ b/src/proto_alpha/lib_delegate/baking_actions.mli @@ -54,6 +54,7 @@ type action = | Inject_preendorsements of { preendorsements : (consensus_key_and_delegate * consensus_content) list; } + | Reinject_preendorsements of {preendorsements : packed_operation list} | Inject_endorsements of { endorsements : (consensus_key_and_delegate * consensus_content) list; } @@ -93,7 +94,7 @@ val inject_block : val inject_preendorsements : state -> preendorsements:(consensus_key_and_delegate * consensus_content) list -> - unit tzresult Lwt.t + state tzresult Lwt.t val sign_endorsements : state -> diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index 070bf264c8f8..82c1753daab6 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -88,6 +88,7 @@ let preendorse (cctxt : Protocol_client_context.full) ?(force = false) delegates (List.map fst consensus_list) in Baking_actions.inject_preendorsements state ~preendorsements:consensus_list + >>=? 
fun (_ignored_state : state) -> return_unit let endorse (cctxt : Protocol_client_context.full) ?(force = false) delegates = let open State_transitions in diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 6613d073dac2..bab77e256bdc 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -693,6 +693,7 @@ let create_initial_state cctxt ?(synchronize = true) ~chain config is_latest_proposal_applied = true (* this proposal is expected to be the current head *); delayed_prequorum = None; + injected_preendorsements = None; locked_round = None; endorsable_payload = None; elected_block; diff --git a/src/proto_alpha/lib_delegate/baking_state.ml b/src/proto_alpha/lib_delegate/baking_state.ml index 0976fcef75ec..e8472e48cc12 100644 --- a/src/proto_alpha/lib_delegate/baking_state.ml +++ b/src/proto_alpha/lib_delegate/baking_state.ml @@ -283,6 +283,7 @@ type level_state = { is_latest_proposal_applied : bool; delayed_prequorum : (Operation_worker.candidate * Kind.preendorsement operation list) option; + injected_preendorsements : packed_operation list option; (* Last proposal received where we injected an endorsement (thus we have seen 2f+1 preendorsements) *) locked_round : locked_round option; @@ -805,6 +806,7 @@ let pp_level_state fmt latest_proposal; is_latest_proposal_applied; delayed_prequorum; + injected_preendorsements; locked_round; endorsable_payload; elected_block; @@ -815,12 +817,14 @@ let pp_level_state fmt Format.fprintf fmt "@[Level state:@ current level: %ld@ @[proposal (applied:%b, \ - delayed prequorum:%b):@ %a@]@ locked round: %a@ endorsable payload: %a@ \ - elected block: %a@ @[own delegate slots:@ %a@]@ @[next level \ - own delegate slots:@ %a@]@ next level proposed round: %a@]" + delayed prequorum:%b, injected preendorsements: %d):@ %a@]@ locked round: \ + %a@ endorsable payload: %a@ elected block: %a@ @[own delegate \ + 
slots:@ %a@]@ @[next level own delegate slots:@ %a@]@ next level \ + proposed round: %a@]" current_level is_latest_proposal_applied (Option.is_some delayed_prequorum) + (match injected_preendorsements with None -> 0 | Some l -> List.length l) pp_proposal latest_proposal (pp_option pp_locked_round) diff --git a/src/proto_alpha/lib_delegate/baking_state.mli b/src/proto_alpha/lib_delegate/baking_state.mli index f8e1d779d6c2..59a17685b8a4 100644 --- a/src/proto_alpha/lib_delegate/baking_state.mli +++ b/src/proto_alpha/lib_delegate/baking_state.mli @@ -131,6 +131,7 @@ type level_state = { is_latest_proposal_applied : bool; delayed_prequorum : (Operation_worker.candidate * Kind.preendorsement operation list) option; + injected_preendorsements : packed_operation list option; locked_round : locked_round option; endorsable_payload : endorsable_payload option; elected_block : elected_block option; diff --git a/src/proto_alpha/lib_delegate/state_transitions.ml b/src/proto_alpha/lib_delegate/state_transitions.ml index 79640027e695..ab8f6d35d7ba 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.ml +++ b/src/proto_alpha/lib_delegate/state_transitions.ml @@ -319,6 +319,7 @@ let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = latest_proposal = new_proposal; is_latest_proposal_applied = is_proposal_applied; delayed_prequorum = None; + injected_preendorsements = None; (* Unlock values *) locked_round = None; endorsable_payload = None; @@ -756,10 +757,24 @@ let handle_expected_applied_proposal (state : Baking_state.t) = in let new_state = {state with level_state = new_level_state} in match new_state.level_state.delayed_prequorum with - | None -> - (* The application arrived before the prequorum: just wait for the prequorum. *) + | None -> ( + (* The application arrived before the prequorum: wait for the prequorum. 
*) let new_state = update_current_phase new_state Awaiting_preendorsements in - do_nothing new_state + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4877 This + mechanism is only temporary and should be removed when the + protocol and prevalidator correctly accept early + preendorsements. *) + match new_state.level_state.injected_preendorsements with + | None -> do_nothing new_state + | Some preendorsements -> + let reinject_preendorsement_action = + Reinject_preendorsements {preendorsements} + in + let new_level_state = + {new_state.level_state with injected_preendorsements = None} + in + let new_state = {new_state with level_state = new_level_state} in + Lwt.return (new_state, reinject_preendorsement_action)) | Some (candidate, preendorsement_qc) -> (* The application arrived after the prequorum: handle the prequorum received earlier. *) diff --git a/src/proto_alpha/lib_delegate/state_transitions.mli b/src/proto_alpha/lib_delegate/state_transitions.mli index 596efad2100f..2fa02e3b2d8a 100644 --- a/src/proto_alpha/lib_delegate/state_transitions.mli +++ b/src/proto_alpha/lib_delegate/state_transitions.mli @@ -41,8 +41,6 @@ val is_acceptable_proposal_for_current_level : val make_consensus_list : state -> proposal -> (consensus_key_and_delegate * consensus_content) list -val make_preendorse_action : state -> proposal -> action - val may_update_proposal : is_proposal_applied:bool -> state -> proposal -> state Lwt.t -- GitLab From c08ae3826eeb6e9580f7f898361d8bb9b0ceac94 Mon Sep 17 00:00:00 2001 From: vbot Date: Thu, 26 Jan 2023 17:03:30 +0100 Subject: [PATCH 13/14] Mumbai/Baker: propagate changes --- src/proto_016_PtMumbai/lib_client/mockup.ml | 2 +- .../lib_delegate/baking_actions.ml | 72 ++-- .../lib_delegate/baking_actions.mli | 3 +- .../lib_delegate/baking_events.ml | 105 +++++- .../lib_delegate/baking_lib.ml | 30 +- .../lib_delegate/baking_nonces.ml | 2 +- .../lib_delegate/baking_scheduling.ml | 146 +++++--- .../lib_delegate/baking_scheduling.mli | 5 +- 
.../lib_delegate/baking_simulator.ml | 25 +- .../lib_delegate/baking_simulator.mli | 1 + .../lib_delegate/baking_state.ml | 193 +++++----- .../lib_delegate/baking_state.mli | 38 +- .../lib_delegate/block_forge.ml | 47 ++- .../lib_delegate/block_forge.mli | 2 + .../lib_delegate/node_rpc.ml | 331 +++++++++++------- .../lib_delegate/node_rpc.mli | 30 +- .../lib_delegate/operation_worker.ml | 38 +- .../lib_delegate/operation_worker.mli | 7 +- .../lib_delegate/state_transitions.ml | 237 ++++++++++--- .../lib_delegate/state_transitions.mli | 8 +- .../test/mockup_simulator/faked_services.ml | 61 +++- .../test/mockup_simulator/mockup_simulator.ml | 88 ++++- .../mockup_simulator/mockup_simulator.mli | 9 + .../lib_delegate/test/test_scenario.ml | 78 ++++- 24 files changed, 1089 insertions(+), 469 deletions(-) diff --git a/src/proto_016_PtMumbai/lib_client/mockup.ml b/src/proto_016_PtMumbai/lib_client/mockup.ml index 93e2c2690c0d..e82f935fb341 100644 --- a/src/proto_016_PtMumbai/lib_client/mockup.ml +++ b/src/proto_016_PtMumbai/lib_client/mockup.ml @@ -257,7 +257,7 @@ module Forge = struct timestamp; fitness; operations_hash; - proto_level = 0; + proto_level = 1; validation_passes = 0; context = Context_hash.zero; } diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_actions.ml b/src/proto_016_PtMumbai/lib_delegate/baking_actions.ml index b24a54100308..7eb0bc6dd90d 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_actions.ml +++ b/src/proto_016_PtMumbai/lib_delegate/baking_actions.ml @@ -128,6 +128,7 @@ type action = | Inject_preendorsements of { preendorsements : (consensus_key_and_delegate * consensus_content) list; } + | Reinject_preendorsements of {preendorsements : packed_operation list} | Inject_endorsements of { endorsements : (consensus_key_and_delegate * consensus_content) list; } @@ -159,6 +160,7 @@ let pp_action fmt = function | Update_to_level _ -> Format.fprintf fmt "update to level" | Synchronize_round _ -> Format.fprintf fmt "synchronize round" | 
Watch_proposal -> Format.fprintf fmt "watch proposal" + | Reinject_preendorsements _ -> Format.fprintf fmt "reinject preendorsements" let generate_seed_nonce_hash config delegate level = if level.Level.expected_commitment then @@ -215,6 +217,7 @@ let sign_block_header state proposer unsigned_block_header = return {Block_header.shell; protocol_data = {contents; signature}} let inject_block ~state_recorder state block_to_bake ~updated_state = + let open Lwt_result_syntax in let { predecessor; round; @@ -308,10 +311,24 @@ let inject_block ~state_recorder state block_to_bake ~updated_state = in Events.(emit vote_for_liquidity_baking_toggle) liquidity_baking_toggle_vote >>= fun () -> + let chain = `Hash state.global_state.chain_id in + let pred_block = `Hash (predecessor.hash, 0) in + let* pred_resulting_context_hash = + Shell_services.Blocks.resulting_context_hash + cctxt + ~chain + ~block:pred_block + () + in + let* pred_live_blocks = + Chain_services.Blocks.live_blocks cctxt ~chain ~block:pred_block () + in Block_forge.forge cctxt ~chain_id ~pred_info:predecessor + ~pred_live_blocks + ~pred_resulting_context_hash ~timestamp ~round ~seed_nonce_hash @@ -415,22 +432,26 @@ let inject_preendorsements state ~preendorsements = (* TODO: add a RPC to inject multiple operations *) List.iter_ep (fun (delegate, operation) -> - let encoded_op = - Data_encoding.Binary.to_bytes_exn Operation.encoding operation - in protect ~on_error:(fun err -> Events.(emit failed_to_inject_preendorsement (delegate, err)) >>= fun () -> return_unit) (fun () -> - Shell_services.Injection.operation - cctxt - ~chain:(`Hash chain_id) - encoded_op + Node_rpc.inject_operation cctxt ~chain:(`Hash chain_id) operation >>=? fun oph -> Events.(emit preendorsement_injected (oph, delegate)) >>= fun () -> return_unit)) signed_operations + >>=? 
fun () -> + (* Hackish way of registering injected preendorsements *) + let endorsements = List.map snd signed_operations in + if endorsements = [] then return state + else + let new_level_state = + {state.level_state with injected_preendorsements = Some endorsements} + in + let new_state = {state with level_state = new_level_state} in + return new_state let sign_endorsements state endorsements = let cctxt = state.global_state.cctxt in @@ -503,17 +524,16 @@ let inject_endorsements state ~endorsements = sign_endorsements state endorsements >>=? fun signed_operations -> (* TODO: add a RPC to inject multiple operations *) List.iter_ep - (fun (delegate, signed_operation) -> - let encoded_op = - Data_encoding.Binary.to_bytes_exn Operation.encoding signed_operation - in - Shell_services.Injection.operation - cctxt - ~chain:(`Hash chain_id) - encoded_op - >>=? fun oph -> - Events.(emit endorsement_injected (oph, delegate)) >>= fun () -> - return_unit) + (fun (delegate, operation) -> + protect + ~on_error:(fun err -> + Events.(emit failed_to_inject_endorsement (delegate, err)) + >>= fun () -> return_unit) + (fun () -> + Node_rpc.inject_operation cctxt ~chain:(`Hash chain_id) operation + >>=? 
fun oph -> + Events.(emit endorsement_injected (oph, delegate)) >>= fun () -> + return_unit)) signed_operations let prepare_waiting_for_quorum state = @@ -601,6 +621,16 @@ let update_to_level state level_update = compute_new_state ~current_round ~delegate_slots ~next_level_delegate_slots >>= return +let reinject_preendorsements state preendorsements = + let cctxt = state.global_state.cctxt in + let chain = `Hash state.global_state.chain_id in + List.iter_p + (fun operation -> + Node_rpc.inject_operation cctxt ~chain operation >>= fun _res -> + (* Ignore errors for now *) + Lwt.return_unit) + preendorsements + let synchronize_round state {new_round_proposal; handle_proposal} = Events.(emit synchronizing_round new_round_proposal.predecessor.hash) >>= fun () -> @@ -633,8 +663,8 @@ let rec perform_action ~state_recorder state (action : action) = | Inject_block {block_to_bake; updated_state} -> inject_block state ~state_recorder block_to_bake ~updated_state | Inject_preendorsements {preendorsements} -> - inject_preendorsements state ~preendorsements >>=? fun () -> - perform_action ~state_recorder state Watch_proposal + inject_preendorsements state ~preendorsements >>=? fun new_state -> + perform_action ~state_recorder new_state Watch_proposal | Inject_endorsements {endorsements} -> state_recorder ~new_state:state >>=? fun () -> inject_endorsements state ~endorsements >>=? 
fun () -> @@ -651,3 +681,5 @@ let rec perform_action ~state_recorder state (action : action) = (* We wait for preendorsements to trigger the [Prequorum_reached] event *) start_waiting_for_preendorsement_quorum state >>= fun () -> return state + | Reinject_preendorsements {preendorsements} -> + reinject_preendorsements state preendorsements >>= fun () -> return state diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_actions.mli b/src/proto_016_PtMumbai/lib_delegate/baking_actions.mli index 5b9715905497..d9598b3b9796 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_actions.mli +++ b/src/proto_016_PtMumbai/lib_delegate/baking_actions.mli @@ -54,6 +54,7 @@ type action = | Inject_preendorsements of { preendorsements : (consensus_key_and_delegate * consensus_content) list; } + | Reinject_preendorsements of {preendorsements : packed_operation list} | Inject_endorsements of { endorsements : (consensus_key_and_delegate * consensus_content) list; } @@ -93,7 +94,7 @@ val inject_block : val inject_preendorsements : state -> preendorsements:(consensus_key_and_delegate * consensus_content) list -> - unit tzresult Lwt.t + state tzresult Lwt.t val sign_endorsements : state -> diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_events.ml b/src/proto_016_PtMumbai/lib_delegate/baking_events.ml index 9c2af301a0a0..cbc69820a829 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_events.ml +++ b/src/proto_016_PtMumbai/lib_delegate/baking_events.ml @@ -37,6 +37,19 @@ module State_transitions = struct let section = section @ ["transitions"] + let new_valid_proposal = + declare_3 + ~section + ~name:"new_valid_proposal" + ~level:Notice + ~msg:"received new proposal {block} at level {level}, round {round}" + ~pp1:Block_hash.pp + ("block", Block_hash.encoding) + ~pp2:pp_int32 + ("level", Data_encoding.int32) + ~pp3:Round.pp + ("round", Round.encoding) + let new_head = declare_3 ~section @@ -98,6 +111,63 @@ module State_transitions = struct ~msg:"received new head while 
waiting for a quorum" () + let applied_expected_proposal_received = + declare_1 + ~section + ~name:"applied_expected_proposal_received" + ~level:Info + ~msg:"received the expected application notice for {proposal}" + ~pp1:Block_hash.pp + ("proposal", Block_hash.encoding) + + let unexpected_new_head_while_waiting_for_application = + declare_0 + ~section + ~name:"unexpected_new_head_while_waiting_for_application" + ~level:Info + ~msg:"received new head while waiting for another proposal's application" + () + + let new_valid_proposal_while_waiting_for_qc = + declare_0 + ~section + ~name:"new_valid_proposal_while_waiting_for_qc" + ~level:Info + ~msg:"received new valid proposal while waiting for a quorum" + () + + let valid_proposal_received_after_application = + declare_0 + ~section + ~name:"valid_proposal_received_after_application" + ~level:Info + ~msg:"received valid proposal for a block already applied" + () + + let unexpected_pqc_while_waiting_for_application = + declare_2 + ~section + ~name:"unexpected_pqc_while_waiting_for_application" + ~level:Info + ~msg: + "received an unexpected prequorum for {prequorum} while waiting for \ + the proposal's {proposal} application" + ~pp1:Block_hash.pp + ("prequorum", Block_hash.encoding) + ~pp2:Block_hash.pp + ("proposal", Block_hash.encoding) + + let pqc_while_waiting_for_application = + declare_1 + ~section + ~name:"pqc_while_waiting_for_application" + ~level:Info + ~msg: + "received expected prequorum for {prequorum} while waiting for the \ + proposal's application" + ~pp1:Block_hash.pp + ("prequorum", Block_hash.encoding) + let unexpected_proposal_round = declare_2 ~section @@ -289,6 +359,14 @@ module State_transitions = struct ~pp2:Block_hash.pp ("expected_hash", Block_hash.encoding) + let handling_prequorum_on_non_applied_proposal = + declare_0 + ~section + ~name:"handling_prequorum_on_non_applied_proposal" + ~level:Error + ~msg:"Handling prequorum on a non-applied proposal" + () + let step_current_phase = declare_2 
~section @@ -315,16 +393,14 @@ module Node_rpc = struct ~pp1:Error_monad.pp_print_trace ("trace", Error_monad.trace_encoding) - let raw_info = - declare_2 + let error_while_monitoring_valid_proposals = + declare_1 ~section - ~name:"raw_info" - ~level:Debug - ~msg:"raw info for {block_hash} at level {level}" - ~pp1:Block_hash.pp - ("block_hash", Block_hash.encoding) - ~pp2:pp_int32 - ("level", Data_encoding.int32) + ~name:"error_while_monitoring_valid_proposals" + ~level:Error + ~msg:"error while monitoring valid proposals {trace}" + ~pp1:Error_monad.pp_print_trace + ("trace", Error_monad.trace_encoding) end module Scheduling = struct @@ -526,6 +602,17 @@ module Actions = struct ~pp2:Error_monad.pp_print_trace ("trace", Error_monad.trace_encoding) + let failed_to_inject_endorsement = + declare_2 + ~section + ~name:"failed_to_inject_endorsement" + ~level:Error + ~msg:"failed to inject endorsement for {delegate} -- {trace}" + ~pp1:Baking_state.pp_consensus_key_and_delegate + ("delegate", Baking_state.consensus_key_and_delegate_encoding) + ~pp2:Error_monad.pp_print_trace + ("trace", Error_monad.trace_encoding) + let potential_double_baking = declare_2 ~section diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_lib.ml b/src/proto_016_PtMumbai/lib_delegate/baking_lib.ml index ebb2cd35fc0c..383e0f867739 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_lib.ml +++ b/src/proto_016_PtMumbai/lib_delegate/baking_lib.ml @@ -44,10 +44,10 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config ~current_proposal delegates -let get_current_proposal cctxt = +let get_current_proposal cctxt ?cache () = let open Lwt_result_syntax in let* block_stream, _block_stream_stopper = - Node_rpc.monitor_proposals cctxt ~chain:cctxt#chain () + Node_rpc.monitor_heads cctxt ?cache ~chain:cctxt#chain () in Lwt_stream.peek block_stream >>= function | Some current_head -> return (block_stream, current_head) @@ -59,7 +59,8 @@ let preendorse (cctxt : Protocol_client_context.full) 
?(force = false) delegates = let open State_transitions in let open Lwt_result_syntax in - let* _, current_proposal = get_current_proposal cctxt in + let cache = Baking_cache.Block_cache.create 10 in + let* _, current_proposal = get_current_proposal cctxt ~cache () in let config = Baking_configuration.make ~force () in let* state = create_state cctxt ~config ~current_proposal delegates in let proposal = state.level_state.latest_proposal in @@ -87,11 +88,13 @@ let preendorse (cctxt : Protocol_client_context.full) ?(force = false) delegates (List.map fst consensus_list) in Baking_actions.inject_preendorsements state ~preendorsements:consensus_list + >>=? fun (_ignored_state : state) -> return_unit let endorse (cctxt : Protocol_client_context.full) ?(force = false) delegates = let open State_transitions in let open Lwt_result_syntax in - let* _, current_proposal = get_current_proposal cctxt in + let cache = Baking_cache.Block_cache.create 10 in + let* _, current_proposal = get_current_proposal cctxt ~cache () in let config = Baking_configuration.make ~force () in create_state cctxt ~config ~current_proposal delegates >>=? 
fun state -> let proposal = state.level_state.latest_proposal in @@ -283,7 +286,8 @@ let propose (cctxt : Protocol_client_context.full) ?minimal_fees ?minimal_nanotez_per_gas_unit ?minimal_nanotez_per_byte ?force_apply ?force ?(minimal_timestamp = false) ?extra_operations ?context_path delegates = let open Lwt_result_syntax in - let* _block_stream, current_proposal = get_current_proposal cctxt in + let cache = Baking_cache.Block_cache.create 10 in + let* _block_stream, current_proposal = get_current_proposal cctxt ~cache () in let config = Baking_configuration.make ?minimal_fees @@ -301,7 +305,7 @@ let propose (cctxt : Protocol_client_context.full) ?minimal_fees | Some _ -> propose_at_next_level ~minimal_timestamp state | None -> ( match endorsement_quorum state with - | Some (voting_power, endorsement_qc) -> + | Some (_voting_power, endorsement_qc) -> let state = { state with @@ -323,8 +327,7 @@ let propose (cctxt : Protocol_client_context.full) ?minimal_fees let* state = State_transitions.step state - (Baking_state.Quorum_reached - (candidate, voting_power, endorsement_qc)) + (Baking_state.Quorum_reached (candidate, endorsement_qc)) >>= do_action (* this will register the elected block *) in @@ -369,18 +372,18 @@ let propose (cctxt : Protocol_client_context.full) ?minimal_fees in return_unit -let bake_using_automaton config state block_stream = +let bake_using_automaton config state heads_stream = let open Lwt_result_syntax in let cctxt = state.global_state.cctxt in let* initial_event = first_automaton_event state in let current_level = state.level_state.latest_proposal.block.shell.level in let loop_state = Baking_scheduling.create_loop_state - block_stream + ~heads_stream state.global_state.operation_worker in let stop_on_next_level_block = function - | New_proposal proposal -> + | New_head_proposal proposal -> Compare.Int32.(proposal.block.shell.level >= Int32.succ current_level) | _ -> false in @@ -392,7 +395,7 @@ let bake_using_automaton config state 
block_stream = state initial_event >>=? function - | Some (New_proposal proposal) -> + | Some (New_head_proposal proposal) -> let*! () = cctxt#message "Block %a (%ld) injected" @@ -508,7 +511,8 @@ let bake (cctxt : Protocol_client_context.full) ?minimal_fees ?extra_operations () in - let* block_stream, current_proposal = get_current_proposal cctxt in + let cache = Baking_cache.Block_cache.create 10 in + let* block_stream, current_proposal = get_current_proposal cctxt ~cache () in let* state = create_state cctxt diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_nonces.ml b/src/proto_016_PtMumbai/lib_delegate/baking_nonces.ml index fc1c9ada635a..c0c3d4d8a15d 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_nonces.ml +++ b/src/proto_016_PtMumbai/lib_delegate/baking_nonces.ml @@ -255,7 +255,7 @@ let reveal_potential_nonces state new_proposal = let new_predecessor_hash = new_proposal.Baking_state.predecessor.hash in if Block_hash.(last_predecessor <> new_predecessor_hash) - && Protocol_hash.(new_proposal.predecessor.protocol = Protocol.hash) + && not (Baking_state.is_first_block_in_protocol new_proposal) then ( (* only try revealing nonces when the proposal's predecessor is a new one *) state.last_predecessor <- new_predecessor_hash ; diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_scheduling.ml b/src/proto_016_PtMumbai/lib_delegate/baking_scheduling.ml index 96350fe3ba3e..3389440a8469 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_scheduling.ml +++ b/src/proto_016_PtMumbai/lib_delegate/baking_scheduling.ml @@ -28,33 +28,52 @@ module Events = Baking_events.Scheduling open Baking_state type loop_state = { - block_stream : Baking_state.proposal Lwt_stream.t; + heads_stream : Baking_state.proposal Lwt_stream.t; + get_valid_blocks_stream : Baking_state.proposal Lwt_stream.t Lwt.t; qc_stream : Operation_worker.event Lwt_stream.t; - future_block_stream : proposal Lwt_stream.t; - push_future_block : proposal -> unit; - mutable last_get_head_event 
: [`New_proposal of proposal option] Lwt.t option; + future_block_stream : + [`New_future_head of proposal | `New_future_valid_proposal of proposal] + Lwt_stream.t; + push_future_block : + [`New_future_head of proposal | `New_future_valid_proposal of proposal] -> + unit; + mutable last_get_head_event : + [`New_head_proposal of proposal option] Lwt.t option; + mutable last_get_valid_block_event : + [`New_valid_proposal of proposal option] Lwt.t option; mutable last_future_block_event : - [`New_future_block of Baking_state.proposal] Lwt.t option; + [`New_future_head of proposal | `New_future_valid_proposal of proposal] + Lwt.t + option; mutable last_get_qc_event : [`QC_reached of Operation_worker.event option] Lwt.t option; } type events = - [ `New_future_block of proposal - | `New_proposal of proposal option + [ `New_future_head of proposal + | `New_future_valid_proposal of proposal + | `New_valid_proposal of proposal option + | `New_head_proposal of proposal option | `QC_reached of Operation_worker.event option | `Termination | `Timeout of timeout_kind ] Lwt.t -let create_loop_state block_stream operation_worker = +let create_loop_state ?get_valid_blocks_stream ~heads_stream operation_worker = let future_block_stream, push_future_block = Lwt_stream.create () in + let get_valid_blocks_stream = + match get_valid_blocks_stream with + | None -> Lwt.return (Lwt_stream.create () |> fst) + | Some vbs_t -> vbs_t + in { - block_stream; + heads_stream; + get_valid_blocks_stream; qc_stream = Operation_worker.get_quorum_event_stream operation_worker; future_block_stream; push_future_block = (fun x -> push_future_block (Some x)); last_get_head_event = None; + last_get_valid_block_event = None; last_future_block_event = None; last_get_qc_event = None; } @@ -111,12 +130,24 @@ let rec wait_next_event ~timeout loop_state = match loop_state.last_get_head_event with | None -> let t = - Lwt_stream.get loop_state.block_stream >|= fun e -> `New_proposal e + Lwt_stream.get 
loop_state.heads_stream >|= fun e -> + `New_head_proposal e in loop_state.last_get_head_event <- Some t ; t | Some t -> t in + let get_valid_block_event () = + match loop_state.last_get_valid_block_event with + | None -> + let t = + loop_state.get_valid_blocks_stream >>= fun valid_blocks_stream -> + Lwt_stream.get valid_blocks_stream >|= fun e -> `New_valid_proposal e + in + loop_state.last_get_valid_block_event <- Some t ; + t + | Some t -> t + in let get_future_block_event () = (* n.b. we should also consume the available elements in the block_stream before starting baking. *) @@ -127,7 +158,7 @@ let rec wait_next_event ~timeout loop_state = | None -> (* unreachable, we never close the stream *) assert false - | Some proposal -> `New_future_block proposal + | Some future_proposal -> future_proposal in loop_state.last_future_block_event <- Some t ; t @@ -149,6 +180,7 @@ let rec wait_next_event ~timeout loop_state = [ terminated; (get_head_event () :> events); + (get_valid_block_event () :> events); (get_future_block_event () :> events); (get_qc_event () :> events); (timeout :> events); @@ -158,7 +190,11 @@ let rec wait_next_event ~timeout loop_state = | `Termination -> (* Exit the loop *) return_none - | `New_proposal None -> + | `New_valid_proposal None -> + (* Node connection lost *) + loop_state.last_get_valid_block_event <- None ; + fail Baking_errors.Node_connection_lost + | `New_head_proposal None -> (* Node connection lost *) loop_state.last_get_head_event <- None ; fail Baking_errors.Node_connection_lost @@ -166,7 +202,22 @@ let rec wait_next_event ~timeout loop_state = (* Not supposed to happen: exit the loop *) loop_state.last_get_qc_event <- None ; return_none - | `New_proposal (Some proposal) -> ( + | `New_valid_proposal (Some proposal) -> ( + loop_state.last_get_valid_block_event <- None ; + (* Is the block in the future? 
*) + match sleep_until proposal.block.shell.timestamp with + | Some waiter -> + (* If so, wait until its timestamp is reached before advertising it *) + Events.(emit proposal_in_the_future proposal.block.hash) >>= fun () -> + Lwt.dont_wait + (fun () -> + waiter >>= fun () -> + loop_state.push_future_block (`New_future_valid_proposal proposal) ; + Lwt.return_unit) + (fun _exn -> ()) ; + wait_next_event ~timeout loop_state + | None -> return_some (New_valid_proposal proposal)) + | `New_head_proposal (Some proposal) -> ( loop_state.last_get_head_event <- None ; (* Is the block in the future? *) match sleep_until proposal.block.shell.timestamp with @@ -176,29 +227,30 @@ let rec wait_next_event ~timeout loop_state = Lwt.dont_wait (fun () -> waiter >>= fun () -> - loop_state.push_future_block proposal ; + loop_state.push_future_block (`New_future_head proposal) ; Lwt.return_unit) (fun _exn -> ()) ; wait_next_event ~timeout loop_state - | None -> return_some (New_proposal proposal)) - | `New_future_block proposal -> + | None -> return_some (New_head_proposal proposal)) + | `New_future_head proposal -> + Events.(emit process_proposal_in_the_future proposal.block.hash) + >>= fun () -> + loop_state.last_future_block_event <- None ; + return_some (New_head_proposal proposal) + | `New_future_valid_proposal proposal -> Events.(emit process_proposal_in_the_future proposal.block.hash) >>= fun () -> loop_state.last_future_block_event <- None ; - return_some (New_proposal proposal) + return_some (New_valid_proposal proposal) | `QC_reached - (Some - (Operation_worker.Prequorum_reached - (candidate, voting_power, preendorsement_qc))) -> + (Some (Operation_worker.Prequorum_reached (candidate, preendorsement_qc))) + -> loop_state.last_get_qc_event <- None ; - return_some - (Prequorum_reached (candidate, voting_power, preendorsement_qc)) + return_some (Prequorum_reached (candidate, preendorsement_qc)) | `QC_reached - (Some - (Operation_worker.Quorum_reached - (candidate, voting_power, 
endorsement_qc))) -> + (Some (Operation_worker.Quorum_reached (candidate, endorsement_qc))) -> loop_state.last_get_qc_event <- None ; - return_some (Quorum_reached (candidate, voting_power, endorsement_qc)) + return_some (Quorum_reached (candidate, endorsement_qc)) | `Timeout e -> return_some (Timeout e) (** From the current [state], the function returns an optional @@ -211,8 +263,7 @@ let compute_next_round_time state = | None -> state.level_state.latest_proposal | Some {proposal; _} -> proposal in - if Protocol_hash.(proposal.predecessor.next_protocol <> Protocol.hash) then - None + if Baking_state.is_first_block_in_protocol proposal then None else match state.level_state.next_level_proposed_round with | Some _proposed_round -> @@ -615,11 +666,7 @@ let create_initial_state cctxt ?(synchronize = true) ~chain config ~chain >>=? fun next_level_delegate_slots -> let elected_block = - if - Protocol_hash.( - current_proposal.block.protocol <> Protocol.hash - && current_proposal.block.next_protocol = Protocol.hash) - then + if Baking_state.is_first_block_in_protocol current_proposal then (* If the last block is a protocol transition, we admit it as a final block *) Some {proposal = current_proposal; endorsement_qc = []} @@ -629,6 +676,10 @@ let create_initial_state cctxt ?(synchronize = true) ~chain config { current_level = current_proposal.block.shell.level; latest_proposal = current_proposal; + is_latest_proposal_applied = + true (* this proposal is expected to be the current head *); + delayed_prequorum = None; + injected_preendorsements = None; locked_round = None; endorsable_payload = None; elected_block; @@ -657,7 +708,7 @@ let compute_bootstrap_event state = = state.round_state.current_round) then (* If so, then trigger the new proposal event to possibly preendorse *) - ok @@ Baking_state.New_proposal state.level_state.latest_proposal + ok @@ Baking_state.New_head_proposal state.level_state.latest_proposal else (* Otherwise, trigger the end of round to check 
whether we need to propose at this level or not *) @@ -719,11 +770,13 @@ let perform_sanity_check cctxt ~chain_id = let run cctxt ?canceler ?(stop_on_event = fun _ -> false) ?(on_error = fun _ -> return_unit) ~chain config delegates = + let open Lwt_result_syntax in Shell_services.Chain.chain_id cctxt ~chain () >>=? fun chain_id -> perform_sanity_check cctxt ~chain_id >>=? fun () -> - Node_rpc.monitor_proposals cctxt ~chain () - >>=? fun (block_stream, _block_stream_stopper) -> - (Lwt_stream.get block_stream >>= function + let cache = Baking_cache.Block_cache.create 10 in + Node_rpc.monitor_heads cctxt ~cache ~chain () + >>=? fun (heads_stream, _block_stream_stopper) -> + (Lwt_stream.get heads_stream >>= function | Some current_head -> return current_head | None -> failwith "head stream unexpectedly ended") >>=? fun current_proposal -> @@ -742,7 +795,7 @@ let run cctxt ?canceler ?(stop_on_event = fun _ -> false) ~current_proposal delegates >>=? fun initial_state -> - let cloned_block_stream = Lwt_stream.clone block_stream in + let cloned_block_stream = Lwt_stream.clone heads_stream in Baking_nonces.start_revelation_worker cctxt initial_state.global_state.config.nonce @@ -756,9 +809,22 @@ let run cctxt ?canceler ?(stop_on_event = fun _ -> false) Lwt_canceler.cancel revelation_worker_canceler >>= fun _ -> Lwt.return_unit)) canceler ; - + (* FIXME: currently, the client streamed RPC call will hold until at + least one element is present in the stream. This is fixed by: + https://gitlab.com/nomadic-labs/resto/-/merge_requests/50. Until + then, we await the promise completion of the RPC call later + on. *) + let get_valid_blocks_stream = + let*! 
vbs = Node_rpc.monitor_valid_proposals cctxt ~cache ~chain () in + match vbs with + | Error _ -> Stdlib.failwith "Failed to get the validated blocks stream" + | Ok (vbs, _) -> Lwt.return vbs + in let loop_state = - create_loop_state block_stream initial_state.global_state.operation_worker + create_loop_state + ~get_valid_blocks_stream + ~heads_stream + initial_state.global_state.operation_worker in let on_error err = Events.(emit error_while_baking err) >>= fun () -> diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_scheduling.mli b/src/proto_016_PtMumbai/lib_delegate/baking_scheduling.mli index 83e167cfcf34..ed09245e450d 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_scheduling.mli +++ b/src/proto_016_PtMumbai/lib_delegate/baking_scheduling.mli @@ -29,7 +29,10 @@ open Protocol.Alpha_context type loop_state val create_loop_state : - proposal Lwt_stream.t -> Operation_worker.t -> loop_state + ?get_valid_blocks_stream:proposal Lwt_stream.t Lwt.t -> + heads_stream:proposal Lwt_stream.t -> + Operation_worker.t -> + loop_state val sleep_until : Time.Protocol.t -> unit Lwt.t option diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_simulator.ml b/src/proto_016_PtMumbai/lib_delegate/baking_simulator.ml index 74f2b7519cfb..000d7cc3c575 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_simulator.ml +++ b/src/proto_016_PtMumbai/lib_delegate/baking_simulator.ml @@ -77,17 +77,11 @@ let check_context_consistency (abstract_index : Abstract_context_index.t) | false -> fail Invalid_context)) let begin_construction ~timestamp ~protocol_data ~force_apply - (abstract_index : Abstract_context_index.t) predecessor chain_id = + ~pred_resulting_context_hash (abstract_index : Abstract_context_index.t) + pred_block chain_id = protect (fun () -> - let { - Baking_state.shell = pred_shell; - hash = pred_hash; - resulting_context_hash; - _; - } = - predecessor - in - abstract_index.checkout_fun resulting_context_hash >>= function + let {Baking_state.shell = 
pred_shell; hash = pred_hash; _} = pred_block in + abstract_index.checkout_fun pred_resulting_context_hash >>= function | None -> fail Failed_to_checkout_context | Some context -> let header : Tezos_base.Block_header.shell_header = @@ -107,7 +101,7 @@ let begin_construction ~timestamp ~protocol_data ~force_apply let mode = Lifted_protocol.Construction { - predecessor_hash = predecessor.hash; + predecessor_hash = pred_hash; timestamp; block_header_data = protocol_data; } @@ -130,7 +124,14 @@ let begin_construction ~timestamp ~protocol_data ~force_apply else return_none) >>=? fun application_state -> let state = (validation_state, application_state) in - return {predecessor; context; state; rev_operations = []; header}) + return + { + predecessor = pred_block; + context; + state; + rev_operations = []; + header; + }) let ( let** ) x k = let open Lwt_result_syntax in diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_simulator.mli b/src/proto_016_PtMumbai/lib_delegate/baking_simulator.mli index 959546fe6d21..c5155ac45ee0 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_simulator.mli +++ b/src/proto_016_PtMumbai/lib_delegate/baking_simulator.mli @@ -52,6 +52,7 @@ val begin_construction : timestamp:Time.Protocol.t -> protocol_data:block_header_data -> force_apply:bool -> + pred_resulting_context_hash:Context_hash.t -> Abstract_context_index.t -> Baking_state.block_info -> Chain_id.t -> diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_state.ml b/src/proto_016_PtMumbai/lib_delegate/baking_state.ml index 8d7b97748316..de069f210571 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_state.ml +++ b/src/proto_016_PtMumbai/lib_delegate/baking_state.ml @@ -103,16 +103,12 @@ type prequorum = { type block_info = { hash : Block_hash.t; shell : Block_header.shell_header; - resulting_context_hash : Context_hash.t; payload_hash : Block_payload_hash.t; payload_round : Round.t; round : Round.t; - protocol : Protocol_hash.t; - next_protocol : Protocol_hash.t; 
prequorum : prequorum option; quorum : Kind.endorsement operation list; payload : Operation_pool.payload; - live_blocks : Block_hash.Set.t; } type cache = { @@ -167,68 +163,48 @@ let block_info_encoding = (fun { hash; shell; - resulting_context_hash; payload_hash; payload_round; round; - protocol; - next_protocol; prequorum; quorum; payload; - live_blocks; } -> - ( ( hash, - shell, - resulting_context_hash, - payload_hash, - payload_round, - round, - protocol, - next_protocol, - prequorum, - List.map Operation.pack quorum ), - (payload, live_blocks) )) - (fun ( ( hash, - shell, - resulting_context_hash, - payload_hash, - payload_round, - round, - protocol, - next_protocol, - prequorum, - quorum ), - (payload, live_blocks) ) -> + ( hash, + shell, + payload_hash, + payload_round, + round, + prequorum, + List.map Operation.pack quorum, + payload )) + (fun ( hash, + shell, + payload_hash, + payload_round, + round, + prequorum, + quorum, + payload ) -> { hash; shell; - resulting_context_hash; payload_hash; payload_round; round; - protocol; - next_protocol; prequorum; quorum = List.filter_map Operation_pool.unpack_endorsement quorum; payload; - live_blocks; }) - (merge_objs - (obj10 - (req "hash" Block_hash.encoding) - (req "shell" Block_header.shell_header_encoding) - (req "resulting_context_hash" Context_hash.encoding) - (req "payload_hash" Block_payload_hash.encoding) - (req "payload_round" Round.encoding) - (req "round" Round.encoding) - (req "protocol" Protocol_hash.encoding) - (req "next_protocol" Protocol_hash.encoding) - (req "prequorum" (option prequorum_encoding)) - (req "quorum" (list (dynamic_size Operation.encoding)))) - (obj2 - (req "payload" Operation_pool.payload_encoding) - (req "live_blocks" Block_hash.Set.encoding))) + (obj8 + (req "hash" Block_hash.encoding) + (req "shell" Block_header.shell_header_encoding) + (req "payload_hash" Block_payload_hash.encoding) + (req "payload_round" Round.encoding) + (req "round" Round.encoding) + (req "prequorum" 
(option prequorum_encoding)) + (req "quorum" (list (dynamic_size Operation.encoding))) + (req "payload" Operation_pool.payload_encoding)) let round_of_shell_header shell_header = Environment.wrap_tzresult @@ -266,6 +242,9 @@ let proposal_encoding = (req "block" block_info_encoding) (req "predecessor" block_info_encoding)) +let is_first_block_in_protocol {block; predecessor; _} = + Compare.Int.(block.shell.proto_level <> predecessor.shell.proto_level) + type locked_round = {payload_hash : Block_payload_hash.t; round : Round.t} let locked_round_encoding = @@ -300,6 +279,10 @@ type elected_block = { type level_state = { current_level : int32; latest_proposal : proposal; + is_latest_proposal_applied : bool; + delayed_prequorum : + (Operation_worker.candidate * Kind.preendorsement operation list) option; + injected_preendorsements : packed_operation list option; (* Last proposal received where we injected an endorsement (thus we have seen 2f+1 preendorsements) *) locked_round : locked_round option; @@ -312,7 +295,11 @@ type level_state = { next_level_proposed_round : Round.t option; } -type phase = Idle | Awaiting_preendorsements | Awaiting_endorsements +type phase = + | Idle + | Awaiting_preendorsements + | Awaiting_application + | Awaiting_endorsements let phase_encoding = let open Data_encoding in @@ -332,9 +319,15 @@ let phase_encoding = (function Awaiting_preendorsements -> Some () | _ -> None) (fun () -> Awaiting_preendorsements); case - ~title:"Awaiting_endorsements" + ~title:"Awaiting_application" (Tag 2) unit + (function Awaiting_application -> Some () | _ -> None) + (fun () -> Awaiting_application); + case + ~title:"Awaiting_endorsements" + (Tag 3) + unit (function Awaiting_endorsements -> Some () | _ -> None) (fun () -> Awaiting_endorsements); ] @@ -376,18 +369,13 @@ let timeout_kind_encoding = (fun at_round -> Time_to_bake_next_level {at_round}); ] -type voting_power = int - type event = - | New_proposal of proposal + | New_valid_proposal of proposal + | 
New_head_proposal of proposal | Prequorum_reached of - Operation_worker.candidate - * voting_power - * Kind.preendorsement operation list + Operation_worker.candidate * Kind.preendorsement operation list | Quorum_reached of - Operation_worker.candidate - * voting_power - * Kind.endorsement operation list + Operation_worker.candidate * Kind.endorsement operation list | Timeout of timeout_kind let event_encoding = @@ -396,40 +384,43 @@ let event_encoding = [ case (Tag 0) - ~title:"New_proposal" + ~title:"New_valid_proposal" proposal_encoding - (function New_proposal p -> Some p | _ -> None) - (fun p -> New_proposal p); + (function New_valid_proposal p -> Some p | _ -> None) + (fun p -> New_valid_proposal p); case (Tag 1) + ~title:"New_head_proposal" + proposal_encoding + (function New_head_proposal p -> Some p | _ -> None) + (fun p -> New_head_proposal p); + case + (Tag 2) ~title:"Prequorum_reached" - (tup3 + (tup2 Operation_worker.candidate_encoding - Data_encoding.int31 (Data_encoding.list (dynamic_size Operation.encoding))) (function - | Prequorum_reached (candidate, voting_power, ops) -> - Some (candidate, voting_power, List.map Operation.pack ops) + | Prequorum_reached (candidate, ops) -> + Some (candidate, List.map Operation.pack ops) | _ -> None) - (fun (candidate, voting_power, ops) -> + (fun (candidate, ops) -> Prequorum_reached - (candidate, voting_power, Operation_pool.filter_preendorsements ops)); + (candidate, Operation_pool.filter_preendorsements ops)); case - (Tag 2) + (Tag 3) ~title:"Quorum_reached" - (tup3 + (tup2 Operation_worker.candidate_encoding - Data_encoding.int31 (Data_encoding.list (dynamic_size Operation.encoding))) (function - | Quorum_reached (candidate, voting_power, ops) -> - Some (candidate, voting_power, List.map Operation.pack ops) + | Quorum_reached (candidate, ops) -> + Some (candidate, List.map Operation.pack ops) | _ -> None) - (fun (candidate, voting_power, ops) -> - Quorum_reached - (candidate, voting_power, 
Operation_pool.filter_endorsements ops)); + (fun (candidate, ops) -> + Quorum_reached (candidate, Operation_pool.filter_endorsements ops)); case - (Tag 3) + (Tag 4) ~title:"Timeout" timeout_kind_encoding (function Timeout tk -> Some tk | _ -> None) @@ -730,18 +721,15 @@ let pp_block_info fmt shell; payload_hash; round; - protocol; - next_protocol; prequorum; quorum; payload; - _; + payload_round; } = Format.fprintf fmt "@[Block:@ hash: %a@ payload_hash: %a@ level: %ld@ round: %a@ \ - protocol: %a@ next protocol: %a@ prequorum: %a@ quorum: %d endorsements@ \ - payload: %a@]" + prequorum: %a@ quorum: %d endorsements@ payload: %a@ payload round: %a@]" Block_hash.pp hash Block_payload_hash.pp_short @@ -749,15 +737,13 @@ let pp_block_info fmt shell.level Round.pp round - Protocol_hash.pp_short - protocol - Protocol_hash.pp_short - next_protocol (pp_option pp_prequorum) prequorum (List.length quorum) Operation_pool.pp_payload payload + Round.pp + payload_round let pp_proposal fmt {block; _} = pp_block_info fmt block @@ -817,6 +803,9 @@ let pp_level_state fmt { current_level; latest_proposal; + is_latest_proposal_applied; + delayed_prequorum; + injected_preendorsements; locked_round; endorsable_payload; elected_block; @@ -826,11 +815,15 @@ let pp_level_state fmt } = Format.fprintf fmt - "@[Level state:@ current level: %ld@ @[proposal:@ %a@]@ locked \ - round: %a@ endorsable payload: %a@ elected block: %a@ @[own delegate \ + "@[Level state:@ current level: %ld@ @[proposal (applied:%b, \ + delayed prequorum:%b, injected preendorsements: %d):@ %a@]@ locked round: \ + %a@ endorsable payload: %a@ elected block: %a@ @[own delegate \ slots:@ %a@]@ @[next level own delegate slots:@ %a@]@ next level \ proposed round: %a@]" current_level + is_latest_proposal_applied + (Option.is_some delayed_prequorum) + (match injected_preendorsements with None -> 0 | Some l -> List.length l) pp_proposal latest_proposal (pp_option pp_locked_round) @@ -849,6 +842,7 @@ let pp_level_state fmt let 
pp_phase fmt = function | Idle -> Format.fprintf fmt "idle" | Awaiting_preendorsements -> Format.fprintf fmt "awaiting preendorsements" + | Awaiting_application -> Format.fprintf fmt "awaiting application" | Awaiting_endorsements -> Format.fprintf fmt "awaiting endorsements" let pp_round_state fmt {current_round; current_phase} = @@ -878,29 +872,32 @@ let pp_timeout_kind fmt = function Format.fprintf fmt "time to bake next level at round %a" Round.pp at_round let pp_event fmt = function - | New_proposal proposal -> + | New_valid_proposal proposal -> + Format.fprintf + fmt + "new valid proposal received: %a" + pp_block_info + proposal.block + | New_head_proposal proposal -> Format.fprintf fmt - "new proposal received: %a" + "new head proposal received: %a" pp_block_info proposal.block - | Prequorum_reached (candidate, voting_power, preendos) -> + | Prequorum_reached (candidate, preendos) -> Format.fprintf fmt - "pre-quorum reached with %d preendorsements (power: %d) for %a at \ - round %a" + "prequorum reached with %d preendorsements for %a at round %a" (List.length preendos) - voting_power Block_hash.pp candidate.Operation_worker.hash Round.pp candidate.round_watched - | Quorum_reached (candidate, voting_power, endos) -> + | Quorum_reached (candidate, endos) -> Format.fprintf fmt - "quorum reached with %d endorsements (power: %d) for %a at round %a" + "quorum reached with %d endorsements for %a at round %a" (List.length endos) - voting_power Block_hash.pp candidate.Operation_worker.hash Round.pp diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_state.mli b/src/proto_016_PtMumbai/lib_delegate/baking_state.mli index fef6d3ccc336..cfa439d81738 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_state.mli +++ b/src/proto_016_PtMumbai/lib_delegate/baking_state.mli @@ -57,18 +57,12 @@ type prequorum = { type block_info = { hash : Block_hash.t; shell : Block_header.shell_header; - resulting_context_hash : Context_hash.t; payload_hash : Block_payload_hash.t; 
payload_round : Round.t; round : Round.t; - protocol : Protocol_hash.t; - next_protocol : Protocol_hash.t; prequorum : prequorum option; quorum : Kind.endorsement operation list; payload : Operation_pool.payload; - live_blocks : Block_hash.Set.t; - (** Set of live blocks for this block that is used to filter - old or too recent operations. *) } type cache = { @@ -108,6 +102,15 @@ type proposal = {block : block_info; predecessor : block_info} val proposal_encoding : proposal Data_encoding.t +(** Identify the first block of the protocol, ie. the block that + activates the current protocol. + + This block should be baked by the baker of the previous protocol + (that's why this same block is also referred to as the last block + of the previous protocol). It is always considered final and + therefore is not endorsed.*) +val is_first_block_in_protocol : proposal -> bool + type locked_round = {payload_hash : Block_payload_hash.t; round : Round.t} val locked_round_encoding : locked_round Data_encoding.t @@ -124,6 +127,10 @@ type elected_block = { type level_state = { current_level : int32; latest_proposal : proposal; + is_latest_proposal_applied : bool; + delayed_prequorum : + (Operation_worker.candidate * Kind.preendorsement operation list) option; + injected_preendorsements : packed_operation list option; locked_round : locked_round option; endorsable_payload : endorsable_payload option; elected_block : elected_block option; @@ -132,7 +139,11 @@ type level_state = { next_level_proposed_round : Round.t option; } -type phase = Idle | Awaiting_preendorsements | Awaiting_endorsements +type phase = + | Idle + | Awaiting_preendorsements + | Awaiting_application + | Awaiting_endorsements val phase_encoding : phase Data_encoding.t @@ -154,18 +165,13 @@ type timeout_kind = val timeout_kind_encoding : timeout_kind Data_encoding.t -type voting_power = int - type event = - | New_proposal of proposal + | New_valid_proposal of proposal + | New_head_proposal of proposal | 
Prequorum_reached of - Operation_worker.candidate - * voting_power - * Kind.preendorsement operation list + Operation_worker.candidate * Kind.preendorsement operation list | Quorum_reached of - Operation_worker.candidate - * voting_power - * Kind.endorsement operation list + Operation_worker.candidate * Kind.endorsement operation list | Timeout of timeout_kind val event_encoding : event Data_encoding.t diff --git a/src/proto_016_PtMumbai/lib_delegate/block_forge.ml b/src/proto_016_PtMumbai/lib_delegate/block_forge.ml index d3be206ac6b6..01a06159e467 100644 --- a/src/proto_016_PtMumbai/lib_delegate/block_forge.ml +++ b/src/proto_016_PtMumbai/lib_delegate/block_forge.ml @@ -69,17 +69,19 @@ let convert_operation (op : packed_operation) : Tezos_base.Operation.t = op.protocol_data; } -(* [finalize_block_header ~shell_header ~validation_result ~operations_hash - ~pred_info ~round ~locked_round] updates the [shell_header] that was created - with dummy fields at the beginning of the block construction. It increments - the [level] and sets the actual [operations_hash], [fitness], - [validation_passes], and [context] (the predecessor resulting context hash). +(* [finalize_block_header] updates the [shell_header] that was created + with dummy fields at the beginning of the block construction. It + increments the [level] and sets the actual [operations_hash], + [fitness], [validation_passes], and [context] (the predecessor + resulting context hash). - When the operations from the block have been applied, the [fitness] is simply - retrieved from the [validation_result]. Otherwise, the [fitness] is computed - from the [round] and [locked_round] arguments. *) + When the operations from the block have been applied, the [fitness] + is simply retrieved from the [validation_result]. Otherwise, the + [fitness] is computed from the [round] and [locked_round] + arguments. 
*) let finalize_block_header ~shell_header ~validation_result ~operations_hash - ~(pred_info : Baking_state.block_info) ~round ~locked_round = + ~(pred_info : Baking_state.block_info) ~pred_resulting_context_hash ~round + ~locked_round = let open Lwt_result_syntax in let* fitness = match validation_result with @@ -108,7 +110,7 @@ let finalize_block_header ~shell_header ~validation_result ~operations_hash validation_passes; operations_hash; fitness; - context = pred_info.resulting_context_hash; + context = pred_resulting_context_hash; } in return header @@ -203,14 +205,15 @@ let filter_via_node ~chain_id ~fees_config ~hard_gas_limit_per_block [filter_via_node] is called to return these values. *) let filter_with_context ~chain_id ~fees_config ~hard_gas_limit_per_block ~faked_protocol_data ~user_activated_upgrades ~timestamp - ~(pred_info : Baking_state.block_info) ~force_apply ~round ~context_index - ~payload_round ~operation_pool cctxt = + ~(pred_info : Baking_state.block_info) ~pred_resulting_context_hash + ~force_apply ~round ~context_index ~payload_round ~operation_pool cctxt = let open Lwt_result_syntax in let* incremental = Baking_simulator.begin_construction ~timestamp ~protocol_data:faked_protocol_data ~force_apply + ~pred_resulting_context_hash context_index pred_info chain_id @@ -248,6 +251,7 @@ let filter_with_context ~chain_id ~fees_config ~hard_gas_limit_per_block ~validation_result ~operations_hash ~pred_info + ~pred_resulting_context_hash ~round ~locked_round:None in @@ -288,14 +292,16 @@ let apply_via_node ~chain_id ~faked_protocol_data ~timestamp consensus operations only from an [ordered_pool] via {!Operation_selection.filter_consensus_operations_only}. 
*) let apply_with_context ~chain_id ~faked_protocol_data ~user_activated_upgrades - ~timestamp ~(pred_info : Baking_state.block_info) ~force_apply ~round - ~ordered_pool ~context_index ~payload_hash cctxt = + ~timestamp ~(pred_info : Baking_state.block_info) + ~pred_resulting_context_hash ~force_apply ~round ~ordered_pool + ~context_index ~payload_hash cctxt = let open Lwt_result_syntax in let* incremental = Baking_simulator.begin_construction ~timestamp ~protocol_data:faked_protocol_data ~force_apply + ~pred_resulting_context_hash context_index pred_info chain_id @@ -358,6 +364,7 @@ let apply_with_context ~chain_id ~faked_protocol_data ~user_activated_upgrades ~validation_result ~operations_hash ~pred_info + ~pred_resulting_context_hash ~round ~locked_round:locked_round_when_no_validation_result in @@ -367,10 +374,10 @@ let apply_with_context ~chain_id ~faked_protocol_data ~user_activated_upgrades (* [forge] a new [unsigned_block] in accordance with [simulation_kind] and [simulation_mode] *) let forge (cctxt : #Protocol_client_context.full) ~chain_id - ~(pred_info : Baking_state.block_info) ~timestamp ~round - ~liquidity_baking_toggle_vote ~user_activated_upgrades fees_config - ~force_apply ~seed_nonce_hash ~payload_round simulation_mode simulation_kind - constants = + ~(pred_info : Baking_state.block_info) ~pred_resulting_context_hash + ~pred_live_blocks ~timestamp ~round ~liquidity_baking_toggle_vote + ~user_activated_upgrades fees_config ~force_apply ~seed_nonce_hash + ~payload_round simulation_mode simulation_kind constants = let open Lwt_result_syntax in let hard_gas_limit_per_block = constants.Constants.Parametric.hard_gas_limit_per_block @@ -382,7 +389,7 @@ let forge (cctxt : #Protocol_client_context.full) ~chain_id to our predecessor otherwise the node would reject the block. 
*) let filtered_pool = retain_live_operations_only - ~live_blocks:pred_info.live_blocks + ~live_blocks:pred_live_blocks operation_pool in Filter filtered_pool @@ -441,6 +448,7 @@ let forge (cctxt : #Protocol_client_context.full) ~chain_id ~user_activated_upgrades ~timestamp ~pred_info + ~pred_resulting_context_hash ~force_apply ~round ~context_index @@ -462,6 +470,7 @@ let forge (cctxt : #Protocol_client_context.full) ~chain_id ~user_activated_upgrades ~timestamp ~pred_info + ~pred_resulting_context_hash ~force_apply ~round ~ordered_pool diff --git a/src/proto_016_PtMumbai/lib_delegate/block_forge.mli b/src/proto_016_PtMumbai/lib_delegate/block_forge.mli index 77fb619a192f..7afc2ce9791f 100644 --- a/src/proto_016_PtMumbai/lib_delegate/block_forge.mli +++ b/src/proto_016_PtMumbai/lib_delegate/block_forge.mli @@ -44,6 +44,8 @@ val forge : #Protocol_client_context.full -> chain_id:Chain_id.t -> pred_info:Baking_state.block_info -> + pred_resulting_context_hash:Context_hash.t -> + pred_live_blocks:Block_hash.Set.t -> timestamp:Time.Protocol.t -> round:Round.t -> liquidity_baking_toggle_vote:Liquidity_baking.liquidity_baking_toggle_vote -> diff --git a/src/proto_016_PtMumbai/lib_delegate/node_rpc.ml b/src/proto_016_PtMumbai/lib_delegate/node_rpc.ml index a6d3d71cc15e..754e96117c11 100644 --- a/src/proto_016_PtMumbai/lib_delegate/node_rpc.ml +++ b/src/proto_016_PtMumbai/lib_delegate/node_rpc.ml @@ -25,6 +25,8 @@ open Protocol open Alpha_context +open Baking_cache +open Baking_state module Block_services = Block_services.Make (Protocol) (Protocol) module Events = Baking_events.Node_rpc @@ -33,12 +35,22 @@ let inject_block cctxt ?(force = false) ~chain signed_block_header operations = Data_encoding.Binary.to_bytes_exn Block_header.encoding signed_block_header in Shell_services.Injection.block + ~async:true cctxt ~chain ~force signed_shell_header_bytes operations +let inject_operation cctxt ~chain operation = + let encoded_op = + Data_encoding.Binary.to_bytes_exn 
Operation.encoding operation + in + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4875 + `Shell_services.Injection.operation` should be used instead once + the needed changes in the protocol are in place. *) + Shell_services.Injection.private_operation cctxt ~async:true ~chain encoded_op + let preapply_block cctxt ~chain ~head ~timestamp ~protocol_data operations = Block_services.Helpers.Preapply.block cctxt @@ -50,164 +62,219 @@ let preapply_block cctxt ~chain ~head ~timestamp ~protocol_data operations = let extract_prequorum preendorsements = match preendorsements with - | h :: _ as l -> + | h :: _ -> let ({protocol_data = {contents = Single (Preendorsement content); _}; _}) = (h : Kind.preendorsement Operation.t) in Some { - Baking_state.level = Raw_level.to_int32 content.level; + level = Raw_level.to_int32 content.level; round = content.round; block_payload_hash = content.block_payload_hash; - preendorsements = l; + preendorsements; } | _ -> None -let raw_info cctxt ~chain ~block_hash shell resulting_context_hash payload_hash - payload_round current_protocol next_protocol live_blocks = - Events.(emit raw_info (block_hash, shell.Tezos_base.Block_header.level)) - >>= fun () -> - let open Protocol_client_context in - let block = `Hash (block_hash, 0) in - let is_in_protocol = Protocol_hash.(current_protocol = Protocol.hash) in - (if is_in_protocol then - Alpha_block_services.Operations.operations cctxt ~chain ~block () - >>=? fun operations -> - let operations = - List.map - (fun l -> - List.map - (fun {Alpha_block_services.shell; protocol_data; _} -> - {Alpha_context.shell; protocol_data}) - l) - operations - in - match Operation_pool.extract_operations_of_list_list operations with - | None -> failwith "Unexpected operation list size" - | Some operations -> return operations - else - (* If we are not in the current protocol, do no consider operations *) - return (None, [], Operation_pool.empty_payload)) - >>=? 
fun (preendorsements, quorum, payload) -> - (match Baking_state.round_of_shell_header shell with - | Ok round -> ok round - | _ -> - (* this can occur if the protocol has just changed and the - previous protocol does not have a concept of round - (e.g. Genesis) *) - ok Round.zero) - >>?= fun round -> - let prequorum = Option.bind preendorsements extract_prequorum in +let info_of_header_and_ops ~in_protocol block_hash block_header operations = + let open Result_syntax in + let shell = block_header.Tezos_base.Block_header.shell in + let dummy_payload_hash = Block_payload_hash.zero in + let* round = + Environment.wrap_tzresult @@ Fitness.round_from_raw shell.fitness + in + let payload_hash, payload_round, prequorum, quorum, payload = + if not in_protocol then + (* The first block in the protocol is baked using the previous + protocol, the encodings might change. The baker's logic is to + consider final the first block of a new protocol and not + endorse it. Therefore, we do not need to have the correct + values here. 
*) + (dummy_payload_hash, Round.zero, None, [], Operation_pool.empty_payload) + else + let payload_hash, payload_round = + match + Data_encoding.Binary.of_bytes_opt + Protocol.block_header_data_encoding + block_header.protocol_data + with + | Some {contents = {payload_hash; payload_round; _}; _} -> + (payload_hash, payload_round) + | None -> assert false + in + let preendorsements, quorum, payload = + WithExceptions.Option.get + ~loc:__LOC__ + (Operation_pool.extract_operations_of_list_list operations) + in + let prequorum = Option.bind preendorsements extract_prequorum in + (payload_hash, payload_round, prequorum, quorum, payload) + in return { - Baking_state.hash = block_hash; + hash = block_hash; shell; - resulting_context_hash; payload_hash; payload_round; round; - protocol = current_protocol; - next_protocol; prequorum; quorum; payload; - live_blocks; } -let dummy_payload_hash = Block_payload_hash.zero +let compute_block_info cctxt ~in_protocol ?operations ~chain block_hash + block_header = + let open Lwt_result_syntax in + let* operations = + match operations with + | None when not in_protocol -> return_nil + | None -> + let open Protocol_client_context in + let* operations = + Alpha_block_services.Operations.operations + cctxt + ~chain + ~block:(`Hash (block_hash, 0)) + () + in + let packed_operations = + List.map + (fun l -> + List.map + (fun {Alpha_block_services.shell; protocol_data; _} -> + {Alpha_context.shell; protocol_data}) + l) + operations + in + return packed_operations + | Some operations -> + let parse_op (raw_op : Tezos_base.Operation.t) = + let protocol_data = + Data_encoding.Binary.of_bytes_exn + Operation.protocol_data_encoding + raw_op.proto + in + {shell = raw_op.shell; protocol_data} + in + protect @@ fun () -> return (List.map (List.map parse_op) operations) + in + let*? 
block_info = + info_of_header_and_ops ~in_protocol block_hash block_header operations + in + return block_info -let info cctxt ~chain ~block () = - let open Protocol_client_context in - (* Fails if the block's protocol is not the current one *) - Shell_services.Blocks.protocols cctxt ~chain ~block () - >>=? fun {current_protocol; next_protocol} -> - Shell_services.Blocks.resulting_context_hash cctxt ~chain ~block () - >>=? fun resulting_context_hash -> - (if Protocol_hash.(current_protocol <> Protocol.hash) then - Block_services.Header.shell_header cctxt ~chain ~block () >>=? fun shell -> - Chain_services.Blocks.Header.raw_protocol_data cctxt ~chain ~block () - >>=? fun protocol_data -> - let hash = - Tezos_base.Block_header.hash {Tezos_base.Block_header.shell; protocol_data} - in - (* /!\ We decode [protocol_data] with the current protocol's - encoding, while we should use the previous protocol's - [protocol_data] encoding. For now, this works because the - encoding has not changed. *) - let payload_hash, payload_round = - match - Data_encoding.Binary.of_bytes_opt - Protocol.block_header_data_encoding - protocol_data - with - | Some {contents = {payload_hash; payload_round; _}; _} -> - (payload_hash, payload_round) - | None -> (dummy_payload_hash, Round.zero) - in - return (hash, shell, resulting_context_hash, payload_hash, payload_round) - else - Alpha_block_services.header cctxt ~chain ~block () - >>=? fun {hash; shell; protocol_data; _} -> - return - ( hash, - shell, - resulting_context_hash, - protocol_data.contents.payload_hash, - protocol_data.contents.payload_round )) - >>=? 
fun (hash, shell, resulting_context_hash, payload_hash, payload_round) -> - (Chain_services.Blocks.live_blocks cctxt ~chain ~block () >>= function - | Error _ -> - (* The RPC might fail when a block's metadata is not available *) - Lwt.return Block_hash.Set.empty - | Ok live_blocks -> Lwt.return live_blocks) - >>= fun live_blocks -> - raw_info - cctxt - ~chain - ~block_hash:hash - shell - resulting_context_hash - payload_hash - payload_round - current_protocol - next_protocol - live_blocks +let proposal cctxt ?(cache : block_info Block_cache.t option) ?operations ~chain + block_hash (block_header : Tezos_base.Block_header.t) = + let open Lwt_result_syntax in + let predecessor_hash = block_header.shell.predecessor in + let pred_block = `Hash (predecessor_hash, 0) in + let predecessor_opt = + Option.bind cache (fun cache -> Block_cache.find_opt cache predecessor_hash) + in + let* is_proposal_in_protocol, predecessor = + match predecessor_opt with + | Some predecessor -> + return + ( predecessor.shell.proto_level = block_header.shell.proto_level, + predecessor ) + | None -> + let* { + current_protocol = pred_current_protocol; + next_protocol = pred_next_protocol; + } = + Shell_services.Blocks.protocols cctxt ~chain ~block:pred_block () + in + let is_proposal_in_protocol = + Protocol_hash.(pred_next_protocol = Protocol.hash) + in + let* predecessor = + let in_protocol = + Protocol_hash.(pred_current_protocol = Protocol.hash) + in + let* raw_header_b = + Shell_services.Blocks.raw_header cctxt ~chain ~block:pred_block () + in + let predecessor_header = + Data_encoding.Binary.of_bytes_exn + Tezos_base.Block_header.encoding + raw_header_b + in + compute_block_info + cctxt + ~in_protocol + ~chain + predecessor_hash + predecessor_header + in + Option.iter + (fun cache -> Block_cache.replace cache predecessor_hash predecessor) + cache ; + return (is_proposal_in_protocol, predecessor) + in + let block_opt = + Option.bind cache (fun cache -> Block_cache.find_opt cache 
block_hash) + in + let* block = + match block_opt with + | Some pi -> return pi + | None -> + let* pi = + compute_block_info + cctxt + ~in_protocol:is_proposal_in_protocol + ?operations + ~chain + block_hash + block_header + in + Option.iter (fun cache -> Block_cache.replace cache block_hash pi) cache ; + return pi + in + return {block; predecessor} -let find_in_cache_or_fetch cctxt ?cache ~chain block_hash = - let open Baking_cache in - let fetch () = info cctxt ~chain ~block:(`Hash (block_hash, 0)) () in - match cache with - | None -> fetch () - | Some block_cache -> ( - match Block_cache.find_opt block_cache block_hash with - | Some block_info -> return block_info - | None -> - fetch () >>=? fun block_info -> - Block_cache.replace block_cache block_hash block_info ; - return block_info) +let proposal cctxt ?cache ?operations ~chain block_hash block_header = + protect @@ fun () -> + proposal cctxt ?cache ?operations ~chain block_hash block_header -let proposal cctxt ?cache ~chain block_hash = - find_in_cache_or_fetch cctxt ~chain ?cache block_hash >>=? fun block -> - let predecessor_hash = block.shell.predecessor in - find_in_cache_or_fetch cctxt ~chain ?cache predecessor_hash - >>=? fun predecessor -> return {Baking_state.block; predecessor} +let monitor_valid_proposals cctxt ~chain ?cache () = + let open Lwt_result_syntax in + let next_protocols = [Protocol.hash] in + let* block_stream, stopper = + Monitor_services.validated_blocks cctxt ~chains:[chain] ~next_protocols () + in + let stream = + let map (_chain_id, block_hash, block_header, operations) = + let*! map_result = + proposal cctxt ?cache ~operations ~chain block_hash block_header + in + match map_result with + | Ok proposal -> Lwt.return_some proposal + | Error err -> + let*! 
() = Events.(emit error_while_monitoring_valid_proposals err) in + Lwt.return_none + in + Lwt_stream.filter_map_s map block_stream + in + return (stream, stopper) -let monitor_proposals cctxt ~chain () = - let cache = Baking_cache.Block_cache.create 100 in - Monitor_services.heads cctxt ~next_protocols:[Protocol.hash] chain - >>=? fun (block_stream, stopper) -> - return - ( Lwt_stream.filter_map_s - (fun (block_hash, _) -> - protect (fun () -> proposal cctxt ~cache ~chain block_hash) - >>= function - | Ok proposal -> Lwt.return_some proposal - | Error err -> - Events.(emit error_while_monitoring_heads err) >>= fun () -> - Lwt.return_none) - block_stream, - stopper ) +let monitor_heads cctxt ~chain ?cache () = + let open Lwt_result_syntax in + let next_protocols = [Protocol.hash] in + let* block_stream, stopper = + Monitor_services.heads cctxt ~next_protocols chain + in + let stream = + let map (block_hash, block_header) = + let*! map_result = proposal cctxt ?cache ~chain block_hash block_header in + match map_result with + | Ok proposal -> Lwt.return_some proposal + | Error err -> + let*! () = Events.(emit error_while_monitoring_heads err) in + Lwt.return_none + in + Lwt_stream.filter_map_s map block_stream + in + return (stream, stopper) let await_protocol_activation cctxt ~chain () = Monitor_services.heads cctxt ~next_protocols:[Protocol.hash] chain diff --git a/src/proto_016_PtMumbai/lib_delegate/node_rpc.mli b/src/proto_016_PtMumbai/lib_delegate/node_rpc.mli index 2dd0468ef6f1..00977029991a 100644 --- a/src/proto_016_PtMumbai/lib_delegate/node_rpc.mli +++ b/src/proto_016_PtMumbai/lib_delegate/node_rpc.mli @@ -39,6 +39,16 @@ val inject_block : Tezos_base.Operation.t list list -> Block_hash.t tzresult Lwt.t +(** Inject an operation. 
+ + @return operation hash of the newly injected operation +*) +val inject_operation : + #Protocol_client_context.full -> + chain:Shell_services.chain -> + packed_operation -> + Operation_hash.t tzresult Lwt.t + (** Preapply a block using the node validation mechanism.*) val preapply_block : #Protocol_client_context.full -> @@ -50,21 +60,19 @@ val preapply_block : (Tezos_base.Block_header.shell_header * error Preapply_result.t list) tzresult Lwt.t -(** Fetch a proposal from the node. - - @param cache is unset by default -*) -val proposal : - #Tezos_rpc.Context.simple -> - ?cache:Baking_state.block_info Baking_cache.Block_cache.t -> +(** Monitor validated blocks/proposals from the node. *) +val monitor_valid_proposals : + #Protocol_client_context.rpc_context -> chain:Shell_services.chain -> - Block_hash.t -> - Baking_state.proposal tzresult Lwt.t + ?cache:Baking_state.block_info Baking_cache.Block_cache.t -> + unit -> + (Baking_state.proposal Lwt_stream.t * (unit -> unit)) tzresult Lwt.t -(** Monitor proposals from the node.*) -val monitor_proposals : +(** Monitor heads from the node. *) +val monitor_heads : #Protocol_client_context.rpc_context -> chain:Shell_services.chain -> + ?cache:Baking_state.block_info Baking_cache.Block_cache.t -> unit -> (Baking_state.proposal Lwt_stream.t * (unit -> unit)) tzresult Lwt.t diff --git a/src/proto_016_PtMumbai/lib_delegate/operation_worker.ml b/src/proto_016_PtMumbai/lib_delegate/operation_worker.ml index 66cec5723476..1568bd5bb901 100644 --- a/src/proto_016_PtMumbai/lib_delegate/operation_worker.ml +++ b/src/proto_016_PtMumbai/lib_delegate/operation_worker.ml @@ -23,13 +23,6 @@ (* *) (*****************************************************************************) -(* TODO: - add events + - running state introspection to recover/restart on failure - - Do we need a mutex ? 
-*) - open Protocol_client_context open Protocol open Alpha_context @@ -64,7 +57,7 @@ module Events = struct ~name:"pqc_reached" ~level:Debug ~msg: - "pre-quorum reached (voting power: {voting_power}, {preendorsements} \ + "prequorum reached (voting power: {voting_power}, {preendorsements} \ preendorsements)" ~pp1:pp_int ("voting_power", Data_encoding.int31) @@ -162,12 +155,9 @@ let candidate_encoding = (req "round_watched" Round.encoding) (req "payload_hash_watched" Block_payload_hash.encoding)) -type voting_power = int - type event = - | Prequorum_reached of - candidate * voting_power * Kind.preendorsement operation list - | Quorum_reached of candidate * voting_power * Kind.endorsement operation list + | Prequorum_reached of candidate * Kind.preendorsement operation list + | Quorum_reached of candidate * Kind.endorsement operation list type pqc_watched = { candidate_watched : candidate; @@ -257,6 +247,21 @@ let is_valid_consensus_content (candidate : candidate) consensus_content = let cancel_monitoring state = state.proposal_watched <- None +let reset_monitoring state = + Lwt_mutex.with_lock state.lock @@ fun () -> + match state.proposal_watched with + | None -> Lwt.return_unit + | Some (Pqc_watch pqc_watched) -> + pqc_watched.current_voting_power <- 0 ; + pqc_watched.preendorsements_count <- 0 ; + pqc_watched.preendorsements_received <- [] ; + Lwt.return_unit + | Some (Qc_watch qc_watched) -> + qc_watched.current_voting_power <- 0 ; + qc_watched.endorsements_count <- 0 ; + qc_watched.endorsements_received <- [] ; + Lwt.return_unit + let update_monitoring ?(should_lock = true) state ops = (if should_lock then Lwt_mutex.with_lock state.lock else fun f -> f ()) @@ fun () -> @@ -310,7 +315,6 @@ let update_monitoring ?(should_lock = true) state ops = (Some (Prequorum_reached ( candidate_watched, - proposal_watched.current_voting_power, List.rev proposal_watched.preendorsements_received ))) ; (* Once the event has been emitted, we cancel the monitoring *) 
cancel_monitoring state ; @@ -370,7 +374,6 @@ let update_monitoring ?(should_lock = true) state ops = (Some (Quorum_reached ( candidate_watched, - proposal_watched.current_voting_power, List.rev proposal_watched.endorsements_received ))) ; (* Once the event has been emitted, we cancel the monitoring *) cancel_monitoring state ; @@ -496,11 +499,10 @@ let create ?(monitor_node_operations = true) Lwt_stream.get operation_stream >>= function | None -> (* When the stream closes, it means a new head has been set, - we cancel the monitoring and flush current operations *) + we reset the monitoring and flush current operations *) Events.(emit end_of_stream ()) >>= fun () -> op_stream_stopper () ; - cancel_monitoring state ; - worker_loop () + reset_monitoring state >>= fun () -> worker_loop () | Some ops -> state.operation_pool <- Operation_pool.add_operations state.operation_pool ops ; diff --git a/src/proto_016_PtMumbai/lib_delegate/operation_worker.mli b/src/proto_016_PtMumbai/lib_delegate/operation_worker.mli index eecbc990f2d1..1c12e7356c37 100644 --- a/src/proto_016_PtMumbai/lib_delegate/operation_worker.mli +++ b/src/proto_016_PtMumbai/lib_delegate/operation_worker.mli @@ -41,12 +41,9 @@ type candidate = { val candidate_encoding : candidate Data_encoding.t -type voting_power = int - type event = - | Prequorum_reached of - candidate * voting_power * Kind.preendorsement operation list - | Quorum_reached of candidate * voting_power * Kind.endorsement operation list + | Prequorum_reached of candidate * Kind.preendorsement operation list + | Quorum_reached of candidate * Kind.endorsement operation list (** {1 Constructors}*) diff --git a/src/proto_016_PtMumbai/lib_delegate/state_transitions.ml b/src/proto_016_PtMumbai/lib_delegate/state_transitions.ml index 5cc55044263d..ab8f6d35d7ba 100644 --- a/src/proto_016_PtMumbai/lib_delegate/state_transitions.ml +++ b/src/proto_016_PtMumbai/lib_delegate/state_transitions.ml @@ -95,31 +95,50 @@ let make_preendorse_action state 
proposal = in Inject_preendorsements {preendorsements} -let update_proposal state proposal = +let update_proposal ~is_proposal_applied state proposal = Events.(emit updating_latest_proposal proposal.block.hash) >>= fun () -> - let new_level_state = {state.level_state with latest_proposal = proposal} in + let prev_proposal = state.level_state.latest_proposal in + let is_latest_proposal_applied = + (* mark as applied if it is indeed applied or if this specific proposal was + already marked as applied *) + is_proposal_applied + || prev_proposal.block.hash = proposal.block.hash + && state.level_state.is_latest_proposal_applied + in + let new_level_state = + { + state.level_state with + is_latest_proposal_applied; + latest_proposal = proposal; + } + in Lwt.return {state with level_state = new_level_state} -let may_update_proposal state (proposal : proposal) = +let may_update_proposal ~is_proposal_applied state (proposal : proposal) = assert ( Compare.Int32.( state.level_state.latest_proposal.block.shell.level = proposal.block.shell.level)) ; if Round.(state.level_state.latest_proposal.block.round < proposal.block.round) - then update_proposal state proposal + then update_proposal ~is_proposal_applied state proposal else Lwt.return state let preendorse state proposal = - if Protocol_hash.(proposal.block.protocol <> proposal.block.next_protocol) - then + if Baking_state.is_first_block_in_protocol proposal then (* We do not preendorse the first transition block *) let new_state = update_current_phase state Idle in Lwt.return (new_state, Do_nothing) else Events.(emit attempting_preendorse_proposal proposal.block.hash) >>= fun () -> - let new_state = update_current_phase state Awaiting_preendorsements in + let new_state = + (* We await for the block to be applied before updating its + locked values. 
*) + if state.level_state.is_latest_proposal_applied then + update_current_phase state Awaiting_preendorsements + else update_current_phase state Awaiting_application + in Lwt.return (new_state, make_preendorse_action state proposal) let extract_pqc state (new_proposal : proposal) = @@ -179,9 +198,35 @@ let may_update_endorsable_payload_with_internal_pqc state in {state with level_state = new_level_state} -let rec handle_new_proposal state (new_proposal : proposal) = +let may_update_is_latest_proposal_applied ~is_proposal_applied state + new_proposal = + let current_proposal = state.level_state.latest_proposal in + if + is_proposal_applied + && Block_hash.(current_proposal.block.hash = new_proposal.block.hash) + then + let new_level_state = + {state.level_state with is_latest_proposal_applied = true} + in + let new_state = {state with level_state = new_level_state} in + new_state + else state + +let has_already_been_handled state new_proposal = + let current_proposal = state.level_state.latest_proposal in + Block_hash.(current_proposal.block.hash = new_proposal.block.hash) + && state.level_state.is_latest_proposal_applied + +let rec handle_proposal ~is_proposal_applied state (new_proposal : proposal) = let current_level = state.level_state.current_level in let new_proposal_level = new_proposal.block.shell.level in + let current_proposal = state.level_state.latest_proposal in + let state = + may_update_is_latest_proposal_applied + ~is_proposal_applied + state + new_proposal + in if Compare.Int32.(current_level > new_proposal_level) then (* The baker is ahead, a reorg may have happened. Do nothing: wait for the node to send us the branch's head. This new head @@ -189,12 +234,11 @@ let rec handle_new_proposal state (new_proposal : proposal) = proposal and thus, its level should be at least the same as our current proposal's level. 
*) Events.(emit baker_is_ahead_of_node (current_level, new_proposal_level)) - >>= fun () -> Lwt.return (state, Do_nothing) + >>= fun () -> do_nothing state else if Compare.Int32.(current_level = new_proposal_level) then - (* The received head is a new proposal for the current level: - let's check if it's a valid one for us. *) - let current_proposal = state.level_state.latest_proposal in if + (* The received head is a new proposal for the current level: + let's check if it's a valid one for us. *) Block_hash.( current_proposal.predecessor.hash <> new_proposal.predecessor.hash) then @@ -202,7 +246,7 @@ let rec handle_new_proposal state (new_proposal : proposal) = emit new_proposal_is_on_another_branch (current_proposal.predecessor.hash, new_proposal.predecessor.hash)) - >>= fun () -> may_switch_branch state new_proposal + >>= fun () -> may_switch_branch ~is_proposal_applied state new_proposal else is_acceptable_proposal_for_current_level state new_proposal >>= function | Invalid -> @@ -217,15 +261,16 @@ let rec handle_new_proposal state (new_proposal : proposal) = (* The proposal is outdated: we update to be able to extract its included endorsements but we do not endorse it *) Events.(emit outdated_proposal new_proposal.block.hash) >>= fun () -> - may_update_proposal state new_proposal >>= fun state -> - do_nothing state + may_update_proposal ~is_proposal_applied state new_proposal + >>= fun state -> do_nothing state | Valid_proposal -> ( (* Valid_proposal => proposal.round = current_round *) (* Check whether we need to update our endorsable payload *) let new_state = may_update_endorsable_payload_with_internal_pqc state new_proposal in - may_update_proposal new_state new_proposal >>= fun new_state -> + may_update_proposal ~is_proposal_applied new_state new_proposal + >>= fun new_state -> (* The proposal is valid but maybe we already locked on a payload *) match new_state.level_state.locked_round with | Some locked_round -> ( @@ -243,18 +288,22 @@ let rec 
handle_new_proposal state (new_proposal : proposal) = (* This PQC is above our locked_round, we can preendorse it *) preendorse new_state new_proposal | _ -> - (* We shouldn't preendorse this proposal, but we should at - least watch (pre)quorums events on it *) - let new_state = - update_current_phase new_state Awaiting_preendorsements - in - Lwt.return (new_state, Watch_proposal)) + (* We shouldn't preendorse this proposal, but we + should at least watch (pre)quorums events on it + but only when it is applied otherwise we await + for the proposal to be applied. *) + if is_proposal_applied then + let new_state = + update_current_phase new_state Awaiting_preendorsements + in + Lwt.return (new_state, Watch_proposal) + else do_nothing new_state) | None -> (* Otherwise, we did not lock on any payload, thus we can preendorse it *) preendorse new_state new_proposal) else - (* new_proposal.level > current_level *) + (* Last case: new_proposal_level > current_level *) (* Possible scenarios: - we received a block for a next level - we received our own block @@ -268,6 +317,9 @@ let rec handle_new_proposal state (new_proposal : proposal) = { current_level = new_level; latest_proposal = new_proposal; + is_latest_proposal_applied = is_proposal_applied; + delayed_prequorum = None; + injected_preendorsements = None; (* Unlock values *) locked_round = None; endorsable_payload = None; @@ -279,19 +331,22 @@ let rec handle_new_proposal state (new_proposal : proposal) = in (* recursive call with the up-to-date state to handle the new level proposals *) - handle_new_proposal {state with level_state; round_state} new_proposal + handle_proposal + ~is_proposal_applied + {state with level_state; round_state} + new_proposal in let action = Update_to_level {new_level_proposal = new_proposal; compute_new_state} in Lwt.return (state, action) -and may_switch_branch state new_proposal = +and may_switch_branch ~is_proposal_applied state new_proposal = let switch_branch state = Events.(emit 
switching_branch ()) >>= fun () -> (* If we are on a different branch, we also need to update our [round_state] accordingly. - The recursive call to [handle_new_proposal] cannot end up + The recursive call to [handle_proposal] cannot end up with an invalid proposal as it's on a different branch, thus there is no need to backtrack to the former state as the new proposal must end up being the new [latest_proposal]. That's @@ -299,10 +354,11 @@ and may_switch_branch state new_proposal = let round_update = { Baking_actions.new_round_proposal = new_proposal; - handle_proposal = (fun state -> handle_new_proposal state new_proposal); + handle_proposal = + (fun state -> handle_proposal ~is_proposal_applied state new_proposal); } in - update_proposal state new_proposal >>= fun new_state -> + update_proposal ~is_proposal_applied state new_proposal >>= fun new_state -> (* TODO if the branch proposal is outdated, we should trigger an [End_of_round] to participate *) Lwt.return (new_state, Synchronize_round round_update) @@ -338,6 +394,25 @@ and may_switch_branch state new_proposal = Events.(emit branch_proposal_has_same_prequorum ()) >>= fun () -> do_nothing state +let may_register_early_prequorum state ((candidate, _) as received_prequorum) = + if + Block_hash.( + candidate.Operation_worker.hash + <> state.level_state.latest_proposal.block.hash) + then + Events.( + emit + unexpected_pqc_while_waiting_for_application + (candidate.hash, state.level_state.latest_proposal.block.hash)) + >>= fun () -> do_nothing state + else + Events.(emit pqc_while_waiting_for_application candidate.hash) >>= fun () -> + let new_level_state = + {state.level_state with delayed_prequorum = Some received_prequorum} + in + let new_state = {state with level_state = new_level_state} in + do_nothing new_state + (** In the association map [delegate_slots], the function returns an optional pair ([delegate], [endorsing_slot]) if for the current [round], the validator [delegate] has a endorsing slot. 
*) @@ -537,8 +612,8 @@ let end_of_round state current_round = let new_state = update_current_phase new_state Idle in do_nothing new_state | Some (delegate, _) -> - let last_proposal = state.level_state.latest_proposal.block in - if Protocol_hash.(last_proposal.protocol <> Protocol.hash) then + let latest_proposal = state.level_state.latest_proposal in + if Baking_state.is_first_block_in_protocol latest_proposal then (* Do not inject a block for the previous protocol! (Let the baker of the previous protocol do it.) *) do_nothing new_state @@ -607,6 +682,9 @@ let prequorum_reached_when_awaiting_preendorsements state candidate unexpected_prequorum_received (candidate.hash, latest_proposal.block.hash)) >>= fun () -> do_nothing state + else if not state.level_state.is_latest_proposal_applied then + Events.(emit handling_prequorum_on_non_applied_proposal ()) >>= fun () -> + do_nothing state else let prequorum = { @@ -673,6 +751,38 @@ let quorum_reached_when_waiting_endorsements state candidate endorsement_qc = in do_nothing new_state +let handle_expected_applied_proposal (state : Baking_state.t) = + let new_level_state = + {state.level_state with is_latest_proposal_applied = true} + in + let new_state = {state with level_state = new_level_state} in + match new_state.level_state.delayed_prequorum with + | None -> ( + (* The application arrived before the prequorum: wait for the prequorum. *) + let new_state = update_current_phase new_state Awaiting_preendorsements in + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4877 This + mechanism is only temporary and should be removed when the + protocol and prevalidator correctly accept early + preendorsements. 
*) + match new_state.level_state.injected_preendorsements with + | None -> do_nothing new_state + | Some preendorsements -> + let reinject_preendorsement_action = + Reinject_preendorsements {preendorsements} + in + let new_level_state = + {new_state.level_state with injected_preendorsements = None} + in + let new_state = {new_state with level_state = new_level_state} in + Lwt.return (new_state, reinject_preendorsement_action)) + | Some (candidate, preendorsement_qc) -> + (* The application arrived after the prequorum: handle the + prequorum received earlier. *) + prequorum_reached_when_awaiting_preendorsements + new_state + candidate + preendorsement_qc + (* Hypothesis: - The state is not to be modified outside this module (NB: there are exceptions in Baking_actions: the corner cases @@ -700,37 +810,72 @@ let step (state : Baking_state.t) (event : Baking_state.event) : (* If it is time to bake the next level, stop everything currently going on and propose the next level block *) time_to_bake state at_round - | Idle, New_proposal block_info -> + | Idle, New_head_proposal proposal -> Events.( emit new_head - ( block_info.block.hash, - block_info.block.shell.level, - block_info.block.round )) - >>= fun () -> handle_new_proposal state block_info - | Awaiting_endorsements, New_proposal block_info - | Awaiting_preendorsements, New_proposal block_info -> + (proposal.block.hash, proposal.block.shell.level, proposal.block.round)) + >>= fun () -> handle_proposal ~is_proposal_applied:true state proposal + | Awaiting_application, New_head_proposal proposal -> + if + Block_hash.( + state.level_state.latest_proposal.block.hash <> proposal.block.hash) + then + Events.( + emit + new_head + ( proposal.block.hash, + proposal.block.shell.level, + proposal.block.round )) + >>= fun () -> + Events.(emit unexpected_new_head_while_waiting_for_application ()) + >>= fun () -> handle_proposal ~is_proposal_applied:true state proposal + else + Events.(emit applied_expected_proposal_received 
proposal.block.hash) + >>= fun () -> handle_expected_applied_proposal state + | Awaiting_endorsements, New_head_proposal proposal + | Awaiting_preendorsements, New_head_proposal proposal -> Events.( emit new_head - ( block_info.block.hash, - block_info.block.shell.level, - block_info.block.round )) + (proposal.block.hash, proposal.block.shell.level, proposal.block.round)) >>= fun () -> Events.(emit new_head_while_waiting_for_qc ()) >>= fun () -> - handle_new_proposal state block_info - | ( Awaiting_preendorsements, - Prequorum_reached (candidate, _voting_power, preendorsement_qc) ) -> + handle_proposal ~is_proposal_applied:true state proposal + | Idle, New_valid_proposal proposal -> + Events.( + emit + new_valid_proposal + (proposal.block.hash, proposal.block.shell.level, proposal.block.round)) + >>= fun () -> handle_proposal ~is_proposal_applied:false state proposal + | Awaiting_application, New_valid_proposal proposal + | Awaiting_endorsements, New_valid_proposal proposal + | Awaiting_preendorsements, New_valid_proposal proposal -> + Events.( + emit + new_valid_proposal + (proposal.block.hash, proposal.block.shell.level, proposal.block.round)) + >>= fun () -> + if has_already_been_handled state proposal then + Events.(emit valid_proposal_received_after_application ()) >>= fun () -> + do_nothing state + else + Events.(emit new_valid_proposal_while_waiting_for_qc ()) >>= fun () -> + handle_proposal ~is_proposal_applied:false state proposal + | Awaiting_application, Prequorum_reached (candidate, preendorsement_qc) -> + may_register_early_prequorum state (candidate, preendorsement_qc) + | Awaiting_preendorsements, Prequorum_reached (candidate, preendorsement_qc) + -> prequorum_reached_when_awaiting_preendorsements state candidate preendorsement_qc - | ( Awaiting_endorsements, - Quorum_reached (candidate, _voting_power, endorsement_qc) ) -> + | Awaiting_endorsements, Quorum_reached (candidate, endorsement_qc) -> quorum_reached_when_waiting_endorsements state candidate 
endorsement_qc (* Unreachable cases *) | Idle, (Prequorum_reached _ | Quorum_reached _) | Awaiting_preendorsements, Quorum_reached _ - | Awaiting_endorsements, Prequorum_reached _ -> + | Awaiting_endorsements, Prequorum_reached _ + | Awaiting_application, Quorum_reached _ -> (* This cannot/should not happen *) do_nothing state diff --git a/src/proto_016_PtMumbai/lib_delegate/state_transitions.mli b/src/proto_016_PtMumbai/lib_delegate/state_transitions.mli index adc584312329..2fa02e3b2d8a 100644 --- a/src/proto_016_PtMumbai/lib_delegate/state_transitions.mli +++ b/src/proto_016_PtMumbai/lib_delegate/state_transitions.mli @@ -41,16 +41,16 @@ val is_acceptable_proposal_for_current_level : val make_consensus_list : state -> proposal -> (consensus_key_and_delegate * consensus_content) list -val make_preendorse_action : state -> proposal -> action - -val may_update_proposal : state -> proposal -> state Lwt.t +val may_update_proposal : + is_proposal_applied:bool -> state -> proposal -> state Lwt.t val preendorse : state -> proposal -> (state * action) Lwt.t val extract_pqc : state -> proposal -> (Kind.preendorsement operation list * Round.t) option -val handle_new_proposal : state -> proposal -> (state * action) Lwt.t +val handle_proposal : + is_proposal_applied:bool -> state -> proposal -> (state * action) Lwt.t val round_proposer : state -> diff --git a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml index 624565b40889..e99d009738bf 100644 --- a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml +++ b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml @@ -8,7 +8,14 @@ module type Mocked_services_hooks = sig type mempool = Mockup.M.Block_services.Mempool.t (** The baker and endorser rely on this stream to be notified of new - blocks. *) + valid blocks. 
*) + val monitor_validated_blocks : + unit -> + (Chain_id.t * Block_hash.t * Block_header.t * Operation.t list list) + Tezos_rpc.Answer.stream + + (** The baker and endorser rely on this stream to be notified of new + heads. *) val monitor_heads : unit -> (Block_hash.t * Block_header.t) Tezos_rpc.Answer.stream @@ -16,6 +23,10 @@ module type Mocked_services_hooks = sig val protocols : Block_services.block -> Block_services.protocols tzresult Lwt.t + (** [raw_header] returns the byte encoded block header of the block + associated to the given block specification. *) + val raw_header : Block_services.block -> bytes tzresult Lwt.t + (** [header] returns the block header of the block associated to the given block specification. *) val header : @@ -112,6 +123,13 @@ end type hooks = (module Mocked_services_hooks) module Make (Hooks : Mocked_services_hooks) = struct + let monitor_validated_blocks = + Directory.gen_register0 + Directory.empty + Monitor_services.S.validated_blocks + (fun _next_protocol _ -> + Tezos_rpc.Answer.return_stream (Hooks.monitor_validated_blocks ())) + let monitor_heads = Directory.gen_register1 Directory.empty @@ -137,6 +155,14 @@ module Make (Hooks : Mocked_services_hooks) = struct Directory.register Directory.empty service (fun (_, block) () () -> Hooks.protocols block) + let raw_header = + Directory.prefix + (Tezos_rpc.Path.prefix Chain_services.path Block_services.path) + @@ Directory.register + Directory.empty + Mockup.M.Block_services.S.raw_header + (fun (((), _chain), block) _ _ -> Hooks.raw_header block) + let header = Directory.prefix (Tezos_rpc.Path.prefix Chain_services.path Block_services.path) @@ -281,15 +307,30 @@ module Make (Hooks : Mocked_services_hooks) = struct (fun (_, block) () () -> Hooks.raw_protocol_data block) let shell_directory chain_id = - let merge = Directory.merge in - Directory.empty |> merge monitor_heads |> merge protocols |> merge header - |> merge operations |> merge hash |> merge shell_header - |> merge 
resulting_context_hash - |> merge (chain chain_id) - |> merge inject_block |> merge inject_operation |> merge monitor_operations - |> merge list_blocks |> merge live_blocks |> merge raw_protocol_data - |> merge broadcast_block |> merge broadcast_operation - |> merge monitor_bootstrapped + List.fold_left + Directory.merge + Directory.empty + [ + monitor_validated_blocks; + monitor_heads; + protocols; + raw_header; + header; + operations; + hash; + shell_header; + resulting_context_hash; + chain chain_id; + inject_block; + inject_operation; + monitor_operations; + list_blocks; + live_blocks; + raw_protocol_data; + broadcast_block; + broadcast_operation; + monitor_bootstrapped; + ] let directory chain_id = let proto_directory = diff --git a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml index 346cea3ee712..82ae1d67c295 100644 --- a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml +++ b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml @@ -60,8 +60,13 @@ type state = { that functionality. *) ctxt_table : Tezos_protocol_environment.rpc_context Context_hash.Table.t; (** The context table allows us to look up rpc_context by its hash. *) + validated_blocks_pipe : + (Block_hash.t * Block_header.t * Operation.t list list) Lwt_pipe.Unbounded.t; + (** [validated_blocks_pipe] is used to implement the + [monitor_validated_blocks] RPC. *) heads_pipe : (Block_hash.t * Block_header.t) Lwt_pipe.Unbounded.t; - (** [heads_pipe] is used to implement the [monitor_heads] RPC. *) + (** [heads_pipe] is used to implement the [monitor_heads] + RPC. *) operations_pipe : (Operation_hash.t * Mockup.M.Protocol.operation) option Lwt_pipe.Unbounded.t; (** [operations_pipe] is used to implement the [operations_pipe] RPC. 
*) @@ -108,6 +113,12 @@ module type Hooks = sig tzresult Lwt.t + val on_new_validated_block : + block_hash:Block_hash.t -> + block_header:Block_header.t -> + operations:Operation.t list list -> + (Block_hash.t * Block_header.t * Operation.t list list) option Lwt.t + val on_new_head : block_hash:Block_hash.t -> block_header:Block_header.t -> @@ -238,11 +249,32 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : let module Impl : Faked_services.Mocked_services_hooks = struct type mempool = Mockup.M.Block_services.Mempool.t + let monitor_validated_blocks () = + let next () = + let rec pop_until_ok () = + Lwt_pipe.Unbounded.pop state.validated_blocks_pipe + >>= fun (block_hash, block_header, operations) -> + User_hooks.on_new_validated_block + ~block_hash + ~block_header + ~operations + >>= function + | None -> pop_until_ok () + | Some (hash, head, operations) -> + Lwt.return_some (chain_id, hash, head, operations) + in + pop_until_ok () + in + let shutdown () = () in + Tezos_rpc.Answer.{next; shutdown} + let monitor_heads () = let next () = let rec pop_until_ok () = Lwt_pipe.Unbounded.pop state.heads_pipe >>= fun (block_hash, block_header) -> + (* Sleep a 0.1s to simulate a block application delay *) + Lwt_unix.sleep 0.1 >>= fun () -> User_hooks.on_new_head ~block_hash ~block_header >>= function | None -> pop_until_ok () | Some head -> Lwt.return_some head @@ -295,14 +327,55 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : else Protocol.hash); } + let may_lie_on_proto_level block x = + (* As for ../protocols, the baker distinguishes activation + blocks from "normal" blocks by comparing the [proto_level] of + the shell header and its predecessor. If the predecessor's + one is different, it must mean that we are considering an + activation block and must not endorse. 
Here, we do a bit of + hacking in order to return a different proto_level for the + predecessor of the genesis block which is considered as the + current protocol activation block. To perfectly mimic what is + supposed to happen, the first mocked up block created should + be made in the genesis protocol, however, it is not what's + done in the mockup mode. *) + let is_predecessor_of_genesis = + match block with + | `Hash (requested_hash, rel) -> + Int.equal rel 0 + && Block_hash.equal requested_hash genesis_predecessor_block_hash + | _ -> false + in + if is_predecessor_of_genesis then + { + x.rpc_context.block_header with + proto_level = pred x.rpc_context.block_header.proto_level; + } + else x.rpc_context.block_header + + let raw_header (block : Tezos_shell_services.Block_services.block) : + bytes tzresult Lwt.t = + locate_block state block >>=? fun x -> + let shell = may_lie_on_proto_level block x in + let protocol_data = + Data_encoding.Binary.to_bytes_exn + Protocol.block_header_data_encoding + x.protocol_data + in + return + (Data_encoding.Binary.to_bytes_exn + Tezos_base.Block_header.encoding + {shell; protocol_data}) + let header (block : Tezos_shell_services.Block_services.block) : Mockup.M.Block_services.block_header tzresult Lwt.t = locate_block state block >>=? fun x -> + let shell = may_lie_on_proto_level block x in return { Mockup.M.Block_services.hash = x.rpc_context.block_hash; chain_id; - shell = x.rpc_context.block_header; + shell; protocol_data = x.protocol_data; } @@ -716,6 +789,9 @@ let rec listener ~(user_hooks : (module Hooks)) ~state ~broadcast_pipe = process_block state block_hash block_header operations >>=? fun () -> User_hooks.check_chain_after_processing ~level ~round ~chain:state.chain >>=? 
fun () -> + Lwt_pipe.Unbounded.push + state.validated_blocks_pipe + (block_hash, block_header, operations) ; Lwt_pipe.Unbounded.push state.heads_pipe (block_hash, block_header) ; listener ~user_hooks ~state ~broadcast_pipe @@ -735,6 +811,7 @@ let create_fake_node_state ~i ~live_depth } in let chain0 = [genesis0] in + let validated_blocks_pipe = Lwt_pipe.Unbounded.create () in let heads_pipe = Lwt_pipe.Unbounded.create () in let operations_pipe = Lwt_pipe.Unbounded.create () in let genesis_block_true_hash = @@ -744,6 +821,8 @@ let create_fake_node_state ~i ~live_depth protocol_data = block_header0.protocol_data; } in + (* Only push genesis block as a new head, not a valid block: it is + the shell's semantics to not advertise "transition" blocks. *) Lwt_pipe.Unbounded.push heads_pipe (rpc_context0.block_hash, block_header0) ; return { @@ -768,6 +847,7 @@ let create_fake_node_state ~i ~live_depth .Block_header.context, rpc_context0 ); ]); + validated_blocks_pipe; heads_pipe; operations_pipe; streaming_operations = false; @@ -1026,7 +1106,6 @@ let make_genesis_context ~delegate_selection ~initial_seed ~round0 ~round1 in return (block_header, rpc_context) in - let level0_round0_duration = Protocol.Alpha_context.Round.round_duration round_durations @@ -1052,6 +1131,9 @@ module Default_hooks : Hooks = struct let on_inject_operation ~op_hash ~op = return (op_hash, op, default_propagation_vector) + let on_new_validated_block ~block_hash ~block_header ~operations = + Lwt.return (Some (block_hash, block_header, operations)) + let on_new_head ~block_hash ~block_header = Lwt.return (Some (block_hash, block_header)) diff --git a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.mli b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.mli index c01782653bdf..d9da9a19074f 100644 --- a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.mli +++ 
b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.mli @@ -74,6 +74,15 @@ module type Hooks = sig tzresult Lwt.t + (** This is called when a new validated block is going to be sent as + the response to a "monitor validated blocks" RPC call. Returning + [None] here skips that block; the stream waits for the next one. *) + val on_new_validated_block : + block_hash:Block_hash.t -> + block_header:Block_header.t -> + operations:Operation.t list list -> + (Block_hash.t * Block_header.t * Operation.t list list) option Lwt.t + (** This is called when a new head is going to be sent as the response to a "monitor heads" RPC call. Returning [None] here terminates the process for the baker. *) diff --git a/src/proto_016_PtMumbai/lib_delegate/test/test_scenario.ml b/src/proto_016_PtMumbai/lib_delegate/test/test_scenario.ml index cb34cc8d3d63..693b84bdda01 100644 --- a/src/proto_016_PtMumbai/lib_delegate/test/test_scenario.ml +++ b/src/proto_016_PtMumbai/lib_delegate/test/test_scenario.ml @@ -24,7 +24,7 @@ let test_level_5 () = include Default_hooks let stop_on_event = function - | Baking_state.New_proposal {block; _} -> + | Baking_state.New_head_proposal {block; _} -> (* Stop the node as soon as we receive a proposal with a level higher than [level_to_reach].
*) block.shell.level > level_to_reach @@ -54,6 +54,65 @@ let test_level_5 () = in run ~config [(3, (module Hooks)); (2, (module Hooks))] +let test_preendorse_on_valid () = + let level_to_reach = 2l in + let round_to_reach = 1l in + let module Hooks : Hooks = struct + include Default_hooks + + let on_new_head ~block_hash ~block_header = + (* Stop notifying heads on the level to reach, only notify that + it has been validated *) + if block_header.Block_header.shell.level < level_to_reach then + Lwt.return_some (block_hash, block_header) + else Lwt.return_none + + let seen_candidate = ref None + + let pqc_noticed = ref false + + let stop_on_event = function + | Baking_state.Prequorum_reached (candidate, _) -> + (* Register the PQC notice. *) + (match !seen_candidate with + | Some seen_candidate + when Block_hash.(candidate.hash = seen_candidate) -> + pqc_noticed := true + | _ -> ()) ; + false + | Baking_state.Quorum_reached (candidate, _) -> + (* Ensure that we never see a QC on the seen candidate. *) + (match !seen_candidate with + | Some seen_candidate + when Block_hash.(candidate.hash = seen_candidate) -> + Stdlib.failwith "Quorum occured on the seen candidate" + | _ -> ()) ; + false + | New_head_proposal {block; _} -> + (* Ensure that we never notice a new head at the level where + we are not supposed to. *) + if block.shell.level = level_to_reach then + Stdlib.failwith "Unexpected new head event" + else false + | New_valid_proposal {block; _} -> + (* Register the seen valid proposal candidate. *) + if + block.shell.level = level_to_reach + && Protocol.Alpha_context.Round.to_int32 block.round = 0l + then seen_candidate := Some block.hash ; + (* Stop the node at level 2 / round 1 (or any later round).
*) + block.shell.level = level_to_reach + && Protocol.Alpha_context.Round.to_int32 block.round >= round_to_reach + | _ -> false + + let check_chain_on_success ~chain:_ = + assert (!seen_candidate <> None) ; + assert !pqc_noticed ; + return_unit + end in + let config = {default_config with timeout = 10} in + run ~config [(1, (module Hooks))] + (* Scenario T1 @@ -649,7 +708,7 @@ let test_scenario_m1 () = return (op_hash, op, propagation_vector) let stop_on_event = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 4l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 4l | _ -> false end in let config = {default_config with timeout = 60} in @@ -679,7 +738,7 @@ let test_scenario_m2 () = include Default_hooks let stop_on_event = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 5l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 5l | _ -> false end in let module Missing_node : Hooks = struct @@ -734,7 +793,7 @@ Scenario M3 from other nodes only go to A. 3. The chain should not make progress. Since we have both bootstrap1 and bootstrap2 in delegate selection they have equal voting power. Therefore - it is necessary to have 2 votes for pre-quorums (which is achieved when A + it is necessary to have 2 votes for prequorums (which is achieved when A is proposing) and 2 votes for quorums (impossible because B has no way to obtain PQC and thus cannot send endorsements). 
@@ -742,7 +801,7 @@ Scenario M3 let test_scenario_m3 () = let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> + | Baking_state.New_head_proposal {block; _} -> block.shell.level = 1l && Protocol.Alpha_context.Round.to_int32 block.round = 6l | _ -> false @@ -917,7 +976,7 @@ Scenario M5 let test_scenario_m5 () = let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> block.shell.level >= 2l + | Baking_state.New_head_proposal {block; _} -> block.shell.level >= 2l | _ -> false in let module Node_a_hooks : Hooks = struct @@ -1004,7 +1063,7 @@ Scenario M6 let test_scenario_m6 () = let b_proposal_2_1 = ref None in let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 4l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 4l | _ -> false in let module Node_a_hooks : Hooks = struct @@ -1134,7 +1193,7 @@ let test_scenario_m7 () = let c_received_2_1 = ref false in let d_received_2_1 = ref false in let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 4l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 4l | _ -> false in let check_chain_on_success0 node_label ~chain = @@ -1338,7 +1397,7 @@ Scenario M8 let test_scenario_m8 () = let b_proposal_2_0 = ref None in let stop_on_event0 = function - | Baking_state.New_proposal {block; _} -> block.shell.level > 4l + | Baking_state.New_head_proposal {block; _} -> block.shell.level > 4l | _ -> false in let on_inject_operation0 ~op_hash ~op = @@ -1470,6 +1529,7 @@ let tests = let open Tezos_base_test_helpers.Tztest in [ tztest "reaches level 5" `Quick test_level_5; + tztest "cannot progress without new head" `Quick test_preendorse_on_valid; tztest "scenario t1" `Quick test_scenario_t1; tztest "scenario t2" `Quick test_scenario_t2; tztest "scenario t3" `Quick test_scenario_t3; -- GitLab From 191f663c689688301e179f32caa8d0c71c1a7c9c Mon Sep 17 00:00:00 2001 From: vbot Date: Thu, 26 Jan 
2023 16:39:11 +0100 Subject: [PATCH 14/14] Changelog: add entry --- CHANGES.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 04b6cc133062..48b5d2ad0fb8 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -90,6 +90,10 @@ Baker greater than 0. Application-dependent checks are re-enabled for re-proposal and fresh blocks at round greater than 0. +- Reduced the preendorsement injection delay by making the baker + preendorse as soon as the node considers a block as valid instead of + waiting for the node to fully apply it. (MR :gl:`!7516`) + Accuser ------- -- GitLab