From fb83cd980f557dd44aedb1a3716872061e4e3ac9 Mon Sep 17 00:00:00 2001
From: Victor Allombert
Date: Wed, 2 Aug 2023 10:22:35 +0200
Subject: [PATCH] Shell: lift monitor_head rpc implementation

---
Review notes (free text in this area is ignored by `git am`):

* What the patch does: the body of the `Monitor_services.S.heads` handler
  is moved, unchanged in logic, into a new top-level function
  `Monitor_directory.monitor_head`, which is also exported from the .mli.
  The handler now only looks up the chain validator, creates the head
  watcher with `Chain_validator.new_head_watcher`, and delegates.

* `monitor_head ~head_watcher store chain q`:
  - `head_watcher` is the (stream, stopper) pair of new heads; the
    returned answer's `shutdown` calls `Lwt_watcher.shutdown` on the
    stopper.
  - The first `next ()` yields the current head of the chain store if it
    passes the protocol filters; later calls read from the filtered
    watcher stream.
  - Filters: an empty `q#next_protocols` / `q#protocols` list accepts
    every block. Otherwise `q#next_protocols` is matched against the
    protocol found at the header's `proto_level`, and `q#protocols`
    against the protocol found at the predecessor's `proto_level`.
  - Fails with `Failure` when a protocol level is unknown to the store or
    when the predecessor block cannot be read.

* Formatting: this file had lost its line breaks. The line structure
  below was rebuilt to satisfy the hunk headers (6 context + 67 added;
  11 context + 67 removed + 4 added; 6 context + 10 added). Leading
  whitespace of context lines was inferred from ocamlformat conventions;
  if it does not match the target tree exactly, apply with
  `git apply --ignore-whitespace`.

 src/lib_shell/monitor_directory.ml  | 138 ++++++++++++++--------------
 src/lib_shell/monitor_directory.mli |  10 ++
 2 files changed, 81 insertions(+), 67 deletions(-)

diff --git a/src/lib_shell/monitor_directory.ml b/src/lib_shell/monitor_directory.ml
index a7c983fda6d1..6f8513d4ef80 100644
--- a/src/lib_shell/monitor_directory.ml
+++ b/src/lib_shell/monitor_directory.ml
@@ -24,6 +24,73 @@
 (*                                                                           *)
 (*****************************************************************************)
 
+let monitor_head
+    ~(head_watcher :
+       (Block_hash.t * Block_header.t) Lwt_stream.t * Lwt_watcher.stopper) store
+    chain q =
+  let open Lwt_syntax in
+  let* chain_store = Chain_directory.get_chain_store_exn store chain in
+  let block_stream, stopper = head_watcher in
+  let* head = Store.Chain.current_head chain_store in
+  let shutdown () = Lwt_watcher.shutdown stopper in
+  let within_protocols header =
+    let find_protocol protocol_level =
+      let+ p = Store.Chain.find_protocol chain_store ~protocol_level in
+      WithExceptions.Option.to_exn
+        ~none:
+          (Failure (Format.sprintf "Cannot find protocol %d" protocol_level))
+        p
+    in
+    let next_protocol (header : Block_header.t) =
+      find_protocol header.shell.proto_level
+    in
+    let current_protocol (header : Block_header.t) =
+      let* pred = Store.Block.read_block chain_store header.shell.predecessor in
+      match pred with
+      | Error e ->
+          Format.kasprintf
+            Stdlib.failwith
+            "Cannot find current protocol because missing predecessor: %a"
+            pp_print_trace
+            e
+      | Ok pred_block ->
+          let pred_header = Store.Block.header pred_block in
+          find_protocol pred_header.shell.proto_level
+    in
+    let within protocols get_protocol header =
+      match protocols with
+      | [] -> return_true
+      | _ ->
+          let+ p = get_protocol header in
+          List.exists (Protocol_hash.equal p) protocols
+    in
+    let* ok_next = within q#next_protocols next_protocol header in
+    let* ok_current = within q#protocols current_protocol header in
+    return (ok_current && ok_next)
+  in
+  let stream =
+    Lwt_stream.filter_map_s
+      (fun (hash, header) ->
+        let* within_protocols = within_protocols header in
+        if within_protocols then Lwt.return_some (hash, header)
+        else Lwt.return_none)
+      block_stream
+  in
+  let* first_block_is_within_protocols =
+    within_protocols (Store.Block.header head)
+  in
+  let first_call =
+    (* Skip the first block if this is false *)
+    ref first_block_is_within_protocols
+  in
+  let next () =
+    if !first_call then (
+      first_call := false ;
+      Lwt.return_some (Store.Block.hash head, Store.Block.header head))
+    else Lwt_stream.get stream
+  in
+  Tezos_rpc.Answer.return_stream {next; shutdown}
+
 let build_rpc_directory ~(commit_info : Octez_node_version.commit_info)
     validator mainchain_validator =
   let open Lwt_syntax in
