diff --git a/CHANGES.rst b/CHANGES.rst index 845cfe534d9fc8bce58088a2113538444a4c9c24..266e524c3a84e2ba06874d7167abd741ec82c573 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -44,6 +44,15 @@ Node - Fixed a bug raising an error when a context split was called on a context that was created with Octez v13 (or earlier). +- Deprecated the RPC ``GET /monitor/valid_blocks`` and introduced + ``GET /monitor/validated_blocks`` and ``GET /monitor/applied_blocks`` + which respectively return validated blocks, which are not yet applied + nor stored, and applied blocks, which are fully applied and stored by + the node. (MR :gl:`!7513`) + +- Replaced some "precheck" occurrences with "validate" in event and + error identifiers and messages. (MR :gl:`!7513`) + Client ------ diff --git a/src/lib_shell/block_validator_process.ml b/src/lib_shell/block_validator_process.ml index 82aab9f4861bf40fe056d6de374f73c3d1192b60..c96f1f9e738810da70a5fe77ea4958f418380290 100644 --- a/src/lib_shell/block_validator_process.ml +++ b/src/lib_shell/block_validator_process.ml @@ -1014,12 +1014,12 @@ let apply_block ?(simulate = false) ?(should_precheck = true) let block_hash = Block_header.hash header in let* () = when_ (not should_precheck) (fun () -> - let*! is_prechecked = - Store.Block.is_known_prechecked chain_store block_hash + let*! 
is_validated = + Store.Block.is_known_validated chain_store block_hash in fail_unless - is_prechecked - (Block_validator_errors.Applying_non_prechecked_block block_hash)) + is_validated + (Block_validator_errors.Applying_non_validated_block block_hash)) in let* metadata = Store.Block.get_block_metadata chain_store predecessor in let max_operations_ttl = Store.Block.max_operations_ttl metadata in diff --git a/src/lib_shell/distributed_db.ml b/src/lib_shell/distributed_db.ml index 7ca276f20c0a267e564b0054585bcfbf270bf611..29d926470e0b8f2ff039c337316c58117baed8fa 100644 --- a/src/lib_shell/distributed_db.ml +++ b/src/lib_shell/distributed_db.ml @@ -291,7 +291,7 @@ let inject_operation chain_db h op = op let inject_prechecked_block chain_db hash block_header operations = - Store.Block.store_prechecked_block + Store.Block.store_validated_block chain_db.reader_chain_db.chain_store ~hash ~block_header diff --git a/src/lib_shell/distributed_db_requester.ml b/src/lib_shell/distributed_db_requester.ml index b62ad7ba3d2e0398a7c14a656d2357cc707a21bc..db45b4590f9699aea18d1a85a563dbbe5720eaed 100644 --- a/src/lib_shell/distributed_db_requester.ml +++ b/src/lib_shell/distributed_db_requester.ml @@ -207,7 +207,7 @@ module Block_header_storage = struct let* b = Store.Block.is_known_valid chain_store hash in match b with | true -> Lwt.return_true - | false -> Store.Block.is_known_prechecked chain_store hash + | false -> Store.Block.is_known_validated chain_store hash let read chain_store h = let open Lwt_result_syntax in @@ -215,7 +215,7 @@ module Block_header_storage = struct let*! 
r = Store.Block.read_block chain_store h in match r with | Ok b -> return b - | Error _ -> Store.Block.read_prechecked_block chain_store h + | Error _ -> Store.Block.read_validated_block chain_store h in return (Store.Block.header b) @@ -225,7 +225,7 @@ module Block_header_storage = struct let* o = Store.Block.read_block_opt chain_store h in match o with | Some b -> Lwt.return_some b - | None -> Store.Block.read_prechecked_block_opt chain_store h + | None -> Store.Block.read_validated_block_opt chain_store h in Lwt.return (Option.map Store.Block.header b) end diff --git a/src/lib_shell/monitor_directory.ml b/src/lib_shell/monitor_directory.ml index 1e410538b420952d6ea491191ad874ed0edbad7c..30a61126c17bbaedb2c29b687463443de614d65f 100644 --- a/src/lib_shell/monitor_directory.ml +++ b/src/lib_shell/monitor_directory.ml @@ -61,7 +61,7 @@ let build_rpc_directory validator mainchain_validator = in let shutdown () = Lwt_watcher.shutdown stopper in Tezos_rpc.Answer.return_stream {next; shutdown}) ; - gen_register0 Monitor_services.S.valid_blocks (fun q () -> + gen_register0 Monitor_services.S.legacy_valid_blocks (fun q () -> let block_stream, stopper = Store.global_block_watcher store in let shutdown () = Lwt_watcher.shutdown stopper in let in_chains (chain_store, _block) = @@ -115,6 +115,130 @@ let build_rpc_directory validator mainchain_validator = in let next () = Lwt_stream.get stream in Tezos_rpc.Answer.return_stream {next; shutdown}) ; + gen_register0 Monitor_services.S.applied_blocks (fun q () -> + let block_stream, stopper = Store.global_block_watcher store in + let shutdown () = Lwt_watcher.shutdown stopper in + let in_chains (chain_store, _block) = + match q#chains with + | [] -> Lwt.return_true + | chains -> + let that_chain_id = Store.Chain.chain_id chain_store in + List.exists_p + (fun chain -> + let+ o = Chain_directory.get_chain_id_opt store chain in + match o with + | None -> false + | Some this_chain_id -> + Chain_id.equal this_chain_id that_chain_id) + 
chains + in + let in_protocols (chain_store, block) = + match q#protocols with + | [] -> Lwt.return_true + | protocols -> ( + let* o = Store.Block.read_predecessor_opt chain_store block in + match o with + | None -> Lwt.return_false (* won't happen *) + | Some pred -> + let* context = Store.Block.context_exn chain_store pred in + let* protocol = Context_ops.get_protocol context in + Lwt.return + (List.exists (Protocol_hash.equal protocol) protocols)) + in + let in_next_protocols (chain_store, block) = + match q#next_protocols with + | [] -> Lwt.return_true + | protocols -> + let* context = Store.Block.context_exn chain_store block in + let* next_protocol = Context_ops.get_protocol context in + Lwt.return + (List.exists (Protocol_hash.equal next_protocol) protocols) + in + let stream = + Lwt_stream.filter_map_s + (fun ((chain_store, block) as elt) -> + let* in_chains = in_chains elt in + let* in_next_protocols = in_next_protocols elt in + let* in_protocols = in_protocols elt in + if in_chains && in_protocols && in_next_protocols then + let chain_id = Store.Chain.chain_id chain_store in + Lwt.return_some + ( chain_id, + Store.Block.hash block, + Store.Block.header block, + Store.Block.operations block ) + else Lwt.return_none) + block_stream + in + let next () = Lwt_stream.get stream in + Tezos_rpc.Answer.return_stream {next; shutdown}) ; + gen_register0 Monitor_services.S.validated_blocks (fun q () -> + let* chains = + match q#chains with + | [] -> + let* all_chain_stores = Store.all_chain_stores store in + Lwt.return + (List.map (fun cs -> Store.Chain.chain_id cs) all_chain_stores) + | l -> + List.map_s (fun chain -> Chain_directory.get_chain_id store chain) l + in + let* block_streams, stoppers = + List.fold_left_s + (fun (block_streams, stoppers) chain_id -> + let* r = Store.get_chain_store store chain_id in + match r with + | Error _ -> Lwt.fail Not_found + | Ok chain_store -> + let block_stream, stopper = + Store.Chain.validated_watcher chain_store + in + let 
bs = + Lwt_stream.map + (fun b -> (chain_id, chain_store, b)) + block_stream + in + return (bs :: block_streams, stopper :: stoppers)) + ([], []) + chains + in + let block_stream = Lwt_stream.choose block_streams in + let shutdown () = List.iter Lwt_watcher.shutdown stoppers in + let in_next_protocols chain_store block = + match q#next_protocols with + | [] -> Lwt.return_true + | protocols -> ( + let* block_protocol_opt = + Store.Chain.find_protocol + chain_store + ~protocol_level:(Store.Block.proto_level block) + in + match block_protocol_opt with + | None -> + (* If we do not know the protocol hash associated to the + proto level, it means either that it is a transition + block for which we do not know the protocol yet, or + that something really bad occurred. In both cases, we + do not advertise it. *) + Lwt.return_false + | Some next_protocol -> + Lwt.return + (List.exists (Protocol_hash.equal next_protocol) protocols)) + in + let stream = + Lwt_stream.filter_map_s + (fun (chain_id, chain_store, block) -> + let* in_next_protocols = in_next_protocols chain_store block in + if in_next_protocols then + Lwt.return_some + ( chain_id, + Store.Block.hash block, + Store.Block.header block, + Store.Block.operations block ) + else Lwt.return_none) + block_stream + in + let next () = Lwt_stream.get stream in + Tezos_rpc.Answer.return_stream {next; shutdown}) ; gen_register1 Monitor_services.S.heads (fun chain q () -> (* TODO: when `chain = `Test`, should we reset then stream when the `testnet` change, or dias we currently do ?? 
*) diff --git a/src/lib_shell/p2p_reader.ml b/src/lib_shell/p2p_reader.ml index 945db5b63dcb518845d7935def7e8589f3d95f80..b30090736b7a6ae42923130ac0d9bd8c0b7f9e4a 100644 --- a/src/lib_shell/p2p_reader.ml +++ b/src/lib_shell/p2p_reader.ml @@ -120,7 +120,7 @@ let read_block {disk; _} h = let* o = Store.Block.read_block_opt chain_store h in let* o = match o with - | None -> Store.Block.read_prechecked_block_opt chain_store h + | None -> Store.Block.read_validated_block_opt chain_store h | Some b -> Lwt.return_some b in Option.map_s (fun b -> Lwt.return (Store.Chain.chain_id chain_store, b)) o) diff --git a/src/lib_shell_services/block_validator_errors.ml b/src/lib_shell_services/block_validator_errors.ml index 4a0731054cefdee713318c2f45b3dbc8062e4aad..136a94aa5533f8b2ad36ddc1491813f8de40c53e 100644 --- a/src/lib_shell_services/block_validator_errors.ml +++ b/src/lib_shell_services/block_validator_errors.ml @@ -405,7 +405,7 @@ type error += expected : Operation_list_list_hash.t; found : Operation_list_list_hash.t; } - | Applying_non_prechecked_block of Block_hash.t + | Applying_non_validated_block of Block_hash.t | Failed_to_checkout_context of Context_hash.t | System_error of {errno : string; fn : string; msg : string} | Missing_test_protocol of Protocol_hash.t @@ -483,18 +483,18 @@ let () = Inconsistent_operations_hash {block; expected; found}) ; Error_monad.register_error_kind `Permanent - ~id:"Block_validator_process.applying_non_prechecked_block" - ~title:"Applying non prechecked block" - ~description:"Applying non prechecked block" + ~id:"Block_validator_process.applying_non_validated_block" + ~title:"Applying non validated block" + ~description:"Applying non validated block" ~pp:(fun ppf (hash : Block_hash.t) -> Format.fprintf ppf - "Applying non prechecked block %a" + "Applying non validated block %a" Block_hash.pp_short hash) Data_encoding.(obj1 (req "hash" Block_hash.encoding)) - (function Applying_non_prechecked_block bh -> Some bh | _ -> None) - (fun bh -> 
Applying_non_prechecked_block bh) ; + (function Applying_non_validated_block bh -> Some bh | _ -> None) + (fun bh -> Applying_non_validated_block bh) ; Error_monad.register_error_kind `Permanent ~id:"Block_validator_process.failed_to_checkout_context" diff --git a/src/lib_shell_services/block_validator_errors.mli b/src/lib_shell_services/block_validator_errors.mli index 1a5c333cc9010a62837dea789da3e0ebec212c46..104f9d80ce17ac2eafe85a1f998e769045d85c1a 100644 --- a/src/lib_shell_services/block_validator_errors.mli +++ b/src/lib_shell_services/block_validator_errors.mli @@ -70,7 +70,7 @@ type error += expected : Operation_list_list_hash.t; found : Operation_list_list_hash.t; } - | Applying_non_prechecked_block of Block_hash.t + | Applying_non_validated_block of Block_hash.t | Failed_to_checkout_context of Context_hash.t | System_error of {errno : string; fn : string; msg : string} | Missing_test_protocol of Protocol_hash.t diff --git a/src/lib_shell_services/monitor_services.ml b/src/lib_shell_services/monitor_services.ml index f409c78c3b5275e3ecaf9fc9eac8277917db60b7..6a668e87d893a2e2ed9c19fb44cd36f821646d7b 100644 --- a/src/lib_shell_services/monitor_services.ml +++ b/src/lib_shell_services/monitor_services.ml @@ -84,7 +84,7 @@ module S = struct (req "timestamp" Time.Protocol.encoding)) Tezos_rpc.Path.(path / "bootstrapped") - let valid_blocks_query = + let validated_or_apply_blocks_query = let open Tezos_rpc.Query in query (fun protocols next_protocols chains -> object @@ -100,12 +100,12 @@ module S = struct |+ multi_field "chain" Chain_services.chain_arg (fun t -> t#chains) |> seal - let valid_blocks = + let legacy_valid_blocks = Tezos_rpc.Service.get_service ~description: - "Monitor all blocks that are successfully validated by the node, \ - disregarding whether they were selected as the new head or not." 
- ~query:valid_blocks_query + "(Deprecated) Monitor all blocks that are successfully applied by the \ + node, disregarding whether they were selected as the new head or not." + ~query:validated_or_apply_blocks_query ~output: (merge_objs (obj2 @@ -114,6 +114,35 @@ module S = struct Block_header.encoding) Tezos_rpc.Path.(path / "valid_blocks") + let validated_blocks = + Tezos_rpc.Service.get_service + ~description: + "Monitor all blocks that were successfully validated by the node but \ + are not applied nor stored yet, disregarding whether they are going \ + to be selected as the new head or not." + ~query:validated_or_apply_blocks_query + ~output: + (obj4 + (req "chain_id" Chain_id.encoding) + (req "hash" Block_hash.encoding) + (req "header" (dynamic_size Block_header.encoding)) + (req "operations" (list (list (dynamic_size Operation.encoding))))) + Tezos_rpc.Path.(path / "validated_blocks") + + let applied_blocks = + Tezos_rpc.Service.get_service + ~description: + "Monitor all blocks that are successfully applied and stored by the \ + node, disregarding whether they were selected as the new head or not." + ~query:validated_or_apply_blocks_query + ~output: + (obj4 + (req "chain_id" Chain_id.encoding) + (req "hash" Block_hash.encoding) + (req "header" (dynamic_size Block_header.encoding)) + (req "operations" (list (list (dynamic_size Operation.encoding))))) + Tezos_rpc.Path.(path / "applied_blocks") + let heads_query = let open Tezos_rpc.Query in query (fun next_protocols -> @@ -127,8 +156,8 @@ module S = struct let heads = Tezos_rpc.Service.get_service ~description: - "Monitor all blocks that are successfully validated by the node and \ - selected as the new head of the given chain." + "Monitor all blocks that are successfully validated and applied by the \ + node and selected as the new head of the given chain." 
~query:heads_query ~output: (merge_objs @@ -167,10 +196,40 @@ open Tezos_rpc.Context let bootstrapped ctxt = make_streamed_call S.bootstrapped ctxt () () () -let valid_blocks ctxt ?(chains = [`Main]) ?(protocols = []) +let legacy_valid_blocks ctxt ?(chains = [`Main]) ?(protocols = []) + ?(next_protocols = []) () = + make_streamed_call + S.legacy_valid_blocks + ctxt + () + (object + method chains = chains + + method protocols = protocols + + method next_protocols = next_protocols + end) + () + +let validated_blocks ctxt ?(chains = [`Main]) ?(protocols = []) + ?(next_protocols = []) () = + make_streamed_call + S.validated_blocks + ctxt + () + (object + method chains = chains + + method protocols = protocols + + method next_protocols = next_protocols + end) + () + +let applied_blocks ctxt ?(chains = [`Main]) ?(protocols = []) ?(next_protocols = []) () = make_streamed_call - S.valid_blocks + S.applied_blocks ctxt () (object diff --git a/src/lib_shell_services/monitor_services.mli b/src/lib_shell_services/monitor_services.mli index 47aee81f5f52cd542385249e7a80d459e75cd789..a2635743d64ee58acfe48d46aa43316161049e0f 100644 --- a/src/lib_shell_services/monitor_services.mli +++ b/src/lib_shell_services/monitor_services.mli @@ -38,7 +38,12 @@ val bootstrapped : #streamed -> ((Block_hash.t * Time.Protocol.t) Lwt_stream.t * stopper) tzresult Lwt.t -val valid_blocks : +(** Call RPC GET /monitor/valid_blocks (Deprecated) + + - Default [chains] is [Main]. + - Default [protocols] is [[]]. + - Default [next_protocols] is [[]]. *) +val legacy_valid_blocks : #streamed -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> @@ -48,6 +53,40 @@ val valid_blocks : tzresult Lwt.t +(** Call RPC GET /monitor/validated_blocks + + - Default [chains] is [Main]. + - Default [protocols] is [[]]. + - Default [next_protocols] is [[]]. 
*) +val validated_blocks : + #streamed -> + ?chains:Chain_services.chain list -> + ?protocols:Protocol_hash.t list -> + ?next_protocols:Protocol_hash.t list -> + unit -> + ((Chain_id.t * Block_hash.t * Block_header.t * Operation.t list list) + Lwt_stream.t + * stopper) + tzresult + Lwt.t + +(** Call RPC GET /monitor/applied_blocks + + - Default [chains] is [Main]. + - Default [protocols] is [[]]. + - Default [next_protocols] is [[]]. *) +val applied_blocks : + #streamed -> + ?chains:Chain_services.chain list -> + ?protocols:Protocol_hash.t list -> + ?next_protocols:Protocol_hash.t list -> + unit -> + ((Chain_id.t * Block_hash.t * Block_header.t * Operation.t list list) + Lwt_stream.t + * stopper) + tzresult + Lwt.t + val heads : #streamed -> ?next_protocols:Protocol_hash.t list -> @@ -72,7 +111,12 @@ module S : sig Block_hash.t * Time.Protocol.t ) Tezos_rpc.Service.t - val valid_blocks : + (** Define RPC GET /monitor/valid_blocks (Deprecated) + + - Default [chains] is [Main]. + - Default [protocols] is [[]]. + - Default [next_protocols] is [[]]. *) + val legacy_valid_blocks : ( [`GET], unit, unit, @@ -83,6 +127,38 @@ module S : sig (Chain_id.t * Block_hash.t) * Block_header.t ) Tezos_rpc.Service.t + (** Define RPC GET /monitor/validated_blocks + + - Default [chains] is [Main]. + - Default [protocols] is [[]]. + - Default [next_protocols] is [[]]. *) + val validated_blocks : + ( [`GET], + unit, + unit, + < chains : Chain_services.chain list + ; next_protocols : Protocol_hash.t list + ; protocols : Protocol_hash.t list >, + unit, + Chain_id.t * Block_hash.t * Block_header.t * Operation.t list list ) + Tezos_rpc.Service.t + + (** Define RPC GET /monitor/applied_blocks + + - Default [chains] is [Main]. + - Default [protocols] is [[]]. + - Default [next_protocols] is [[]]. 
*) + val applied_blocks : + ( [`GET], + unit, + unit, + < chains : Chain_services.chain list + ; next_protocols : Protocol_hash.t list + ; protocols : Protocol_hash.t list >, + unit, + Chain_id.t * Block_hash.t * Block_header.t * Operation.t list list ) + Tezos_rpc.Service.t + val heads : ( [`GET], unit, diff --git a/src/lib_store/mocked/store.ml b/src/lib_store/mocked/store.ml index 7b340f64543658a4ac5109c27bc0db191e6fa245..89ab66d92104923944020d67c9896419945d9ebd 100644 --- a/src/lib_store/mocked/store.ml +++ b/src/lib_store/mocked/store.ml @@ -74,6 +74,7 @@ and chain_store = { (* Genesis is only on-disk: read-only except at creation *) genesis_block_data : block Stored_data.t; block_watcher : block Lwt_watcher.input; + validated_block_watcher : block Lwt_watcher.input; block_rpc_directories : (chain_store * block) Tezos_rpc.Directory.t Protocol_hash.Map.t Protocol_hash.Table.t; @@ -99,7 +100,7 @@ and chain_state = { live_operations : Operation_hash.Set.t; mutable live_data_cache : (Block_hash.t * Operation_hash.Set.t) Ringo.Ring.t option; - prechecked_blocks : Block_repr.t Block_lru_cache.t; + validated_blocks : Block_repr.t Block_lru_cache.t; } and testchain = {forked_block : Block_hash.t; testchain_store : chain_store} @@ -215,10 +216,10 @@ module Block = struct Shared.use chain_state (fun chain_state -> locked_is_known_invalid chain_state hash) - let is_known_prechecked {chain_state; _} hash = - Shared.use chain_state (fun {prechecked_blocks; _} -> + let is_known_validated {chain_state; _} hash = + Shared.use chain_state (fun {validated_blocks; _} -> Option.value ~default:Lwt.return_false - @@ Block_lru_cache.bind prechecked_blocks hash (function + @@ Block_lru_cache.bind validated_blocks hash (function | None -> Lwt.return_false | Some _ -> Lwt.return_true)) @@ -350,14 +351,14 @@ module Block = struct let* current_head = current_head chain_store in locked_read_block_by_level_opt chain_store current_head level - let read_prechecked_block_opt {chain_state; _} 
hash = - Shared.use chain_state (fun {prechecked_blocks; _} -> + let read_validated_block_opt {chain_state; _} hash = + Shared.use chain_state (fun {validated_blocks; _} -> Option.value ~default:Lwt.return_none - @@ Block_lru_cache.bind prechecked_blocks hash Lwt.return) + @@ Block_lru_cache.bind validated_blocks hash Lwt.return) - let read_prechecked_block chain_store hash = + let read_validated_block chain_store hash = let open Lwt_result_syntax in - let*! o = read_prechecked_block_opt chain_store hash in + let*! o = read_validated_block_opt chain_store hash in match o with | Some b -> return b | None -> tzfail (Block_not_found {hash; distance = 0}) @@ -527,8 +528,8 @@ module Block = struct Store_events.(emit store_block) (hash, block_header.shell.level) in let*! () = - Shared.use chain_store.chain_state (fun {prechecked_blocks; _} -> - Block_lru_cache.remove prechecked_blocks hash ; + Shared.use chain_store.chain_state (fun {validated_blocks; _} -> + Block_lru_cache.remove validated_blocks hash ; Lwt.return_unit) in Lwt_watcher.notify chain_store.block_watcher block ; @@ -537,7 +538,7 @@ module Block = struct (chain_store, block) ; return_some block - let store_prechecked_block chain_store ~hash ~block_header ~operations = + let store_validated_block chain_store ~hash ~block_header ~operations = let open Lwt_result_syntax in let operations_length = List.length operations in let validation_passes = block_header.Block_header.shell.validation_passes in @@ -563,12 +564,12 @@ module Block = struct } in let*! () = - Shared.use chain_store.chain_state (fun {prechecked_blocks; _} -> - Block_lru_cache.put prechecked_blocks hash (Lwt.return_some block) ; + Shared.use chain_store.chain_state (fun {validated_blocks; _} -> + Block_lru_cache.put validated_blocks hash (Lwt.return_some block) ; Lwt.return_unit) in let*! 
() = - Store_events.(emit store_prechecked_block) (hash, block_header.shell.level) + Store_events.(emit store_validated_block) (hash, block_header.shell.level) in return_unit @@ -1422,7 +1423,7 @@ module Chain = struct let live_blocks = Block_hash.Set.singleton genesis_block.hash in let live_operations = Operation_hash.Set.empty in let live_data_cache = None in - let prechecked_blocks = Block_lru_cache.create 10 in + let validated_blocks = Block_lru_cache.create 10 in return { current_head_data; @@ -1438,7 +1439,7 @@ module Chain = struct live_blocks; live_operations; live_data_cache; - prechecked_blocks; + validated_blocks; } let create_chain_store ?block_cache_limit global_store chain_dir ?target @@ -1473,6 +1474,7 @@ module Chain = struct in let chain_state = Shared.create chain_state in let block_watcher = Lwt_watcher.create_input () in + let validated_block_watcher = Lwt_watcher.create_input () in let block_rpc_directories = Protocol_hash.Table.create 7 in let chain_store : chain_store = { @@ -1484,6 +1486,7 @@ module Chain = struct genesis_block_data; block_store; block_watcher; + validated_block_watcher; block_rpc_directories; } in @@ -1665,6 +1668,9 @@ module Chain = struct Shared.use chain_store.chain_state (fun {protocol_levels_data; _} -> Stored_data.get protocol_levels_data) + let validated_watcher chain_store = + Lwt_watcher.create_stream chain_store.validated_block_watcher + let watcher chain_store = Lwt_watcher.create_stream chain_store.block_watcher let get_rpc_directory chain_store block = diff --git a/src/lib_store/shared/store_events.ml b/src/lib_store/shared/store_events.ml index 1b50b75f887104513d978d053fca864e99160ab3..4584fa9c53e513bce782f33a279da07f699f97c0 100644 --- a/src/lib_store/shared/store_events.ml +++ b/src/lib_store/shared/store_events.ml @@ -83,12 +83,12 @@ let store_block = ~pp1:pp_block_descriptor ("block", block_descriptor_encoding) -let store_prechecked_block = +let store_validated_block = declare_1 ~section ~level:Info - 
~name:"store_prechecked_block" - ~msg:"prechecked block {block} was stored" + ~name:"store_validated_block" + ~msg:"validated block {block} was stored" ~pp1:pp_block_descriptor ("block", block_descriptor_encoding) diff --git a/src/lib_store/store.mli b/src/lib_store/store.mli index 5dd0d26b469d8b7af89adf3c4b9ca0ea1788e2a0..430a310e6d853eaa7b08471a2840d785e1878806 100644 --- a/src/lib_store/store.mli +++ b/src/lib_store/store.mli @@ -310,10 +310,10 @@ module Block : sig invalid blocks file). *) val is_known_invalid : chain_store -> Block_hash.t -> bool Lwt.t - (** [is_known_prechecked chain_store bh] tests that the block [bh] - is prechecked in [chain_store] (i.e. the block is present in the - prechecked block cache). *) - val is_known_prechecked : chain_store -> Block_hash.t -> bool Lwt.t + (** [is_known_validated chain_store bh] tests that the block [bh] + is validated in [chain_store] (i.e. the block is present in the + validated block cache). *) + val is_known_validated : chain_store -> Block_hash.t -> bool Lwt.t (** [is_known chain_store bh] tests that the block [bh] is either known valid or known invalid in [chain_store]. 
*) @@ -411,14 +411,13 @@ module Block : sig val read_predecessor_of_hash_opt : chain_store -> Block_hash.t -> block option Lwt.t - (** [read_prechecked_block chain_store bh] tries to read in the - [chain_store]'s prechecked block cache the block [bh].*) - val read_prechecked_block : - chain_store -> Block_hash.t -> block tzresult Lwt.t + (** [read_validated_block chain_store bh] tries to read in the + [chain_store]'s validated block cache the block [bh].*) + val read_validated_block : chain_store -> Block_hash.t -> block tzresult Lwt.t - (** [read_prechecked_block_opt chain_store bh] optional version of - [read_prechecked_block].*) - val read_prechecked_block_opt : + (** [read_validated_block_opt chain_store bh] optional version of + [read_validated_block].*) + val read_validated_block_opt : chain_store -> Block_hash.t -> block option Lwt.t (** [store_block chain_store ~block_header ~operations @@ -429,7 +428,7 @@ module Block : sig the newly created block is returned. If the block was successfully stored, then the block is removed - from the prechecked block cache. + from the validated block cache. {b Warning} The store will refuse to store blocks with no associated context's commit. *) @@ -440,10 +439,10 @@ module Block : sig Block_validation.result -> block option tzresult Lwt.t - (** [store_prechecked_block chain_store ~hash ~block_header ~operations] - stores in [chain_store]'s prechecked block cache the block with + (** [store_validated_block chain_store ~hash ~block_header ~operations] + stores in [chain_store]'s validated block cache the block with its [block_header] and [operations]. *) - val store_prechecked_block : + val store_validated_block : chain_store -> hash:Block_hash.t -> block_header:Block_header.t -> @@ -916,6 +915,11 @@ module Chain : sig [chain_store]. *) val watcher : chain_store -> Block.t Lwt_stream.t * Lwt_watcher.stopper + (** [validated_watcher chain_store] instantiates a new validated block + watcher for [chain_store]. 
*) + val validated_watcher : + chain_store -> Block.t Lwt_stream.t * Lwt_watcher.stopper + (** [get_rpc_directory chain_store block] returns the RPC directory associated to the [block]. *) val get_rpc_directory : diff --git a/src/lib_store/unix/store.ml b/src/lib_store/unix/store.ml index f4d79e2d76883493264747eff6f3b83b4ab6d868..197e062868050e7612ccbf3567c6334010fa59f3 100644 --- a/src/lib_store/unix/store.ml +++ b/src/lib_store/unix/store.ml @@ -77,6 +77,7 @@ and chain_store = { (* Genesis is only on-disk: read-only except at creation *) genesis_block_data : block Stored_data.t; block_watcher : block Lwt_watcher.input; + validated_block_watcher : block Lwt_watcher.input; block_rpc_directories : (chain_store * block) Tezos_rpc.Directory.t Protocol_hash.Map.t Protocol_hash.Table.t; @@ -103,7 +104,7 @@ and chain_state = { live_operations : Operation_hash.Set.t; mutable live_data_cache : (Block_hash.t * Operation_hash.Set.t) Ringo.Ring.t option; - prechecked_blocks : Block_repr.t Block_lru_cache.t; + validated_blocks : Block_repr.t Block_lru_cache.t; } and testchain = {forked_block : Block_hash.t; testchain_store : chain_store} @@ -268,10 +269,10 @@ module Block = struct Shared.use chain_state (fun chain_state -> locked_is_known_invalid chain_state hash) - let is_known_prechecked {chain_state; _} hash = - Shared.use chain_state (fun {prechecked_blocks; _} -> + let is_known_validated {chain_state; _} hash = + Shared.use chain_state (fun {validated_blocks; _} -> Option.value ~default:Lwt.return_false - @@ Block_lru_cache.bind prechecked_blocks hash (function + @@ Block_lru_cache.bind validated_blocks hash (function | None -> Lwt.return_false | Some _ -> Lwt.return_true)) @@ -403,14 +404,14 @@ module Block = struct let* current_head = current_head chain_store in locked_read_block_by_level_opt chain_store current_head level - let read_prechecked_block_opt {chain_state; _} hash = - Shared.use chain_state (fun {prechecked_blocks; _} -> + let read_validated_block_opt 
{chain_state; _} hash = + Shared.use chain_state (fun {validated_blocks; _} -> Option.value ~default:Lwt.return_none - @@ Block_lru_cache.bind prechecked_blocks hash Lwt.return) + @@ Block_lru_cache.bind validated_blocks hash Lwt.return) - let read_prechecked_block chain_store hash = + let read_validated_block chain_store hash = let open Lwt_result_syntax in - let*! o = read_prechecked_block_opt chain_store hash in + let*! o = read_validated_block_opt chain_store hash in match o with | Some b -> return b | None -> tzfail (Block_not_found {hash; distance = 0}) @@ -580,8 +581,8 @@ module Block = struct Store_events.(emit store_block) (hash, block_header.shell.level) in let*! () = - Shared.use chain_store.chain_state (fun {prechecked_blocks; _} -> - Block_lru_cache.remove prechecked_blocks hash ; + Shared.use chain_store.chain_state (fun {validated_blocks; _} -> + Block_lru_cache.remove validated_blocks hash ; Lwt.return_unit) in Lwt_watcher.notify chain_store.block_watcher block ; @@ -590,7 +591,7 @@ module Block = struct (chain_store, block) ; return_some block - let store_prechecked_block chain_store ~hash ~block_header ~operations = + let store_validated_block chain_store ~hash ~block_header ~operations = let open Lwt_result_syntax in let operations_length = List.length operations in let validation_passes = block_header.Block_header.shell.validation_passes in @@ -616,13 +617,14 @@ module Block = struct } in let*! () = - Shared.use chain_store.chain_state (fun {prechecked_blocks; _} -> - Block_lru_cache.put prechecked_blocks hash (Lwt.return_some block) ; + Shared.use chain_store.chain_state (fun {validated_blocks; _} -> + Block_lru_cache.put validated_blocks hash (Lwt.return_some block) ; Lwt.return_unit) in let*! 
() = - Store_events.(emit store_prechecked_block) (hash, block_header.shell.level) + Store_events.(emit store_validated_block) (hash, block_header.shell.level) in + Lwt_watcher.notify chain_store.validated_block_watcher block ; return_unit let resulting_context_hash chain_store block = @@ -1768,7 +1770,7 @@ module Chain = struct let live_blocks = Block_hash.Set.singleton genesis_block.hash in let live_operations = Operation_hash.Set.empty in let live_data_cache = None in - let prechecked_blocks = Block_lru_cache.create 10 in + let validated_blocks = Block_lru_cache.create 10 in return { current_head_data; @@ -1784,7 +1786,7 @@ module Chain = struct live_blocks; live_operations; live_data_cache; - prechecked_blocks; + validated_blocks; } (* In some case, when a merge was interrupted, the highest cemented @@ -1864,7 +1866,7 @@ module Chain = struct let live_blocks = Block_hash.Set.empty in let live_operations = Operation_hash.Set.empty in let live_data_cache = None in - let prechecked_blocks = Block_lru_cache.create 10 in + let validated_blocks = Block_lru_cache.create 10 in return { current_head_data; @@ -1880,7 +1882,7 @@ module Chain = struct live_blocks; live_operations; live_data_cache; - prechecked_blocks; + validated_blocks; } let create_chain_store ?block_cache_limit global_store chain_dir ?target @@ -1915,6 +1917,7 @@ module Chain = struct in let chain_state = Shared.create chain_state in let block_watcher = Lwt_watcher.create_input () in + let validated_block_watcher = Lwt_watcher.create_input () in let block_rpc_directories = Protocol_hash.Table.create 7 in let* lockfile = create_lockfile chain_dir in let chain_store : chain_store = @@ -1927,6 +1930,7 @@ module Chain = struct genesis_block_data; block_store; block_watcher; + validated_block_watcher; block_rpc_directories; lockfile; } @@ -1950,6 +1954,7 @@ module Chain = struct let* chain_state = load_chain_state chain_dir block_store in let chain_state = Shared.create chain_state in let block_watcher = 
Lwt_watcher.create_input () in + let validated_block_watcher = Lwt_watcher.create_input () in let block_rpc_directories = Protocol_hash.Table.create 7 in let* lockfile = create_lockfile chain_dir in let chain_store = @@ -1963,6 +1968,7 @@ module Chain = struct chain_state; genesis_block_data; block_watcher; + validated_block_watcher; block_rpc_directories; lockfile; } @@ -2297,6 +2303,9 @@ module Chain = struct Shared.use chain_store.chain_state (fun {protocol_levels_data; _} -> Stored_data.get protocol_levels_data) + let validated_watcher chain_store = + Lwt_watcher.create_stream chain_store.validated_block_watcher + let watcher chain_store = Lwt_watcher.create_stream chain_store.block_watcher let get_rpc_directory chain_store block = diff --git a/src/lib_store/unix/store.mli b/src/lib_store/unix/store.mli index 9088771a10753fb7e60e4293260c9587b500e413..96827bbf0e87b5ba87435a3a3fa1b95df2cf46b4 100644 --- a/src/lib_store/unix/store.mli +++ b/src/lib_store/unix/store.mli @@ -309,10 +309,10 @@ module Block : sig invalid blocks file). *) val is_known_invalid : chain_store -> Block_hash.t -> bool Lwt.t - (** [is_known_prechecked chain_store bh] tests that the block [bh] - is prechecked in [chain_store] (i.e. the block is present in the - prechecked block cache). *) - val is_known_prechecked : chain_store -> Block_hash.t -> bool Lwt.t + (** [is_known_validated chain_store bh] tests that the block [bh] + is validated in [chain_store] (i.e. the block is present in the + validated block cache). *) + val is_known_validated : chain_store -> Block_hash.t -> bool Lwt.t (** [is_known chain_store bh] tests that the block [bh] is either known valid or known invalid in [chain_store]. 
*) @@ -410,14 +410,13 @@ module Block : sig val read_predecessor_of_hash_opt : chain_store -> Block_hash.t -> block option Lwt.t - (** [read_prechecked_block chain_store bh] tries to read in the - [chain_store]'s prechecked block cache the block [bh].*) - val read_prechecked_block : - chain_store -> Block_hash.t -> block tzresult Lwt.t + (** [read_validated_block chain_store bh] tries to read in the + [chain_store]'s validated block cache the block [bh].*) + val read_validated_block : chain_store -> Block_hash.t -> block tzresult Lwt.t - (** [read_prechecked_block_opt chain_store bh] optional version of - [read_prechecked_block].*) - val read_prechecked_block_opt : + (** [read_validated_block_opt chain_store bh] optional version of + [read_validated_block].*) + val read_validated_block_opt : chain_store -> Block_hash.t -> block option Lwt.t (** [store_block chain_store ~block_header ~operations @@ -428,7 +427,7 @@ module Block : sig the newly created block is returned. If the block was successfully stored, then the block is removed - from the prechecked block cache. *) + from the validated block cache. *) val store_block : chain_store -> block_header:Block_header.t -> @@ -436,10 +435,10 @@ module Block : sig Block_validation.result -> block option tzresult Lwt.t - (** [store_prechecked_block chain_store ~hash ~block_header ~operations] - stores in [chain_store]'s prechecked block cache the block with + (** [store_validated_block chain_store ~hash ~block_header ~operations] + stores in [chain_store]'s validated block cache the block with its [block_header] and [operations]. *) - val store_prechecked_block : + val store_validated_block : chain_store -> hash:Block_hash.t -> block_header:Block_header.t -> @@ -910,6 +909,11 @@ module Chain : sig [chain_store]. *) val watcher : chain_store -> Block.t Lwt_stream.t * Lwt_watcher.stopper + (** [validated_watcher chain_store] instantiates a new validated block + watcher for [chain_store]. 
*) + val validated_watcher : + chain_store -> Block.t Lwt_stream.t * Lwt_watcher.stopper + (** [get_rpc_directory chain_store block] returns the RPC directory associated to the [block]. *) val get_rpc_directory : diff --git a/src/proto_015_PtLimaPt/lib_delegate/baking_vdf.ml b/src/proto_015_PtLimaPt/lib_delegate/baking_vdf.ml index e8994dc556a54e33cdee330422c777197f5fff70..3bfccb10bb945b39471615daabfec6ae186fdfdd 100644 --- a/src/proto_015_PtLimaPt/lib_delegate/baking_vdf.ml +++ b/src/proto_015_PtLimaPt/lib_delegate/baking_vdf.ml @@ -51,7 +51,7 @@ type 'a state = { } let init_block_stream_with_stopper cctxt chain = - Client_baking_blocks.monitor_valid_blocks + Client_baking_blocks.monitor_applied_blocks ~next_protocols:(Some [Protocol.hash]) cctxt ~chains:[chain] diff --git a/src/proto_015_PtLimaPt/lib_delegate/client_baking_blocks.ml b/src/proto_015_PtLimaPt/lib_delegate/client_baking_blocks.ml index d7064545b7bd8efa0e0006d20db6ef7d0312f553..0c255d63327f331b24ce77078521ccf142f15fd7 100644 --- a/src/proto_015_PtLimaPt/lib_delegate/client_baking_blocks.ml +++ b/src/proto_015_PtLimaPt/lib_delegate/client_baking_blocks.ml @@ -143,12 +143,12 @@ module Block_seen_event = struct module Event = Internal_event.Make (Definition) end -let monitor_valid_blocks cctxt ?chains ?protocols ~next_protocols () = - Monitor_services.valid_blocks cctxt ?chains ?protocols ?next_protocols () +let monitor_applied_blocks cctxt ?chains ?protocols ~next_protocols () = + Monitor_services.applied_blocks cctxt ?chains ?protocols ?next_protocols () >>=? fun (block_stream, stop) -> return ( Lwt_stream.map_s - (fun ((chain, block), header) -> + (fun (chain, block, header, _ops) -> Block_seen_event.( Event.emit (make block header (`Valid_blocks chain))) >>=? 
fun () -> diff --git a/src/proto_015_PtLimaPt/lib_delegate/client_baking_blocks.mli b/src/proto_015_PtLimaPt/lib_delegate/client_baking_blocks.mli index c3358f61f82fe49b043dd42ef343bff002a3e3e6..41b9f70b8619914da4a5e3ff34f157279a60edc9 100644 --- a/src/proto_015_PtLimaPt/lib_delegate/client_baking_blocks.mli +++ b/src/proto_015_PtLimaPt/lib_delegate/client_baking_blocks.mli @@ -45,7 +45,7 @@ val info : Block_services.block -> block_info tzresult Lwt.t -val monitor_valid_blocks : +val monitor_applied_blocks : #Protocol_client_context.rpc_context -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> diff --git a/src/proto_015_PtLimaPt/lib_delegate/client_daemon.ml b/src/proto_015_PtLimaPt/lib_delegate/client_daemon.ml index ba2cfdac03b3b0acbefcc66990a0cca6383e5a8b..afc9f52d6aa775ad86805dd15e433fd01d80724b 100644 --- a/src/proto_015_PtLimaPt/lib_delegate/client_daemon.ml +++ b/src/proto_015_PtLimaPt/lib_delegate/client_daemon.ml @@ -124,7 +124,7 @@ module Accuser = struct Protocol_hash.pp_short Protocol.hash >>= fun () -> - Client_baking_blocks.monitor_valid_blocks + Client_baking_blocks.monitor_applied_blocks ~next_protocols:(Some [Protocol.hash]) cctxt ~chains:[chain] diff --git a/src/proto_016_PtMumbai/lib_delegate/baking_vdf.ml b/src/proto_016_PtMumbai/lib_delegate/baking_vdf.ml index 6c948c31a4b3f3d3d46a4fc37a02129ac0638e49..cdceb772344abca5048c1fed5458bdb288e711c4 100644 --- a/src/proto_016_PtMumbai/lib_delegate/baking_vdf.ml +++ b/src/proto_016_PtMumbai/lib_delegate/baking_vdf.ml @@ -51,7 +51,7 @@ type 'a state = { } let init_block_stream_with_stopper cctxt chain = - Client_baking_blocks.monitor_valid_blocks + Client_baking_blocks.monitor_applied_blocks ~next_protocols:(Some [Protocol.hash]) cctxt ~chains:[chain] diff --git a/src/proto_016_PtMumbai/lib_delegate/client_baking_blocks.ml b/src/proto_016_PtMumbai/lib_delegate/client_baking_blocks.ml index d7064545b7bd8efa0e0006d20db6ef7d0312f553..0c255d63327f331b24ce77078521ccf142f15fd7 100644 
--- a/src/proto_016_PtMumbai/lib_delegate/client_baking_blocks.ml +++ b/src/proto_016_PtMumbai/lib_delegate/client_baking_blocks.ml @@ -143,12 +143,12 @@ module Block_seen_event = struct module Event = Internal_event.Make (Definition) end -let monitor_valid_blocks cctxt ?chains ?protocols ~next_protocols () = - Monitor_services.valid_blocks cctxt ?chains ?protocols ?next_protocols () +let monitor_applied_blocks cctxt ?chains ?protocols ~next_protocols () = + Monitor_services.applied_blocks cctxt ?chains ?protocols ?next_protocols () >>=? fun (block_stream, stop) -> return ( Lwt_stream.map_s - (fun ((chain, block), header) -> + (fun (chain, block, header, _ops) -> Block_seen_event.( Event.emit (make block header (`Valid_blocks chain))) >>=? fun () -> diff --git a/src/proto_016_PtMumbai/lib_delegate/client_baking_blocks.mli b/src/proto_016_PtMumbai/lib_delegate/client_baking_blocks.mli index c3358f61f82fe49b043dd42ef343bff002a3e3e6..41b9f70b8619914da4a5e3ff34f157279a60edc9 100644 --- a/src/proto_016_PtMumbai/lib_delegate/client_baking_blocks.mli +++ b/src/proto_016_PtMumbai/lib_delegate/client_baking_blocks.mli @@ -45,7 +45,7 @@ val info : Block_services.block -> block_info tzresult Lwt.t -val monitor_valid_blocks : +val monitor_applied_blocks : #Protocol_client_context.rpc_context -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> diff --git a/src/proto_016_PtMumbai/lib_delegate/client_daemon.ml b/src/proto_016_PtMumbai/lib_delegate/client_daemon.ml index ba2cfdac03b3b0acbefcc66990a0cca6383e5a8b..afc9f52d6aa775ad86805dd15e433fd01d80724b 100644 --- a/src/proto_016_PtMumbai/lib_delegate/client_daemon.ml +++ b/src/proto_016_PtMumbai/lib_delegate/client_daemon.ml @@ -124,7 +124,7 @@ module Accuser = struct Protocol_hash.pp_short Protocol.hash >>= fun () -> - Client_baking_blocks.monitor_valid_blocks + Client_baking_blocks.monitor_applied_blocks ~next_protocols:(Some [Protocol.hash]) cctxt ~chains:[chain] diff --git 
a/src/proto_alpha/lib_delegate/baking_vdf.ml b/src/proto_alpha/lib_delegate/baking_vdf.ml index 8b8d472abbdc161ff8c5cada32f7fa2db95708a4..c0e97b55ea74b76b00a2368a7f96d14ab4dc9880 100644 --- a/src/proto_alpha/lib_delegate/baking_vdf.ml +++ b/src/proto_alpha/lib_delegate/baking_vdf.ml @@ -51,7 +51,7 @@ type 'a state = { } let init_block_stream_with_stopper cctxt chain = - Client_baking_blocks.monitor_valid_blocks + Client_baking_blocks.monitor_applied_blocks ~next_protocols:(Some [Protocol.hash]) cctxt ~chains:[chain] diff --git a/src/proto_alpha/lib_delegate/client_baking_blocks.ml b/src/proto_alpha/lib_delegate/client_baking_blocks.ml index d7064545b7bd8efa0e0006d20db6ef7d0312f553..0c255d63327f331b24ce77078521ccf142f15fd7 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.ml +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.ml @@ -143,12 +143,12 @@ module Block_seen_event = struct module Event = Internal_event.Make (Definition) end -let monitor_valid_blocks cctxt ?chains ?protocols ~next_protocols () = - Monitor_services.valid_blocks cctxt ?chains ?protocols ?next_protocols () +let monitor_applied_blocks cctxt ?chains ?protocols ~next_protocols () = + Monitor_services.applied_blocks cctxt ?chains ?protocols ?next_protocols () >>=? fun (block_stream, stop) -> return ( Lwt_stream.map_s - (fun ((chain, block), header) -> + (fun (chain, block, header, _ops) -> Block_seen_event.( Event.emit (make block header (`Valid_blocks chain))) >>=? 
fun () -> diff --git a/src/proto_alpha/lib_delegate/client_baking_blocks.mli b/src/proto_alpha/lib_delegate/client_baking_blocks.mli index c3358f61f82fe49b043dd42ef343bff002a3e3e6..41b9f70b8619914da4a5e3ff34f157279a60edc9 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.mli +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.mli @@ -45,7 +45,7 @@ val info : Block_services.block -> block_info tzresult Lwt.t -val monitor_valid_blocks : +val monitor_applied_blocks : #Protocol_client_context.rpc_context -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> diff --git a/src/proto_alpha/lib_delegate/client_daemon.ml b/src/proto_alpha/lib_delegate/client_daemon.ml index 80eb3ba7f05fdeaae2ac5a91609c27fa6258ad2b..0afbd46b8d4e0d49192fcf1d96c17bab96aca482 100644 --- a/src/proto_alpha/lib_delegate/client_daemon.ml +++ b/src/proto_alpha/lib_delegate/client_daemon.ml @@ -125,7 +125,7 @@ module Accuser = struct Protocol_hash.pp_short Protocol.hash >>= fun () -> - Client_baking_blocks.monitor_valid_blocks + Client_baking_blocks.monitor_applied_blocks ~next_protocols:(Some [Protocol.hash]) cctxt ~chains:[chain]