@@ -188,78 +255,15 @@ let build_rpc_directory ~(commit_info : Octez_node_version.commit_info)
       let next () = Lwt_stream.get stream in
       Tezos_rpc.Answer.return_stream {next; shutdown}) ;
   gen_register1 Monitor_services.S.heads (fun chain q () ->
+      let open Lwt_syntax in
+      let* chain_store = Chain_directory.get_chain_store_exn store chain in
       (* TODO: when `chain = `Test`, should we reset then stream when
          the `testnet` change, or dias we currently do ?? *)
-      let* chain_store = Chain_directory.get_chain_store_exn store chain in
       match Validator.get validator (Store.Chain.chain_id chain_store) with
       | Error _ -> Lwt.fail Not_found
       | Ok chain_validator ->
-          let block_stream, stopper =
-            Chain_validator.new_head_watcher chain_validator
-          in
-          let* head = Store.Chain.current_head chain_store in
-          let shutdown () = Lwt_watcher.shutdown stopper in
-          let within_protocols header =
-            let find_protocol protocol_level =
-              let+ p = Store.Chain.find_protocol chain_store ~protocol_level in
-              WithExceptions.Option.to_exn
-                ~none:
-                  (Failure
-                     (Format.sprintf "Cannot find protocol %d" protocol_level))
-                p
-            in
-            let next_protocol (header : Block_header.t) =
-              find_protocol header.shell.proto_level
-            in
-            let current_protocol (header : Block_header.t) =
-              let* pred =
-                Store.Block.read_block chain_store header.shell.predecessor
-              in
-              match pred with
-              | Error e ->
-                  Format.kasprintf
-                    Stdlib.failwith
-                    "Cannot find current protocol because missing predecessor: \
-                     %a"
-                    pp_print_trace
-                    e
-              | Ok pred_block ->
-                  let pred_header = Store.Block.header pred_block in
-                  find_protocol pred_header.shell.proto_level
-            in
-            let within protocols get_protocol header =
-              match protocols with
-              | [] -> return_true
-              | _ ->
-                  let+ p = get_protocol header in
-                  List.exists (Protocol_hash.equal p) protocols
-            in
-            let* ok_next = within q#next_protocols next_protocol header in
-            let* ok_current = within q#protocols current_protocol header in
-            return (ok_current && ok_next)
-          in
-          let stream =
-            Lwt_stream.filter_map_s
-              (fun (hash, header) ->
-                let* within_protocols = within_protocols header in
-                if within_protocols then Lwt.return_some (hash, header)
-                else Lwt.return_none)
-              block_stream
-          in
-          let* first_block_is_within_protocols =
-            within_protocols (Store.Block.header head)
-          in
-          let first_call =
-            (* Skip the first block if this is false *)
-            ref first_block_is_within_protocols
-          in
-          let next () =
-            if !first_call then (
-              first_call := false ;
-              Lwt.return_some (Store.Block.hash head, Store.Block.header head))
-            else Lwt_stream.get stream
-          in
-          Tezos_rpc.Answer.return_stream {next; shutdown}) ;
+          let head_watcher = Chain_validator.new_head_watcher chain_validator in
+          monitor_head ~head_watcher store chain q) ;
   gen_register0 Monitor_services.S.protocols (fun () () ->
       let stream, stopper = Store.Protocol.protocol_watcher store in
       let shutdown () = Lwt_watcher.shutdown stopper in
diff --git a/src/lib_shell/monitor_directory.mli b/src/lib_shell/monitor_directory.mli
index 450c3ca6b1bb..9c828b1a6ad3 100644
--- a/src/lib_shell/monitor_directory.mli
+++ b/src/lib_shell/monitor_directory.mli
@@ -23,6 +23,16 @@
 (*                                                                           *)
 (*****************************************************************************)
 
+val monitor_head :
+  head_watcher:
+    (Block_hash.t * Block_header.t) Lwt_stream.t * Lwt_watcher.stopper ->
+  Store.t ->
+  Chain_services.chain ->
+  < next_protocols : Protocol_hash.t trace
+  ; protocols : Protocol_hash.t trace
+  ; .. > ->
+  (Block_hash.t * Block_header.t) Tezos_rpc.Answer.t Lwt.t
+
 val build_rpc_directory :
   commit_info:Octez_node_version.commit_info ->
   Validator.t ->
-- 
GitLab