From ea0893be325808b07f507e81b3b5d4244ee5e012 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 14 Jun 2023 11:08:44 +0200 Subject: [PATCH 01/10] Store: implement a minimal Store.sync --- src/lib_store/mocked/store.ml | 3 + src/lib_store/shared/naming.ml | 4 + src/lib_store/shared/naming.mli | 5 + src/lib_store/shared/store_events.ml | 41 +++ src/lib_store/store.mli | 15 ++ src/lib_store/unix/block_store.ml | 361 ++++++++++++++++++++------- src/lib_store/unix/block_store.mli | 19 ++ src/lib_store/unix/consistency.ml | 34 ++- src/lib_store/unix/store.ml | 332 ++++++++++++++++-------- src/lib_store/unix/store.mli | 15 ++ 10 files changed, 632 insertions(+), 197 deletions(-) diff --git a/src/lib_store/mocked/store.ml b/src/lib_store/mocked/store.ml index 5157e186f438..4570a75a2f34 100644 --- a/src/lib_store/mocked/store.ml +++ b/src/lib_store/mocked/store.ml @@ -1874,6 +1874,9 @@ let init ?patch_context ?commit_genesis ?history_mode ?(readonly = false) ?history_mode ~allow_testchains) +let sync ?last_status:_ ~trigger_hash:_ _store = + Stdlib.failwith "sync: unimplemented" + let close_store global_store = Lwt_watcher.shutdown_input global_store.protocol_watcher ; Lwt_watcher.shutdown_input global_store.global_block_watcher ; diff --git a/src/lib_store/shared/naming.ml b/src/lib_store/shared/naming.ml index f8716b9d5532..48ad025c5021 100644 --- a/src/lib_store/shared/naming.ml +++ b/src/lib_store/shared/naming.ml @@ -72,8 +72,12 @@ let chain_config_file dir = let lockfile dir = mk_file dir "lock" +let block_store_lockfile dir = mk_file dir "lockblock" + let gc_lockfile dir = mk_file dir "gc_lock" +let stored_data_lockfile dir = mk_file dir "stored_data_lock" + let reconstruction_lock_file dir = mk_file dir "reconstruction_lock" let testchains_dir dir = mk_dir dir "testchains" diff --git a/src/lib_store/shared/naming.mli b/src/lib_store/shared/naming.mli index 7343d0a99625..03f926f62ff4 100644 --- a/src/lib_store/shared/naming.mli +++ b/src/lib_store/shared/naming.mli @@ -77,8 +77,13 @@ val chain_dir : val lockfile : [`Chain_dir] directory -> [`Lockfile] file +val block_store_lockfile : [`Chain_dir] directory -> [`Lockfile] file + val gc_lockfile : [`Chain_dir] directory -> [`Gc_lockfile] file +val stored_data_lockfile : + [`Chain_dir] directory -> [`Stored_data_lockfile] file + val reconstruction_lock_file : [`Chain_dir] directory -> [`Reconstruction_lockfile] file diff --git a/src/lib_store/shared/store_events.ml b/src/lib_store/shared/store_events.ml index 049b4f3e3e8b..761063b4a6bb 100644 --- a/src/lib_store/shared/store_events.ml +++ b/src/lib_store/shared/store_events.ml @@ -291,6 +291,47 @@ let start_context_split = ~pp1:pp_int32 ("level", Data_encoding.int32) +let start_store_sync = + declare_0 + ~section + ~level:Info + ~name:"start_store_sync" + ~msg:"starting store sync" + () + +let store_already_sync = + declare_0 + ~section + ~level:Info + ~name:"store_already_sync" + ~msg:"store already in sync" + () + +let store_quick_sync = + declare_0 + ~section + ~level:Info + ~name:"store_quick_sync" + ~msg:"store quick sync" + () + +let store_full_sync = + declare_0 + ~section + ~level:Info + ~name:"store_full_sync" + ~msg:"store full sync" + () + +let end_store_sync = + declare_1 + ~section + ~level:Info + ~name:"end_store_sync" + ~msg:"store was successfully synced in {time}" + ~pp1:Time.System.Span.pp_hum + ("time", Time.System.Span.encoding) + let context_gc_is_not_allowed = declare_0 ~section diff --git a/src/lib_store/store.mli b/src/lib_store/store.mli index 
28a9f942daaa..702970a695e6 100644 --- a/src/lib_store/store.mli +++ b/src/lib_store/store.mli @@ -225,6 +225,21 @@ val init : Genesis.t -> store tzresult Lwt.t +(** [sync ?last_status ~trigger_hash store] performs a store + synchronization to update all the data and file descriptors. This + is useful to keep track of a store opened in readonly mode that is + updated by another read/write instance. + [?last_status] gives a hint regarding the previous synchronization + to speed up the process. + [trigger_hash] corresponds to the last block that aims to be + stored in the store instance to synchronize -- it is typically set + as the last head to synchronize with. *) +val sync : + ?last_status:Block_store_status.t -> + trigger_hash:Block_hash.t -> + t -> + (t * Block_store_status.t * (unit -> unit Lwt.t)) tzresult Lwt.t + (** [main_chain_store global_store] returns the main chain store. *) val main_chain_store : store -> chain_store diff --git a/src/lib_store/unix/block_store.ml b/src/lib_store/unix/block_store.ml index 10599de44ac1..b7801f4de3ef 100644 --- a/src/lib_store/unix/block_store.ml +++ b/src/lib_store/unix/block_store.ml @@ -48,6 +48,8 @@ type block_store = { merge_scheduler : Lwt_idle_waiter.t; (* Target level x Merging thread *) mutable merging_thread : (int32 * unit tzresult Lwt.t) option; + lockfile : Lwt_unix.file_descr; + stored_data_lockfile : Lwt_unix.file_descr; } type t = block_store @@ -1190,7 +1192,7 @@ let compute_lowest_bound_to_preserve_in_floating block_store ~new_head Block_repr.max_operations_ttl new_head_metadata | Some metadata -> Block_repr.max_operations_ttl metadata))) -let instanciate_temporary_floating_store block_store = +let instantiate_temporary_floating_store block_store = let open Lwt_result_syntax in protect ~on_error:(fun err -> @@ -1223,6 +1225,21 @@ let instanciate_temporary_floating_store block_store = block_store.rw_floating_block_store <- new_rw_store ; return (ro_store, rw_store, new_rw_store))) +let create_lockfile path chain_dir = + let open Lwt_syntax in + protect (fun () -> + let* fd = + Lwt_unix.openfile + (path chain_dir |> Naming.file_path) + [Unix.O_CREAT; O_RDWR; O_CLOEXEC; O_SYNC] + 0o777 + in + return_ok fd) + +let lock lockfile = Lwt_unix.lockf lockfile Unix.F_LOCK 0 + +let unlock lockfile = Lwt_unix.lockf lockfile Unix.F_ULOCK 0 + let create_merging_thread block_store ~history_mode ~old_ro_store ~old_rw_store ~new_head ~new_head_lpbl ~lowest_bound_to_preserve_in_floating ~cementing_highwatermark ~cycle_size_limit = @@ -1291,9 +1308,13 @@ let create_merging_thread block_store ~history_mode ~old_ro_store ~old_rw_store in (* Clean-up the files that are below the offset *) let*! () = - Cemented_block_store.trigger_gc - block_store.cemented_store - history_mode + let*! () = lock block_store.lockfile in + Lwt.finalize + (fun () -> + Cemented_block_store.trigger_gc + block_store.cemented_store + history_mode) + (fun () -> unlock block_store.lockfile) in return_unit else (* Don't cement any cycles! *) @@ -1321,9 +1342,13 @@ let create_merging_thread block_store ~history_mode ~old_ro_store ~old_rw_store in (* Clean-up the files that are below the offset *) let*! () = - Cemented_block_store.trigger_gc - block_store.cemented_store - history_mode + let*! 
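(* Editor's note: throughout this patch, every [lock]/[unlock] pair is
   wrapped in [Lwt.finalize] so the advisory lock is released even when
   the locked section fails. A minimal, self-contained sketch of that
   discipline, assuming only [Lwt_unix]; [with_lockf] is a hypothetical
   helper, not part of the patch: *)
let with_lockf (fd : Lwt_unix.file_descr) (f : unit -> 'a Lwt.t) : 'a Lwt.t =
  let open Lwt.Syntax in
  (* Block until the whole file is locked, then guarantee release. *)
  let* () = Lwt_unix.lockf fd Unix.F_LOCK 0 in
  Lwt.finalize f (fun () -> Lwt_unix.lockf fd Unix.F_ULOCK 0)
(* End of editor's note. *)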
() = lock block_store.lockfile in + Lwt.finalize + (fun () -> + Cemented_block_store.trigger_gc + block_store.cemented_store + history_mode) + (fun () -> unlock block_store.lockfile) in return_unit else @@ -1388,7 +1413,13 @@ let merge_stores ?(cycle_size_limit = default_cycle_size_limit) block_store {status = Format.asprintf "%a" Block_store_status.pp store_status}) in (* Mark the store's status as Merging *) - let* () = Block_store_status.set_merge_status block_store.status_data in + let* () = + Lwt.finalize + (fun () -> + let*! () = lock block_store.stored_data_lockfile in + Block_store_status.set_merge_status block_store.status_data) + (fun () -> unlock block_store.stored_data_lockfile) + in let new_head_lpbl = Block_repr.last_preserved_block_level new_head_metadata in @@ -1406,7 +1437,13 @@ let merge_stores ?(cycle_size_limit = default_cycle_size_limit) block_store Lwt_idle_waiter.force_idle block_store.merge_scheduler (fun () -> (* Move the rw in the ro stores and create a new tmp *) let* old_ro_store, old_rw_store, _new_rw_store = - instanciate_temporary_floating_store block_store + Lwt.finalize + (fun () -> + (* Lock the block store to avoid RO instances to open the + state while the file descriptors are being updated. *) + let*! () = lock block_store.lockfile in + instantiate_temporary_floating_store block_store) + (fun () -> unlock block_store.lockfile) in (* Important: do not clean-up the temporary stores on failures as they will delete the recently arrived @@ -1443,17 +1480,34 @@ let merge_stores ?(cycle_size_limit = default_cycle_size_limit) block_store Lwt_idle_waiter.force_idle block_store.merge_scheduler (fun () -> - (* Critical section: update on-disk values *) - let* () = - move_all_floating_stores - block_store - ~new_ro_store - in - let* () = write_caboose block_store new_caboose in - let* () = - write_savepoint block_store new_savepoint - in - return_unit) + Lwt.finalize + (fun () -> + (* Lock the block store to avoid RO + instances to open the state while + the file descriptors are being + updated. *) + let*! () = lock block_store.lockfile in + (* Critical section: update on-disk values *) + let* () = + move_all_floating_stores + block_store + ~new_ro_store + in + let*! () = + lock block_store.stored_data_lockfile + in + let* () = + write_caboose block_store new_caboose + in + let* () = + write_savepoint block_store new_savepoint + in + return_unit) + (fun () -> + let*! () = + unlock block_store.stored_data_lockfile + in + unlock block_store.lockfile)) in (* We can now trigger the context GC: if the GC is performed, this call will block until @@ -1469,11 +1523,18 @@ let merge_stores ?(cycle_size_limit = default_cycle_size_limit) block_store section, in case it needs to access the block store. *) let* () = finalizer new_head_lpbl in - (* The merge operation succeeded, the store is now idle. *) + (* The merge operation succeeded, the store is + now idle. *) block_store.merging_thread <- None ; let* () = - Block_store_status.set_idle_status - block_store.status_data + Lwt.finalize + (fun () -> + let*! () = + lock block_store.stored_data_lockfile + in + Block_store_status.set_idle_status + block_store.status_data) + (fun () -> unlock block_store.stored_data_lockfile) in return_unit)) (fun () -> @@ -1594,70 +1655,202 @@ let may_recover_merge block_store = let load ?block_cache_limit chain_dir ~genesis_block ~readonly = let open Lwt_result_syntax in - let* cemented_store = Cemented_block_store.init chain_dir ~readonly in - let*! 
ro_floating_block_store = - Floating_block_store.init chain_dir ~readonly RO - in - let ro_floating_block_stores = [ro_floating_block_store] in - let*! rw_floating_block_store = - Floating_block_store.init chain_dir ~readonly RW - in - let genesis_descr = Block_repr.descriptor genesis_block in - let* savepoint = - Stored_data.init - (Naming.savepoint_file chain_dir) - ~initial_data:genesis_descr - in - let*! _, savepoint_level = Stored_data.get savepoint in - Prometheus.Gauge.set - Store_metrics.metrics.savepoint_level - (Int32.to_float savepoint_level) ; - let* caboose = - Stored_data.init (Naming.caboose_file chain_dir) ~initial_data:genesis_descr - in - let*! _, caboose_level = Stored_data.get caboose in - Prometheus.Gauge.set - Store_metrics.metrics.caboose_level - (Int32.to_float caboose_level) ; - let* status_data = - Stored_data.init - (Naming.block_store_status_file chain_dir) - ~initial_data:Block_store_status.create_idle_status - in - let block_cache = - Block_lru_cache.create - (Option.value block_cache_limit ~default:default_block_cache_limit) - in - let merge_scheduler = Lwt_idle_waiter.create () in - let merge_mutex = Lwt_mutex.create () in - let block_store = - { - chain_dir; - genesis_block; - readonly; - cemented_store; - ro_floating_block_stores; - rw_floating_block_store; - caboose; - savepoint; - status_data; - block_cache; - gc_callback = None; - split_callback = None; - merge_mutex; - merge_scheduler; - merging_thread = None; - } - in - let* () = - if not readonly then may_recover_merge block_store else return_unit - in - let*! status = Stored_data.get status_data in - let* () = - fail_unless (Block_store_status.is_idle status) Cannot_load_degraded_store + let* lockfile = create_lockfile Naming.block_store_lockfile chain_dir in + let* block_store = + Lwt.finalize + (fun () -> + let*! () = lock lockfile in + let* cemented_store = Cemented_block_store.init chain_dir ~readonly in + let*! ro_floating_block_store = + Floating_block_store.init chain_dir ~readonly RO + in + let ro_floating_block_stores = [ro_floating_block_store] in + let*! rw_floating_block_store = + Floating_block_store.init chain_dir ~readonly RW + in + let genesis_descr = Block_repr.descriptor genesis_block in + let* savepoint = + Stored_data.init + (Naming.savepoint_file chain_dir) + ~initial_data:genesis_descr + in + let*! _, savepoint_level = Stored_data.get savepoint in + Prometheus.Gauge.set + Store_metrics.metrics.savepoint_level + (Int32.to_float savepoint_level) ; + let* caboose = + Stored_data.init + (Naming.caboose_file chain_dir) + ~initial_data:genesis_descr + in + let*! 
_, caboose_level = Stored_data.get caboose in + Prometheus.Gauge.set + Store_metrics.metrics.caboose_level + (Int32.to_float caboose_level) ; + let* status_data = + Stored_data.init + (Naming.block_store_status_file chain_dir) + ~initial_data:Block_store_status.create_idle_status + in + let block_cache = + Block_lru_cache.create + (Option.value block_cache_limit ~default:default_block_cache_limit) + in + let merge_scheduler = Lwt_idle_waiter.create () in + let merge_mutex = Lwt_mutex.create () in + let* stored_data_lockfile = + create_lockfile Naming.stored_data_lockfile chain_dir + in + let block_store = + { + chain_dir; + genesis_block; + readonly; + cemented_store; + ro_floating_block_stores; + rw_floating_block_store; + caboose; + savepoint; + status_data; + block_cache; + gc_callback = None; + split_callback = None; + merge_mutex; + merge_scheduler; + merging_thread = None; + lockfile; + stored_data_lockfile; + } + in + let* () = + if not readonly then may_recover_merge block_store else return_unit + in + let*! status = Stored_data.get status_data in + let* () = + fail_when + (Block_store_status.is_merging status && not readonly) + Cannot_load_degraded_store + in + return block_store) + (fun () -> unlock lockfile) in return block_store +let lock_block_store {lockfile; _} = lock lockfile + +let unlock_block_store {lockfile; _} = unlock lockfile + +let sync ~last_status block_store = + let open Lwt_result_syntax in + Lwt.finalize + (fun () -> + let*! () = lock block_store.lockfile in + let chain_dir = block_store.chain_dir in + let readonly = true in + let* stored_status = + Stored_data.load (Naming.block_store_status_file chain_dir) + in + let*! current_status = Stored_data.get stored_status in + let fbs = + block_store.rw_floating_block_store + :: block_store.ro_floating_block_stores + in + (* Prepare resources cleaners for former resources. *) + let cleanups () = List.iter_s Floating_block_store.close fbs in + let no_cleanup () = Lwt.return_unit in + let is_last_status_merging = Block_store_status.is_merging last_status in + let is_current_status_merging = + Block_store_status.is_merging current_status + in + let last_status_id = Block_store_status.get_status_value last_status in + let current_status_id = + Block_store_status.get_status_value current_status + in + let* new_block_store, cleanups = + match (is_last_status_merging, is_current_status_merging) with + | (false, false | true, true) when last_status_id = current_status_id -> + (* The status has not changed (Idle to Idle or Merging to + Merging where status ids are equal)since the last sync: + we only reload the floating stores. *) + let () = List.iter Floating_block_store.may_sync fbs in + return (block_store, no_cleanup) + | false, false | true, false -> + (* A complete merge occurred (Idle to Idle where status + ids are different) since the last sync or a merge (that + was ongoing during the last sync) finished. + We need to: + - open the new floating stores and update some store + values, + - synchronize the new cemented store indexes and block + files. *) + let*! ro_floating_block_store = + Floating_block_store.init chain_dir ~readonly RO + in + let ro_floating_block_stores = [ro_floating_block_store] in + let*! 
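(* Editor's note: the four-way case analysis in [sync] below can be
   summarized by a small decision function. A self-contained sketch;
   the [sync_action] type and its constructors are illustrative labels
   introduced here, not definitions from the patch: *)
type sync_action =
  | Reload_floating (* same status, same id: cheap refresh *)
  | Reopen_after_merge (* a merge completed: reopen RO/RW stores *)
  | Reopen_while_merging (* a merge is ongoing: also open RW_TMP *)

let classify ~last_is_merging ~current_is_merging ~same_status_id =
  match (last_is_merging, current_is_merging) with
  | (false, false | true, true) when same_status_id -> Reload_floating
  | _, false -> Reopen_after_merge
  | _, true -> Reopen_while_merging
(* End of editor's note. *)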
rw_floating_block_store = + Floating_block_store.init chain_dir ~readonly RW + in + let () = + Cemented_block_store.may_synchronize_indexes + block_store.cemented_store + in + let* () = + Cemented_block_store.reload_cemented_blocks_files + block_store.cemented_store + in + let* savepoint = + Stored_data.load (Naming.savepoint_file chain_dir) + in + let* caboose = Stored_data.load (Naming.caboose_file chain_dir) in + let status_data = block_store.status_data in + return + ( { + block_store with + ro_floating_block_stores; + rw_floating_block_store; + savepoint; + caboose; + status_data; + }, + cleanups ) + | false, true | true, true -> + (* A merge has started (Idle to Merging) since the last + sync or a merge that was ongoing during the last sync + finished and a new one is now ongoing (Merging to + Merging where status ids are different): we need to + open the temporary floating stores and update some + store values. *) + let*! ro_floating_block_store = + Floating_block_store.init chain_dir ~readonly RO + in + let*! rw_floating_block_store = + Floating_block_store.init chain_dir ~readonly RW + in + let ro_floating_block_stores = + [ro_floating_block_store; rw_floating_block_store] + in + let*! rw_floating_block_store = + Floating_block_store.init chain_dir ~readonly RW_TMP + in + let* savepoint = + Stored_data.load (Naming.savepoint_file chain_dir) + in + let* caboose = Stored_data.load (Naming.caboose_file chain_dir) in + let status_data = block_store.status_data in + return + ( { + block_store with + ro_floating_block_stores; + rw_floating_block_store; + savepoint; + caboose; + status_data; + }, + cleanups ) + in + return (new_block_store, current_status, cleanups)) + (fun () -> unlock block_store.lockfile) + let create ?block_cache_limit chain_dir ~genesis_block = let open Lwt_result_syntax in let* block_store = diff --git a/src/lib_store/unix/block_store.mli b/src/lib_store/unix/block_store.mli index af6d17d6d685..cc1a23fc7a5a 100644 --- a/src/lib_store/unix/block_store.mli +++ b/src/lib_store/unix/block_store.mli @@ -347,6 +347,25 @@ val load : readonly:bool -> block_store tzresult Lwt.t +val lock_block_store : t -> unit Lwt.t + +val unlock_block_store : t -> unit Lwt.t + +(** [sync ?last_status block_store] updates the [block_store] internal + file descriptors so that the return block store points to the + latest fds. This is useful to keep track of a block store opened + in readonly mode that is updated by another read/write + instance. + If [last_status] is provided, it gives a hint about the previous + sync call and may avoid unnecessary synchronizations. As a result, + a promise to cleanup resources is returned. This closure must be + evaluated after switching to the synchronized block_store to close + former resources and avoid leaks. *) +val sync : + last_status:Block_store_status.t -> + t -> + (t * Block_store_status.t * (unit -> unit Lwt.t)) tzresult Lwt.t + (** [register_gc_callback block_store callback] installs a [callback] that may be triggered during a block store merge in order to garbage-collect old contexts. 
*) diff --git a/src/lib_store/unix/consistency.ml b/src/lib_store/unix/consistency.ml index cf110916c6f5..82591bfbc8ba 100644 --- a/src/lib_store/unix/consistency.ml +++ b/src/lib_store/unix/consistency.ml @@ -149,6 +149,17 @@ let check_invariant ~genesis ~caboose ~savepoint ~cementing_highwatermark head = snd current_head; }) +let create_lockfile path chain_dir = + let open Lwt_syntax in + protect (fun () -> + let* fd = + Lwt_unix.openfile + (path chain_dir |> Naming.file_path) + [Unix.O_CREAT; O_RDWR; O_CLOEXEC; O_SYNC] + 0o777 + in + return_ok fd) + (* [check_consistency ~store_dir genesis] aims to provide a quick check (in terms of execution time) which checks that files may be read and they are consistent w.r.t to the given invariant. @@ -156,6 +167,16 @@ let check_invariant ~genesis ~caboose ~savepoint ~cementing_highwatermark Hypothesis: an existing store is provided. *) let check_consistency chain_dir genesis = let open Lwt_result_syntax in + let* lockfile_fd = create_lockfile Naming.lockfile chain_dir in + let*! () = Lwt_unix.lockf lockfile_fd Unix.F_LOCK 0 in + let* block_store_lockfile_fd = + create_lockfile Naming.block_store_lockfile chain_dir + in + let*! () = Lwt_unix.lockf block_store_lockfile_fd Unix.F_LOCK 0 in + let* stored_data_lockfile_fd = + create_lockfile Naming.stored_data_lockfile chain_dir + in + let*! () = Lwt_unix.lockf stored_data_lockfile_fd Unix.F_LOCK 0 in (* Try loading all the block's data files *) let* genesis_data = Stored_data.load (Naming.genesis_block_file chain_dir) in let*! genesis_block = Stored_data.get genesis_data in @@ -179,17 +200,13 @@ let check_consistency chain_dir genesis = let* protocol_levels_data = Stored_data.load (Naming.protocol_levels_file chain_dir) in - let* _invalid_blocks_data = Stored_data.load (Naming.invalid_blocks_file chain_dir) in - let* _forked_chains_data = Stored_data.load (Naming.forked_chains_file chain_dir) in - let* _target_data = Stored_data.load (Naming.target_file chain_dir) in - (* Open the store and try to read the blocks *) (* [~readonly:false] to recover from a potential interrupted merge *) let* block_store = Block_store.load chain_dir ~genesis_block ~readonly:true in @@ -239,7 +256,14 @@ let check_consistency chain_dir genesis = ~current_head in return_unit) - (fun () -> Block_store.close block_store) + (fun () -> + let*! () = Lwt_unix.lockf stored_data_lockfile_fd Unix.F_ULOCK 0 in + let*! () = Lwt_unix.close stored_data_lockfile_fd in + let*! () = Lwt_unix.lockf block_store_lockfile_fd Unix.F_ULOCK 0 in + let*! () = Lwt_unix.close block_store_lockfile_fd in + let*! () = Lwt_unix.lockf lockfile_fd Unix.F_ULOCK 0 in + let*! 
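(* Editor's note: [check_consistency] above takes the global, block-store
   and stored-data locks in a fixed order and releases them in reverse,
   the usual way to avoid lock-order deadlocks. A generic, self-contained
   sketch of that pattern; [with_locks_in_order] is a hypothetical helper,
   not part of the patch: *)
let rec with_locks_in_order (fds : Lwt_unix.file_descr list)
    (f : unit -> 'a Lwt.t) : 'a Lwt.t =
  match fds with
  | [] -> f ()
  | fd :: rest ->
      let open Lwt.Syntax in
      let* () = Lwt_unix.lockf fd Unix.F_LOCK 0 in
      (* The innermost [finalize] runs first, so locks are released in
         reverse acquisition order. *)
      Lwt.finalize
        (fun () -> with_locks_in_order rest f)
        (fun () -> Lwt_unix.lockf fd Unix.F_ULOCK 0)
(* End of editor's note. *)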
() = Lwt_unix.close lockfile_fd in + Block_store.close block_store) let fix_floating_stores chain_dir = let open Lwt_result_syntax in diff --git a/src/lib_store/unix/store.ml b/src/lib_store/unix/store.ml index b71c90a7ecff..3a28ae4bc138 100644 --- a/src/lib_store/unix/store.ml +++ b/src/lib_store/unix/store.ml @@ -90,6 +90,7 @@ and chain_store = { (chain_store * block) Tezos_rpc.Directory.t Protocol_hash.Map.t Protocol_hash.Table.t; lockfile : Lwt_unix.file_descr; + stored_data_lockfile : Lwt_unix.file_descr; } and chain_state = { @@ -213,14 +214,14 @@ let expect_predecessor_context_hash chain_store ~protocol_level = return b) (fun _ -> tzfail (Protocol_not_found {protocol_level})) -let create_lockfile chain_dir = +let create_lockfile path chain_dir = let open Lwt_syntax in protect (fun () -> let* fd = Lwt_unix.openfile - (Naming.lockfile chain_dir |> Naming.file_path) + (path chain_dir |> Naming.file_path) [Unix.O_CREAT; O_RDWR; O_CLOEXEC; O_SYNC] - 0o644 + 0o777 in return_ok fd) @@ -1530,31 +1531,39 @@ module Chain = struct Lwt.return_some l let merge_finalizer chain_store (new_highest_cemented_level : int32) = - let open Lwt_syntax in + let open Lwt_result_syntax in (* Assumed invariant: two merges cannot occur concurrently *) (* new_highest_cemented_block should be set, even after a merge 0 *) (* Take the lock on the chain_state to avoid concurrent updates *) Shared.locked_use chain_store.chain_state (fun chain_state -> - let* current_cementing_highwatermark = - Stored_data.get chain_state.cementing_highwatermark_data - in - match current_cementing_highwatermark with - | None -> - Stored_data.write - chain_state.cementing_highwatermark_data - (Some new_highest_cemented_level) - | Some current_cementing_highwatermark -> - if - Compare.Int32.( - current_cementing_highwatermark > new_highest_cemented_level) - then - (* Invariant error: should not happen but if it does, don't - mess anything by modifying the value. *) - return_ok_unit - else - Stored_data.write - chain_state.cementing_highwatermark_data - (Some new_highest_cemented_level)) + Lwt.finalize + (fun () -> + let*! () = lock_for_write chain_store.stored_data_lockfile in + let*! current_cementing_highwatermark = + Stored_data.get chain_state.cementing_highwatermark_data + in + let* () = + match current_cementing_highwatermark with + | None -> + Stored_data.write + chain_state.cementing_highwatermark_data + (Some new_highest_cemented_level) + | Some current_cementing_highwatermark -> + if + Compare.Int32.( + current_cementing_highwatermark + > new_highest_cemented_level) + then + (* Invariant error: should not happen but if it does, don't + mess anything by modifying the value. 
*) + return_unit + else + Stored_data.write + chain_state.cementing_highwatermark_data + (Some new_highest_cemented_level) + in + return_unit) + (fun () -> unlock chain_store.stored_data_lockfile)) let may_update_checkpoint_and_target chain_store ~new_head ~new_head_lfbl ~checkpoint ~target = @@ -1829,27 +1838,33 @@ module Chain = struct | Some h -> Lwt.return (h, new_cementing_highwatermark)) in let* () = - if Compare.Int32.(snd new_checkpoint > snd checkpoint) then - (* Remove potentially outdated invalid blocks if the - checkpoint changed *) - let* () = - Stored_data.update_with - chain_state.invalid_blocks_data - (fun invalid_blocks -> - Lwt.return - (Block_hash.Map.filter - (fun _k {level; _} -> level > snd new_checkpoint) - invalid_blocks)) - in - write_checkpoint chain_state new_checkpoint - else return_unit - in - (* Update values on disk but not the cementing highwatermark - which will be updated by the merge finalizer. *) - let* () = - Stored_data.write chain_state.current_head_data new_head_descr + Lwt.finalize + (fun () -> + let*! () = lock_for_write chain_store.stored_data_lockfile in + let* () = + if Compare.Int32.(snd new_checkpoint > snd checkpoint) then + (* Remove potentially outdated invalid blocks if the + checkpoint changed *) + let* () = + Stored_data.update_with + chain_state.invalid_blocks_data + (fun invalid_blocks -> + Lwt.return + (Block_hash.Map.filter + (fun _k {level; _} -> level > snd new_checkpoint) + invalid_blocks)) + in + write_checkpoint chain_state new_checkpoint + else return_unit + in + (* Update values on disk but not the cementing highwatermark + which will be updated by the merge finalizer. *) + let* () = + Stored_data.write chain_state.current_head_data new_head_descr + in + Stored_data.write chain_state.target_data new_target) + (fun () -> unlock chain_store.stored_data_lockfile) in - let* () = Stored_data.write chain_state.target_data new_target in (* Update live_data *) let* live_blocks, live_operations = locked_compute_live_blocks @@ -2114,7 +2129,7 @@ module Chain = struct let* forked_chains_data = Stored_data.load (Naming.forked_chains_file chain_dir) in - let*! current_head_hash, _ = Stored_data.get current_head_data in + let*! current_head_hash, _level = Stored_data.get current_head_data in let* o = Block_store.read_block ~read_metadata:true @@ -2184,7 +2199,10 @@ module Chain = struct let chain_state = Shared.create chain_state in let validated_block_watcher = Lwt_watcher.create_input () in let block_rpc_directories = Protocol_hash.Table.create 7 in - let* lockfile = create_lockfile chain_dir in + let* lockfile = create_lockfile Naming.lockfile chain_dir in + let* stored_data_lockfile = + create_lockfile Naming.stored_data_lockfile chain_dir + in let chain_store : chain_store = { global_store; @@ -2197,6 +2215,7 @@ module Chain = struct validated_block_watcher; block_rpc_directories; lockfile; + stored_data_lockfile; } in return chain_store @@ -2212,47 +2231,63 @@ module Chain = struct Stored_data.load (Naming.genesis_block_file chain_dir) in let*! 
genesis_block = Stored_data.get genesis_block_data in - let* block_store = - Block_store.load ?block_cache_limit chain_dir ~genesis_block ~readonly + let* block_store_lockfile = + create_lockfile Naming.block_store_lockfile chain_dir in - let* chain_state = load_chain_state chain_dir block_store in - let chain_state = Shared.create chain_state in - let validated_block_watcher = Lwt_watcher.create_input () in - let block_rpc_directories = Protocol_hash.Table.create 7 in - let* lockfile = create_lockfile chain_dir in - let chain_store = - { - global_store; - chain_id; - chain_dir; - chain_config; - (* let the state handle the test chain initialization *) - block_store; - chain_state; - genesis_block_data; - validated_block_watcher; - block_rpc_directories; - lockfile; - } + let* stored_data_lockfile = + create_lockfile Naming.stored_data_lockfile chain_dir in - (* Also initalize the live blocks *) - let*! head = current_head chain_store in - let*! o = Block.get_block_metadata_opt chain_store head in - match o with - | None -> tzfail Inconsistent_chain_store - | Some metadata -> - Shared.update_with chain_state (fun chain_state -> - let* live_blocks, live_operations = - locked_compute_live_blocks - ~force:true - ~update_cache:true - chain_store - chain_state - head - metadata - in - return - (Some {chain_state with live_blocks; live_operations}, chain_store)) + Lwt.finalize + (fun () -> + let*! () = Lwt_unix.lockf block_store_lockfile Unix.F_LOCK 0 in + let*! () = Lwt_unix.lockf stored_data_lockfile Unix.F_LOCK 0 in + let* block_store = + Block_store.load ?block_cache_limit chain_dir ~genesis_block ~readonly + in + let* chain_state = load_chain_state chain_dir block_store in + let chain_state = Shared.create chain_state in + let validated_block_watcher = Lwt_watcher.create_input () in + let block_rpc_directories = Protocol_hash.Table.create 7 in + let* lockfile = create_lockfile Naming.lockfile chain_dir in + let chain_store = + { + global_store; + chain_id; + chain_dir; + chain_config; + (* let the state handle the test chain initialization *) + block_store; + chain_state; + genesis_block_data; + validated_block_watcher; + block_rpc_directories; + lockfile; + stored_data_lockfile; + } + in + (* Also initalize the live blocks *) + let*! head = current_head chain_store in + let*! o = Block.get_block_metadata_opt chain_store head in + match o with + | None -> tzfail Inconsistent_chain_store + | Some metadata -> + Shared.update_with chain_state (fun chain_state -> + let* live_blocks, live_operations = + locked_compute_live_blocks + ~force:true + ~update_cache:true + chain_store + chain_state + head + metadata + in + return + ( Some {chain_state with live_blocks; live_operations}, + chain_store ))) + (fun () -> + let*! () = Lwt_unix.lockf stored_data_lockfile Unix.F_ULOCK 0 in + let*! () = Lwt_unix.lockf block_store_lockfile Unix.F_ULOCK 0 in + Lwt_unix.close block_store_lockfile) (* Recursively closes all test chain stores *) let close_chain_store chain_store = @@ -2380,14 +2415,20 @@ module Chain = struct history_mode in let* () = - Stored_data.update_with - chain_state.forked_chains_data - (fun forked_chains -> - Lwt.return - (Chain_id.Map.add - testchain_id - forked_block_hash - forked_chains)) + Lwt.finalize + (fun () -> + let*! 
() = lock_for_write chain_store.stored_data_lockfile in Stored_data.update_with chain_state.forked_chains_data (fun forked_chains -> Lwt.return (Chain_id.Map.add testchain_id forked_block_hash forked_chains))) (fun () -> unlock chain_store.stored_data_lockfile) in let*! () = Store_events.(emit fork_testchain) @@ -2830,6 +2871,95 @@ let init ?patch_context ?commit_genesis ?history_mode ?(readonly = false) let*! () = Store_events.(emit end_init_store) () in return store +let get_chain_store store chain_id = + let chain_store = main_chain_store store in + let rec loop chain_store = + let open Lwt_result_syntax in + if Chain_id.equal (Chain.chain_id chain_store) chain_id then + return chain_store + else + Shared.use chain_store.chain_state (fun {active_testchain; _} -> + match active_testchain with + | None -> tzfail (Validation_errors.Unknown_chain chain_id) + | Some {testchain_store; _} -> loop testchain_store) + in + loop chain_store + +let sync ?(last_status = Block_store_status.create_idle_status) ~trigger_hash + (store : store) = + let open Lwt_result_syntax in + let*! () = Store_events.(emit start_store_sync) () in + let sync_start = Time.System.now () in + let main_chain_store = main_chain_store store in + let*! already_synced = Block.is_known main_chain_store trigger_hash in + let* store, current_status, cleanups = + if already_synced then + (* Nothing to do, the block is already known, and thus, the + store was already synchronized. *) + let*! () = Store_events.(emit store_already_sync) () in + return (store, last_status, fun () -> Lwt.return_unit) + else + let*! head_before_sync = Chain.current_head main_chain_store in + let store_dir = store.store_dir in + let chain_id = Chain_id.of_block_hash (genesis main_chain_store).block in + let chain_dir = Naming.chain_dir store_dir chain_id in + let* current_head_data = + Stored_data.load (Naming.current_head_file chain_dir) + in + let*! current_head_hash, _ = Stored_data.get current_head_data in + (* current_head_hash is available in the synchronized store *) + let* new_block_store, current_status, cleanups = + Block_store.sync ~last_status main_chain_store.block_store + in + let* new_chain_state = + if Block_store_status.equal last_status current_status then + (* When no merge occurred since the last sync, we only need to + sync: + - current_head + - invalid_blocks + - protocols (if a new protocol is detected) *) + let*! () = Store_events.(emit store_quick_sync) () in + let* invalid_blocks_data = + Stored_data.load (Naming.invalid_blocks_file chain_dir) + in + let* chain_store = get_chain_store store chain_id in + let* current_head = Block.read_block chain_store current_head_hash in + Shared.use main_chain_store.chain_state (fun chain_state -> + let* protocol_levels_data = + if + Block.proto_level head_before_sync + = Block.proto_level current_head + then return chain_state.protocol_levels_data + else Stored_data.load (Naming.protocol_levels_file chain_dir) + in + return + { + chain_state with + current_head_data; + current_head; + protocol_levels_data; + invalid_blocks_data; + }) + else + (* Status has changed, synchronize everything. *) + let*! 
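(* Editor's note: a hedged sketch of the intended caller-side contract of
   [sync]: thread the returned status into the next call, swap the store
   reference first, and only then run the cleanup closure so the former
   file descriptors are closed without racing in-flight readers.
   [resync] and the two references are hypothetical, not part of the
   patch: *)
let resync store_ref last_status_ref ~trigger_hash =
  let open Lwt_result_syntax in
  let* store, status, cleanup =
    sync ~last_status:!last_status_ref ~trigger_hash !store_ref
  in
  store_ref := store ;
  last_status_ref := status ;
  (* Former resources are released only after the switch. *)
  let*! () = cleanup () in
  return_unit
(* End of editor's note. *)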
() = Store_events.(emit store_full_sync) () in + Chain.load_chain_state chain_dir new_block_store + in + let new_main_chain_store = + { + main_chain_store with + block_store = new_block_store; + chain_state = Shared.create new_chain_state; + } + in + store.main_chain_store <- Some new_main_chain_store ; + return (store, current_status, cleanups) + in + let sync_end = Time.System.now () in + let sync_time = Ptime.diff sync_end sync_start in + let*! () = Store_events.(emit end_store_sync) sync_time in + return (store, current_status, cleanups) + let close_store global_store = let open Lwt_syntax in Lwt_watcher.shutdown_input global_store.protocol_watcher ; @@ -2913,20 +3043,6 @@ let may_switch_history_mode ~store_dir ~context_dir genesis ~new_history_mode = let*! () = unlock chain_store.lockfile in close_store store) -let get_chain_store store chain_id = - let chain_store = main_chain_store store in - let rec loop chain_store = - let open Lwt_result_syntax in - if Chain_id.equal (Chain.chain_id chain_store) chain_id then - return chain_store - else - Shared.use chain_store.chain_state (fun {active_testchain; _} -> - match active_testchain with - | None -> tzfail (Validation_errors.Unknown_chain chain_id) - | Some {testchain_store; _} -> loop testchain_store) - in - loop chain_store - let get_chain_store_opt store chain_id = let open Lwt_syntax in let* r = get_chain_store store chain_id in @@ -3219,7 +3335,7 @@ module Unsafe = struct let store_dir = Naming.store_dir ~dir_path:store_dir in let chain_id = Chain_id.of_block_hash genesis.Genesis.block in let chain_dir = Naming.chain_dir store_dir chain_id in - let* lockfile = create_lockfile chain_dir in + let* lockfile = create_lockfile Naming.lockfile chain_dir in let*! is_locked = Lwt.catch (fun () -> diff --git a/src/lib_store/unix/store.mli b/src/lib_store/unix/store.mli index c36fe7eda230..3dd88e9e8037 100644 --- a/src/lib_store/unix/store.mli +++ b/src/lib_store/unix/store.mli @@ -223,6 +223,21 @@ val init : Genesis.t -> store tzresult Lwt.t +(** [sync ?last_status ~trigger_hash store] performs a store + synchronization to update all the data and file descriptors. This + is useful to keep track of a store opened in readonly mode that is + updated by another read/write instance. + [?last_status] gives a hint regarding the previous synchronization + to speed up the process. + [trigger_hash] corresponds to the last block that aims to be + stored in the store instance to synchronize -- it is typically set + as the last head to synchronize with. *) +val sync : + ?last_status:Block_store_status.t -> + trigger_hash:Block_hash.t -> + t -> + (t * Block_store_status.t * (unit -> unit Lwt.t)) tzresult Lwt.t + (** [main_chain_store global_store] returns the main chain store. 
*) val main_chain_store : store -> chain_store -- GitLab From e0abe8089bbfeff2469b188c65dff0796b2ed689 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Mon, 3 Jun 2024 10:11:49 +0200 Subject: [PATCH 02/10] Rpc_process/store: plug and use RPC process --- manifest/product_octez.ml | 3 + src/lib_rpc_process/directory.ml | 54 +++++++- src/lib_rpc_process/directory.mli | 10 +- src/lib_rpc_process/dune | 8 +- src/lib_rpc_process/forward_handler.ml | 4 +- src/lib_rpc_process/head_daemon.ml | 122 +++++++++++------- src/lib_rpc_process/main.ml | 25 ++-- ...process_event.ml => rpc_process_events.ml} | 47 ++++++- src/lib_shell/block_directory.ml | 39 ++++-- src/lib_shell/monitor_directory.ml | 15 ++- src/lib_shell/monitor_directory.mli | 2 +- tezt/tests/rpc_process.ml | 2 +- 12 files changed, 248 insertions(+), 83 deletions(-) rename src/lib_rpc_process/{rpc_process_event.ml => rpc_process_events.ml} (70%) diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index 2d73b5b17e42..bf071cfed414 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -4876,12 +4876,15 @@ let octez_rpc_process = octez_shell |> open_; octez_base_unix |> open_; octez_node_config |> open_; + octez_protocol_updater |> open_; octez_rpc_http |> open_; octez_rpc_http_server |> open_; octez_rpc_http_client_unix |> open_; octez_rpc_http_client |> open_; octez_shell_services; + octez_stdlib_unix |> open_; octez_store |> open_; + octez_store_shared |> open_; lwt_unix; lwt_exit; prometheus_app; diff --git a/src/lib_rpc_process/directory.ml b/src/lib_rpc_process/directory.ml index 3b8622151d4a..20404ee996bc 100644 --- a/src/lib_rpc_process/directory.ml +++ b/src/lib_rpc_process/directory.ml @@ -5,7 +5,32 @@ (* *) (*****************************************************************************) -let build_rpc_directory node_version config _store = +(* This is a wrapper on top of the monitor_head streamed RPC so that + the stream to monitor is handled by the RPC process itself. It + allows the RPC process to run some internal synchronization before + notifying data on this stream. + For example, as soon as a new block is advertised to the RPC + process, it synchronizes its internal state to make sure the new + block can be queried, and then notifies it on this stream. 
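(* Editor's note: a minimal, self-contained sketch of the [Lwt_watcher]
   pattern this wrapper builds on: the daemon owns the input and
   notifies it, while each RPC client consumes its own derived stream.
   [demo_watcher] is hypothetical and only reuses calls already present
   in this patch: *)
let demo_watcher () =
  let open Lwt.Syntax in
  let input = Lwt_watcher.create_input () in
  let stream, stopper = Lwt_watcher.create_stream input in
  Lwt_watcher.notify input "new-head" ;
  (* The consumer sees the notified value; [shutdown] closes the
     stream. *)
  let* v = Lwt_stream.get stream in
  Lwt_watcher.shutdown stopper ;
  Lwt.return v
(* End of editor's note. *)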
*) +let register_monitor_head dir (store : Store.t option ref) + (head_watcher : (Block_hash.t * Block_header.t) Lwt_watcher.input) = + let dir = ref dir in + let gen_register1 s f = + dir := Tezos_rpc.Directory.gen_register !dir s (fun ((), a) p q -> f a p q) + in + gen_register1 Tezos_shell_services.Monitor_services.S.heads (fun chain q () -> + let open Lwt_syntax in + let head_watcher = Lwt_watcher.create_stream head_watcher in + let* store = + match !store with + | Some store -> return (ref store) + | None -> Lwt.fail Not_found + in + Tezos_shell.Monitor_directory.monitor_head ~head_watcher store chain q) ; + !dir + +let build_rpc_directory node_version config dynamic_store + ~(head_watcher : (Block_hash.t * Block_header.t) Lwt_watcher.input) = let static_dir = Tezos_shell.Version_directory.rpc_directory node_version in let static_dir = Tezos_shell.Config_directory.build_rpc_directory_for_rpc_process @@ -22,6 +47,29 @@ let build_rpc_directory node_version config _store = Node_services.S.config (fun () () -> Lwt.return_ok config) in - Tezos_rpc.Directory.merge + let static_dir = + Tezos_rpc.Directory.merge + static_dir + (Tezos_shell.Health_directory.build_rpc_directory ()) + in + let static_dir = + register_monitor_head static_dir dynamic_store head_watcher + in + Tezos_rpc.Directory.register_dynamic_directory static_dir - (Tezos_shell.Health_directory.build_rpc_directory ()) + (Tezos_rpc.Path.subst1 Tezos_shell_services.Chain_services.path) + (fun ((), chain) -> + let dir = + Tezos_shell.Chain_directory.rpc_directory_without_validator () + in + let dir = + Tezos_rpc.Directory.map + (fun ((), _chain) -> + let store = !dynamic_store in + match store with + | None -> Lwt.fail Not_found + | Some store -> + Tezos_shell.Chain_directory.get_chain_store_exn store chain) + dir + in + Lwt.return dir) diff --git a/src/lib_rpc_process/directory.mli b/src/lib_rpc_process/directory.mli index 546fa8d0b97f..63157bfe2d55 100644 --- a/src/lib_rpc_process/directory.mli +++ b/src/lib_rpc_process/directory.mli @@ -5,12 +5,16 @@ (* *) (*****************************************************************************) -(** [build_rpc_directory node_version config store] builds the - Tezos RPC directory for the rpc process. RPCs handled here are not - forwarded to the node. +(** [build_rpc_directory node_version config dynamic_store + ~head_watcher] builds the Tezos RPC directory for the rpc + process. RPCs handled here are not forwarded to the node. + + [head_watcher] is the wrapped monitor_head stream on which the + clients will listen to. 
*) val build_rpc_directory : Tezos_version.Octez_node_version.t -> Octez_node_config.Config_file.t -> Store.t option ref -> + head_watcher:(Block_hash.t * Block_header.t) Lwt_watcher.input -> unit Tezos_rpc.Directory.t diff --git a/src/lib_rpc_process/dune b/src/lib_rpc_process/dune index 08ac42481b8f..a00988a7ec0c 100644 --- a/src/lib_rpc_process/dune +++ b/src/lib_rpc_process/dune @@ -10,12 +10,15 @@ octez-shell-libs.shell octez-libs.base.unix octez-node-config + octez-shell-libs.protocol-updater octez-libs.rpc-http octez-libs.rpc-http-server octez-libs.rpc-http-client-unix octez-libs.rpc-http-client octez-shell-libs.shell-services + octez-libs.stdlib-unix octez-shell-libs.store + octez-shell-libs.store.shared lwt.unix lwt-exit prometheus-app) @@ -26,8 +29,11 @@ -open Tezos_shell -open Tezos_base_unix -open Octez_node_config + -open Tezos_protocol_updater -open Tezos_rpc_http -open Tezos_rpc_http_server -open Tezos_rpc_http_client_unix -open Tezos_rpc_http_client - -open Tezos_store)) + -open Tezos_stdlib_unix + -open Tezos_store + -open Tezos_store_shared)) diff --git a/src/lib_rpc_process/forward_handler.ml b/src/lib_rpc_process/forward_handler.ml index 6ac0b9435071..73e8a5d8d96a 100644 --- a/src/lib_rpc_process/forward_handler.ml +++ b/src/lib_rpc_process/forward_handler.ml @@ -41,10 +41,10 @@ let callback ~acl server socket_path = in let forwarding_endpoint = Uri.of_string socket_forwarding_uri in let on_forwarding req = - Rpc_process_event.(emit forwarding_rpc (Cohttp.Request.resource req)) + Rpc_process_events.(emit forwarding_rpc (Cohttp.Request.resource req)) in let on_locally_handled req = - Rpc_process_event.(emit locally_handled_rpc (Cohttp.Request.resource req)) + Rpc_process_events.(emit locally_handled_rpc (Cohttp.Request.resource req)) in let ctx = build_socket_redirection_ctx socket_path in RPC_middleware.proxy_server_query_forwarder diff --git a/src/lib_rpc_process/head_daemon.ml b/src/lib_rpc_process/head_daemon.ml index 2e97a69a8856..33c5ff593528 100644 --- a/src/lib_rpc_process/head_daemon.ml +++ b/src/lib_rpc_process/head_daemon.ml @@ -7,45 +7,6 @@ open Parameters -module Events = struct - include Internal_event.Simple - - let section = ["octez_rpc_server"] - - let daemon_error = - declare_1 - ~section - ~name:"octez_rpc_server_daemon_error" - ~msg:"Daemon thrown an error: {error}" - ~level:Notice - ~pp1:Error_monad.pp_print_trace - ("error", Error_monad.trace_encoding) - - let new_head = - declare_1 - ~section - ~name:"new_head" - ~msg:"New head received at level ({level})" - ~level:Notice - ("level", Data_encoding.int32) - - let synchronized = - declare_1 - ~section - ~name:"synchronized" - ~msg:"Store synchronized up to level {level}" - ~level:Notice - ("level", Data_encoding.int32) - - let shutting_head_daemon = - declare_0 - ~section - ~name:"shutting_head_daemon" - ~msg:"shutting down head daemon" - ~level:Info - () -end - module Daemon = struct type t = { daemon : unit tzresult Lwt.t; @@ -70,7 +31,7 @@ module Daemon = struct let*! 
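(* Editor's note: the daemon body below is an instance of the usual Lwt
   stream-consumer loop. A self-contained sketch, where [handle] is a
   hypothetical callback, not a function from this patch: *)
let rec consume (stream : 'a Lwt_stream.t) (handle : 'a -> unit Lwt.t) =
  let open Lwt.Syntax in
  let* v = Lwt_stream.get stream in
  match v with
  | None -> Lwt.return_unit (* the stream was closed by its stopper *)
  | Some v ->
      let* () = handle v in
      consume stream handle
(* End of editor's note. *)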
() = match processed_head with | Ok () -> Lwt.return_unit - | Error trace -> Events.(emit daemon_error) trace + | Error trace -> Rpc_process_events.(emit daemon_error) trace in stream_processor () in @@ -78,19 +39,73 @@ module Daemon = struct let shutdown {head_stream_stopper; _} = let open Lwt_syntax in - let* () = Events.(emit shutting_head_daemon) () in + let* () = Rpc_process_events.(emit shutting_head_daemon) () in head_stream_stopper () ; return_unit end -let handle_new_head _dynamic_store _parameters - (_block_hash, (header : Tezos_base.Block_header.t)) = +let init_store ~allow_testchains ~readonly parameters = + (* Invariant: the Store.init must be called after the Store.init of + the node is finished. Otherwise, it may create an race because of + the consistency checks. *) + let config = parameters.config in + let store_dir = Data_version.store_dir config.data_dir in + let context_dir = Data_version.context_dir config.data_dir in + Store.init + ?history_mode:config.shell.history_mode + ~store_dir + ~context_dir + ~allow_testchains + ~readonly + config.blockchain_network.genesis + +let sync_store (dynamic_store : Store.t option ref) last_status parameters + (block_hash, (header : Tezos_base.Block_header.t)) = let open Lwt_result_syntax in - let*! () = Events.(emit new_head) header.shell.level in - (* TODO: Synchronize the store *) + let block_level = header.shell.level in + let* () = + match !dynamic_store with + | Some store -> + let*! () = + Rpc_process_events.(emit start_synchronization) + (block_level, block_hash) + in + let* store, current_status, cleanups = + Store.sync ~last_status:!last_status ~trigger_hash:block_hash store + in + last_status := current_status ; + dynamic_store := Some store ; + let*! () = cleanups () in + let*! () = + Rpc_process_events.(emit synchronized) (block_hash, block_level) + in + return_unit + | None -> + let* store = + init_store ~allow_testchains:false ~readonly:true parameters + in + dynamic_store := Some store ; + return_unit + in return_unit -let init dynamic_store parameters = +let handle_new_head (dynamic_store : Store.t option ref) last_status parameters + (head_watcher : (Block_hash.t * Block_header.t) Lwt_watcher.input) + (block_hash, (header : Tezos_base.Block_header.t)) = + let open Lwt_result_syntax in + let block_level = header.shell.level in + let*! () = Rpc_process_events.(emit new_head) block_level in + let* () = + sync_store dynamic_store last_status parameters (block_hash, header) + in + (* The monitor_head wrapped stream used by clients is finally + notified. *) + Lwt_watcher.notify head_watcher (block_hash, header) ; + return_unit + +let init (dynamic_store : Store.t option ref) parameters + (stream : (Block_hash.t * Block_header.t) Lwt_watcher.input) = + let open Lwt_result_syntax in let ctx = Forward_handler.build_socket_redirection_ctx parameters.rpc_comm_socket_path in @@ -114,6 +129,19 @@ let init dynamic_store parameters = rpc_config (Media_type.Command_line.of_command_line rpc_config.media_type) in + let config = parameters.config in + let store_dir = Data_version.store_dir config.data_dir in + let store_directory = Naming.store_dir ~dir_path:store_dir in + let chain_id = + Chain_id.of_block_hash config.blockchain_network.genesis.Genesis.block + in + let chain_dir = Naming.chain_dir store_directory chain_id in + let status_file = Naming.block_store_status_file chain_dir in + let* stored_status = Stored_data.load status_file in + let*! 
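(* Editor's note: the key ordering enforced by [handle_new_head] above,
   synchronize the local store first and notify RPC clients second, in
   a condensed self-contained sketch; [on_head] and its parameters are
   hypothetical: *)
let on_head sync_store head_watcher (hash, header) =
  let open Lwt_result_syntax in
  (* 1. Make the new block queryable from this process. *)
  let* () = sync_store (hash, header) in
  (* 2. Only then wake up clients of the wrapped monitor_head stream. *)
  Lwt_watcher.notify head_watcher (hash, header) ;
  return_unit
(* End of editor's note. *)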
initial_status = Stored_data.get stored_status in + let* store = init_store ~allow_testchains:false ~readonly:true parameters in + dynamic_store := Some store ; Daemon.make_stream_daemon - ~on_head:(handle_new_head dynamic_store parameters) + ~on_head: + (handle_new_head dynamic_store (ref initial_status) parameters stream) ~head_stream:(Tezos_shell_services.Monitor_services.heads rpc_ctxt `Main) diff --git a/src/lib_rpc_process/main.ml b/src/lib_rpc_process/main.ml index de14ad765f9d..90c551cc4292 100644 --- a/src/lib_rpc_process/main.ml +++ b/src/lib_rpc_process/main.ml @@ -66,7 +66,8 @@ let sanitize_cors_headers ~default headers = |> String.Set.(union (of_list default)) |> String.Set.elements -let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) = +let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) + head_watcher = let open Lwt_result_syntax in let media_types = params.config.rpc.media_type in let*! acl_policy = @@ -85,7 +86,7 @@ let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) = |> Option.value_f ~default:(fun () -> default addr) in let*! () = - Rpc_process_event.(emit starting_rpc_server) + Rpc_process_events.(emit starting_rpc_server) (host, port, params.config.rpc.tls <> None, RPC_server.Acl.policy_type acl) in let cors_headers = @@ -105,6 +106,7 @@ let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) = params.node_version params.config dynamic_store + ~head_watcher in let server = RPC_server.init_server @@ -135,7 +137,7 @@ let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) = tzfail (RPC_Process_Port_already_in_use [(addr, port)]) | exn -> fail_with_exn exn) -let init_rpc dynamic_store parameters = +let init_rpc dynamic_store parameters stream = let open Lwt_result_syntax in let* server = let* p2p_point = @@ -150,7 +152,7 @@ let init_rpc dynamic_store parameters = assert false in match p2p_point with - | [point] -> launch_rpc_server dynamic_store parameters point + | [point] -> launch_rpc_server dynamic_store parameters point stream | _ -> (* Same as above: only one p2p_point is expected here. *) assert false @@ -194,16 +196,21 @@ let run socket_dir = ~config:parameters.Parameters.internal_events () in + (* Updater needs to be initialized to be able to read the protocol + sources from the store when a protocol is injected and + compiled. *) + Updater.init (Data_version.protocol_dir parameters.config.data_dir) ; + let head_watcher = Lwt_watcher.create_input () in let dynamic_store : Store.t option ref = ref None in - let* () = init_rpc dynamic_store parameters in - (* Send the config ack as synchronisation barrier for the init_rpc - phase. *) - let* () = Socket.send init_socket_fd Data_encoding.unit () in - let* daemon = Head_daemon.init dynamic_store parameters in + let* daemon = Head_daemon.init dynamic_store parameters head_watcher in let (_ccid : Lwt_exit.clean_up_callback_id) = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> Head_daemon.Daemon.shutdown daemon) in + let* () = init_rpc dynamic_store parameters head_watcher in + (* Send the config ack as synchronisation barrier for the init_rpc + phase. 
*) + let* () = Socket.send init_socket_fd Data_encoding.unit () in Lwt_utils.never_ending () let process socket_dir = diff --git a/src/lib_rpc_process/rpc_process_event.ml b/src/lib_rpc_process/rpc_process_events.ml similarity index 70% rename from src/lib_rpc_process/rpc_process_event.ml rename to src/lib_rpc_process/rpc_process_events.ml index bd23c03fcae9..83d8b14c5612 100644 --- a/src/lib_rpc_process/rpc_process_event.ml +++ b/src/lib_rpc_process/rpc_process_events.ml @@ -25,7 +25,7 @@ include Internal_event.Simple -let section = ["rpc-process"] +let section = ["rpc"; "process"] let starting_rpc_server = declare_4 @@ -53,3 +53,48 @@ let locally_handled_rpc = ~msg:"locally handled: {uri}" ~level:Debug ("uri", Data_encoding.string) + +let daemon_error = + declare_1 + ~section + ~name:"octez_rpc_server_daemon_error" + ~msg:"Daemon thrown an error: {error}" + ~level:Notice + ~pp1:Error_monad.pp_print_trace + ("error", Error_monad.trace_encoding) + +let new_head = + declare_1 + ~section + ~name:"new_head" + ~msg:"New head received at level ({level})" + ~level:Notice + ("level", Data_encoding.int32) + +let start_synchronization = + declare_2 + ~section + ~name:"start_synchronization" + ~msg:"Starting store synchronization for block {level} ({hash})" + ~level:Notice + ("level", Data_encoding.int32) + ~pp2:Block_hash.pp_short + ("hash", Block_hash.encoding) + +let shutting_head_daemon = + declare_0 + ~section + ~name:"shutting_head_daemon" + ~msg:"shutting down head daemon" + ~level:Info + () + +let synchronized = + declare_2 + ~section + ~name:"synchronized" + ~msg:"Store synchronized on head {hash} ({level})" + ~level:Notice + ~pp1:Block_hash.pp_short + ("hash", Block_hash.encoding) + ("level", Data_encoding.int32) diff --git a/src/lib_shell/block_directory.ml b/src/lib_shell/block_directory.ml index cce2295cafe2..1ff41022c1cc 100644 --- a/src/lib_shell/block_directory.ml +++ b/src/lib_shell/block_directory.ml @@ -853,19 +853,34 @@ let build_raw_rpc_directory_without_validator proto_services) ; !dir -let get_protocol hash = +let get_protocol ?(allow_retry = false) chain_store hash = + let open Lwt_syntax in match Registered_protocol.get hash with - | None -> raise Not_found - | Some protocol -> protocol + | None when allow_retry -> ( + let store = Store.Chain.global_store chain_store in + let* proto = Store.Protocol.read store hash in + match proto with + | Some proto -> ( + (* After the injection of a protocol, the rpc_process is not + aware of that newly registered protocol. We need to try, + at least once, to compile and register that new + protocol. *) + let* (_ : bool) = Updater.compile hash proto in + match Registered_protocol.get hash with + | Some protocol -> return protocol + | None -> Lwt.fail Not_found) + | None -> Lwt.fail Not_found) + | None -> Lwt.fail Not_found + | Some protocol -> return protocol -let load_proto chain_store block (module Next_proto : Registered_protocol.T) = +let get_proto_hash chain_store block ~next_protocol_hash = let open Lwt_syntax in let* o = Store.Block.read_predecessor_opt chain_store block in match o with | None -> (* No predecessors (e.g. 
pruned caboose), return the current protocol *) - Lwt.return (module Next_proto : Registered_protocol.T) + return next_protocol_hash | Some pred -> let* _, savepoint_level = Store.Chain.savepoint chain_store in let* protocol_hash = @@ -881,7 +896,7 @@ let load_proto chain_store block (module Next_proto : Registered_protocol.T) = Lwt.return protocol_hash else Store.Block.protocol_hash_exn chain_store pred in - Lwt.return (get_protocol protocol_hash) + return protocol_hash let get_directory_with_validator chain_store block = let open Lwt_syntax in @@ -892,7 +907,7 @@ let get_directory_with_validator chain_store block = let* next_protocol_hash = Store.Block.protocol_hash_exn chain_store block in - let (module Next_proto) = get_protocol next_protocol_hash in + let* (module Next_proto) = get_protocol chain_store next_protocol_hash in if Store.Block.is_genesis chain_store (Store.Block.hash block) then let dir = build_raw_rpc_directory_without_validator @@ -907,7 +922,8 @@ let get_directory_with_validator chain_store block = Lwt.return (Tezos_rpc.Directory.merge dir dir_with_validator) else let* (module Proto) = - load_proto chain_store block (module Next_proto) + let* h = get_proto_hash chain_store block ~next_protocol_hash in + get_protocol chain_store h in let* o = Store.Chain.get_rpc_directory chain_store block in match o with @@ -944,7 +960,9 @@ let get_directory_without_validator chain_store block = let* next_protocol_hash = Store.Block.protocol_hash_exn chain_store block in - let (module Next_proto) = get_protocol next_protocol_hash in + let* (module Next_proto) = + get_protocol chain_store next_protocol_hash ~allow_retry:true + in if Store.Block.is_genesis chain_store (Store.Block.hash block) then let dir = build_raw_rpc_directory_without_validator @@ -954,7 +972,8 @@ let get_directory_without_validator chain_store block = Lwt.return dir else let* (module Proto) = - load_proto chain_store block (module Next_proto) + let* h = get_proto_hash chain_store block ~next_protocol_hash in + get_protocol chain_store h ~allow_retry:true in let* o = Store.Chain.get_rpc_directory chain_store block in match o with diff --git a/src/lib_shell/monitor_directory.ml b/src/lib_shell/monitor_directory.ml index 6f8513d4ef80..10653c524a9d 100644 --- a/src/lib_shell/monitor_directory.ml +++ b/src/lib_shell/monitor_directory.ml @@ -26,16 +26,21 @@ let monitor_head ~(head_watcher : - (Block_hash.t * Block_header.t) Lwt_stream.t * Lwt_watcher.stopper) store - chain q = + (Block_hash.t * Block_header.t) Lwt_stream.t * Lwt_watcher.stopper) + (store : Store.t ref) chain q = let open Lwt_syntax in - let* chain_store = Chain_directory.get_chain_store_exn store chain in + let* chain_store = Chain_directory.get_chain_store_exn !store chain in let block_stream, stopper = head_watcher in let* head = Store.Chain.current_head chain_store in let shutdown () = Lwt_watcher.shutdown stopper in let within_protocols header = let find_protocol protocol_level = - let+ p = Store.Chain.find_protocol chain_store ~protocol_level in + (* Here, we need to resolve the given store ref to make sure + that we are accessing the latest store value. 
*) + let* updated_chain_store = + Chain_directory.get_chain_store_exn !store chain + in + let+ p = Store.Chain.find_protocol updated_chain_store ~protocol_level in WithExceptions.Option.to_exn ~none: (Failure (Format.sprintf "Cannot find protocol %d" protocol_level)) @@ -263,7 +268,7 @@ let build_rpc_directory ~(commit_info : Octez_node_version.commit_info) | Error _ -> Lwt.fail Not_found | Ok chain_validator -> let head_watcher = Chain_validator.new_head_watcher chain_validator in - monitor_head ~head_watcher store chain q) ; + monitor_head ~head_watcher (ref store) chain q) ; gen_register0 Monitor_services.S.protocols (fun () () -> let stream, stopper = Store.Protocol.protocol_watcher store in let shutdown () = Lwt_watcher.shutdown stopper in diff --git a/src/lib_shell/monitor_directory.mli b/src/lib_shell/monitor_directory.mli index 9c828b1a6ad3..ff8ed05a9b71 100644 --- a/src/lib_shell/monitor_directory.mli +++ b/src/lib_shell/monitor_directory.mli @@ -26,7 +26,7 @@ val monitor_head : head_watcher: (Block_hash.t * Block_header.t) Lwt_stream.t * Lwt_watcher.stopper -> - Store.t -> + Store.t ref -> Chain_services.chain -> < next_protocols : Protocol_hash.t trace ; protocols : Protocol_hash.t trace diff --git a/tezt/tests/rpc_process.ml b/tezt/tests/rpc_process.ml index cc1acf4e9c9f..f4b1a7477d23 100644 --- a/tezt/tests/rpc_process.ml +++ b/tezt/tests/rpc_process.ml @@ -104,7 +104,7 @@ let test_forward = let* node, client = Client.init_with_protocol ~rpc_external:true - ~event_sections_levels:[("rpc-process", `Debug)] + ~event_sections_levels:[("rpc.process", `Debug)] ~protocol `Client () -- GitLab From bc5654b559ee7cae15b328c6de7096f05a734260 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Thu, 2 Nov 2023 12:18:49 +0100 Subject: [PATCH 03/10] Rpc_process: follow applied blocks --- src/lib_rpc_process/directory.ml | 36 ++++- src/lib_rpc_process/directory.mli | 10 +- src/lib_rpc_process/head_daemon.ml | 172 +++++++++++++++++----- src/lib_rpc_process/main.ml | 26 +++- src/lib_rpc_process/rpc_process_events.ml | 20 ++- src/lib_shell/monitor_directory.ml | 119 ++++++++------- src/lib_shell/monitor_directory.mli | 11 ++ src/lib_store/mocked/store.ml | 3 +- src/lib_store/store.mli | 6 +- src/lib_store/unix/store.ml | 124 ++++++++-------- src/lib_store/unix/store.mli | 6 +- 11 files changed, 354 insertions(+), 179 deletions(-) diff --git a/src/lib_rpc_process/directory.ml b/src/lib_rpc_process/directory.ml index 20404ee996bc..06b378268eed 100644 --- a/src/lib_rpc_process/directory.ml +++ b/src/lib_rpc_process/directory.ml @@ -29,8 +29,39 @@ let register_monitor_head dir (store : Store.t option ref) Tezos_shell.Monitor_directory.monitor_head ~head_watcher store chain q) ; !dir +type applied_watcher_kind = + | Empty + | Filled of (Store.chain_store * Store.Block.t) Lwt_watcher.input + +let register_monitor_applied_blocks dir + (applied_blocks_watcher : applied_watcher_kind ref) = + let dir = ref dir in + let gen_register0 s f = + dir := Tezos_rpc.Directory.gen_register !dir s (fun () p q -> f p q) + in + gen_register0 + Tezos_shell_services.Monitor_services.S.applied_blocks + (fun q () -> + let open Lwt_syntax in + let* applied_blocks_watcher = + match !applied_blocks_watcher with + | Filled v -> return v + | Empty -> + (* The applied_blocks_watcher is initialized only if it is + requested at least once. 
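+ Subsequent calls reuse the same watcher input, so all clients + share a single notification source.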
*) + let watcher = Lwt_watcher.create_input () in + applied_blocks_watcher := Filled watcher ; + return watcher + in + let applied_blocks_watcher = + Lwt_watcher.create_stream applied_blocks_watcher + in + Tezos_shell.Monitor_directory.applied_blocks ~applied_blocks_watcher q) ; + !dir + let build_rpc_directory node_version config dynamic_store - ~(head_watcher : (Block_hash.t * Block_header.t) Lwt_watcher.input) = + ~(head_watcher : (Block_hash.t * Block_header.t) Lwt_watcher.input) + ~(applied_blocks_watcher : applied_watcher_kind ref) = let static_dir = Tezos_shell.Version_directory.rpc_directory node_version in let static_dir = Tezos_shell.Config_directory.build_rpc_directory_for_rpc_process @@ -55,6 +86,9 @@ let build_rpc_directory node_version config dynamic_store let static_dir = register_monitor_head static_dir dynamic_store head_watcher in + let static_dir = + register_monitor_applied_blocks static_dir applied_blocks_watcher + in Tezos_rpc.Directory.register_dynamic_directory static_dir (Tezos_rpc.Path.subst1 Tezos_shell_services.Chain_services.path) diff --git a/src/lib_rpc_process/directory.mli b/src/lib_rpc_process/directory.mli index 63157bfe2d55..8a16294ae3b7 100644 --- a/src/lib_rpc_process/directory.mli +++ b/src/lib_rpc_process/directory.mli @@ -5,16 +5,24 @@ (* *) (*****************************************************************************) +type applied_watcher_kind = + | Empty + | Filled of (Store.chain_store * Store.Block.t) Lwt_watcher.input + (** [build_rpc_directory node_version config dynamic_store - ~head_watcher] builds the Tezos RPC directory for the rpc + ~head_watcher ~applied_blocks_watcher] builds the Tezos RPC directory for the rpc process. RPCs handled here are not forwarded to the node. [head_watcher] is the wrapped monitor_head stream on which the clients will listen to. + + [applied_blocks_watcher] is similar to [head_watcher] but for the + applied_blocks RPC. *) val build_rpc_directory : Tezos_version.Octez_node_version.t -> Octez_node_config.Config_file.t -> Store.t option ref -> head_watcher:(Block_hash.t * Block_header.t) Lwt_watcher.input -> + applied_blocks_watcher:applied_watcher_kind ref -> unit Tezos_rpc.Directory.t diff --git a/src/lib_rpc_process/head_daemon.ml b/src/lib_rpc_process/head_daemon.ml index 33c5ff593528..959c1e1ac1d7 100644 --- a/src/lib_rpc_process/head_daemon.ml +++ b/src/lib_rpc_process/head_daemon.ml @@ -11,42 +11,109 @@ module Daemon = struct type t = { daemon : unit tzresult Lwt.t; head_stream_stopper : Tezos_rpc.Context.stopper; + applied_block_stream_stopper : Tezos_rpc.Context.stopper; } - (** [make_stream_daemon ~on_head ~head_stream] calls [on_head] on - each newly received value from [head_stream]. + (** [fair_lwt_stream_get push s1 s2] gets the values available + from [s1] and [s2] and [push]es them to a stream, so that all the + values pushed to that stream are interleaved to preserve some + fairness. *) + let fair_lwt_stream_get push s1 s2 = + let s1l = Lwt_stream.get_available s1 in + let s2l = Lwt_stream.get_available s2 in + match (s1l, s2l) with + | [], [] -> () (* assert false *) + | l1, l2 -> + Seq.iter + (fun v -> push (Some v)) + (Seq.interleave (List.to_seq l1) (List.to_seq l2)) + + (** [make_stream_daemon ~on_head ~on_applied_block ~head_stream + ~applied_block_stream] calls [on_head] or [on_applied_block] + depending on the value received from the stream composed of + [head_stream] and [applied_block_stream]. The composed stream is + interleaved for fairness.
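+ For instance, assuming hypothetical pending values [h1; h2] on the + head stream and [a1] on the applied-block stream, the composed + stream yields [h1], [a1] and then [h2].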
It returns a couple [(p, stopper)] where [p] is a promise resolving when the stream closes and [stopper] is a function closing the stream. *) - let make_stream_daemon ~on_head ~head_stream = + let make_stream_daemon ~on_head ~on_applied_block + ~(head_stream : + ((Block_hash.t * Block_header.t) Lwt_stream.t + * Tezos_rpc.Context.stopper) + tzresult + Lwt.t) + ~(applied_block_stream : + ((Chain_id.t * Block_hash.t * Block_header.t * Operation.t trace trace) + Lwt_stream.t + * Tezos_rpc.Context.stopper) + tzresult + Lwt.t) = let open Lwt_result_syntax in + let master_stream, push = Lwt_stream.create () in let* head_stream, head_stream_stopper = head_stream in + let head_stream = Lwt_stream.map (fun v -> (`Head, v)) head_stream in + let* applied_block_stream, applied_block_stream_stopper = + applied_block_stream + in + let applied_block_stream = + Lwt_stream.map + (fun (_, hash, header, _) -> (`Applied, (hash, header))) + applied_block_stream + in + let rec stream_aggregator () = + let*! block_element = + Lwt.choose + [Lwt_stream.peek head_stream; Lwt_stream.peek applied_block_stream] + in + let*! () = Lwt.pause () in + match block_element with + | None -> stream_aggregator () + | Some _ -> + fair_lwt_stream_get push head_stream applied_block_stream ; + stream_aggregator () + in + let _ = stream_aggregator () in let rec stream_processor () = - let*! head_element = Lwt_stream.get head_stream in - match head_element with + let*! block_element = Lwt_stream.get master_stream in + match block_element with | None -> return_unit - | Some element -> - let*! processed_head = on_head element in + | Some (`Head, v) -> + let*! processed_head = on_head v in let*! () = match processed_head with | Ok () -> Lwt.return_unit | Error trace -> Rpc_process_events.(emit daemon_error) trace in stream_processor () + | Some (`Applied, v) -> + let*! processed_block = on_applied_block v in + let*! () = + match processed_block with + | Ok () -> Lwt.return_unit + | Error trace -> + let*! () = Rpc_process_events.(emit daemon_error) trace in + Lwt.return_unit + in + stream_processor () in - return {daemon = stream_processor (); head_stream_stopper} + return + { + daemon = stream_processor (); + head_stream_stopper; + applied_block_stream_stopper; + } - let shutdown {head_stream_stopper; _} = + let shutdown {head_stream_stopper; applied_block_stream_stopper; _} = let open Lwt_syntax in let* () = Rpc_process_events.(emit shutting_head_daemon) () in head_stream_stopper () ; + applied_block_stream_stopper () ; return_unit end let init_store ~allow_testchains ~readonly parameters = (* Invariant: the Store.init must be called after the Store.init of - the node is finished. Otherwise, it may create an race because of + the node is finished. Otherwise, it may create a race because of the consistency checks. *) let config = parameters.config in let store_dir = Data_version.store_dir config.data_dir in @@ -63,31 +130,25 @@ let sync_store (dynamic_store : Store.t option ref) last_status parameters (block_hash, (header : Tezos_base.Block_header.t)) = let open Lwt_result_syntax in let block_level = header.shell.level in - let* () = - match !dynamic_store with - | Some store -> - let*! () = - Rpc_process_events.(emit start_synchronization) - (block_level, block_hash) - in - let* store, current_status, cleanups = - Store.sync ~last_status:!last_status ~trigger_hash:block_hash store - in - last_status := current_status ; - dynamic_store := Some store ; - let*! () = cleanups () in - let*! 
() = - Rpc_process_events.(emit synchronized) (block_hash, block_level) - in - return_unit - | None -> - let* store = - init_store ~allow_testchains:false ~readonly:true parameters - in - dynamic_store := Some store ; - return_unit - in - return_unit + match !dynamic_store with + | Some store -> + let*! () = + Rpc_process_events.(emit start_synchronization) (block_level, block_hash) + in + let* store, current_status, cleanups = + Store.sync ~last_status:!last_status store + in + last_status := current_status ; + dynamic_store := Some store ; + let*! () = cleanups () in + let*! () = Rpc_process_events.(emit store_synchronized) () in + return store + | None -> + let* store = + init_store ~allow_testchains:false ~readonly:true parameters + in + dynamic_store := Some store ; + return store let handle_new_head (dynamic_store : Store.t option ref) last_status parameters (head_watcher : (Block_hash.t * Block_header.t) Lwt_watcher.input) @@ -95,16 +156,38 @@ let handle_new_head (dynamic_store : Store.t option ref) last_status parameters let open Lwt_result_syntax in let block_level = header.shell.level in let*! () = Rpc_process_events.(emit new_head) block_level in - let* () = + let* (_ : Store.t) = sync_store dynamic_store last_status parameters (block_hash, header) in + let*! () = + Rpc_process_events.(emit store_synchronized_on_head) + (block_hash, block_level) + in (* The monitor_head wrapped stream used by clients is finally notified. *) Lwt_watcher.notify head_watcher (block_hash, header) ; return_unit +let handle_new_applied_block (dynamic_store : Store.t option ref) last_status + parameters (applied_block_watcher : Directory.applied_watcher_kind ref) + (block_hash, (header : Tezos_base.Block_header.t)) = + let open Lwt_result_syntax in + match !applied_block_watcher with + | Empty -> return_unit + | Filled w -> + let block_level = header.shell.level in + let*! () = Rpc_process_events.(emit new_applied_block) block_level in + let* store = + sync_store dynamic_store last_status parameters (block_hash, header) + in + let chain_store = Store.main_chain_store store in + let* block = Store.Block.read_block chain_store block_hash in + Lwt_watcher.notify w (chain_store, block) ; + return_unit + let init (dynamic_store : Store.t option ref) parameters - (stream : (Block_hash.t * Block_header.t) Lwt_watcher.input) = + (head_watcher : (Block_hash.t * Block_header.t) Lwt_watcher.input) + (applied_block_watcher : Directory.applied_watcher_kind ref) = let open Lwt_result_syntax in let ctx = Forward_handler.build_socket_redirection_ctx parameters.rpc_comm_socket_path @@ -140,8 +223,21 @@ let init (dynamic_store : Store.t option ref) parameters let* stored_status = Stored_data.load status_file in let*! 
initial_status = Stored_data.get stored_status in let* store = init_store ~allow_testchains:false ~readonly:true parameters in + let _ = Tezos_shell_services.Monitor_services.applied_blocks rpc_ctxt () in dynamic_store := Some store ; Daemon.make_stream_daemon ~on_head: - (handle_new_head dynamic_store (ref initial_status) parameters stream) + (handle_new_head + dynamic_store + (ref initial_status) + parameters + head_watcher) ~head_stream:(Tezos_shell_services.Monitor_services.heads rpc_ctxt `Main) + ~on_applied_block: + (handle_new_applied_block + dynamic_store + (ref initial_status) + parameters + applied_block_watcher) + ~applied_block_stream: + (Tezos_shell_services.Monitor_services.applied_blocks rpc_ctxt ()) diff --git a/src/lib_rpc_process/main.ml b/src/lib_rpc_process/main.ml index 90c551cc4292..08bb142e0c03 100644 --- a/src/lib_rpc_process/main.ml +++ b/src/lib_rpc_process/main.ml @@ -67,7 +67,7 @@ let sanitize_cors_headers ~default headers = |> String.Set.elements let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) - head_watcher = + head_watcher applied_blocks_watcher = let open Lwt_result_syntax in let media_types = params.config.rpc.media_type in let*! acl_policy = @@ -107,6 +107,7 @@ let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) params.config dynamic_store ~head_watcher + ~applied_blocks_watcher in let server = RPC_server.init_server @@ -137,7 +138,7 @@ let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) tzfail (RPC_Process_Port_already_in_use [(addr, port)]) | exn -> fail_with_exn exn) -let init_rpc dynamic_store parameters stream = +let init_rpc dynamic_store parameters head_watcher applied_blocks_watcher = let open Lwt_result_syntax in let* server = let* p2p_point = @@ -152,7 +153,13 @@ let init_rpc dynamic_store parameters stream = assert false in match p2p_point with - | [point] -> launch_rpc_server dynamic_store parameters point stream + | [point] -> + launch_rpc_server + dynamic_store + parameters + point + head_watcher + applied_blocks_watcher | _ -> (* Same as above: only one p2p_point is expected here. *) assert false @@ -202,12 +209,21 @@ let run socket_dir = Updater.init (Data_version.protocol_dir parameters.config.data_dir) ; let head_watcher = Lwt_watcher.create_input () in let dynamic_store : Store.t option ref = ref None in - let* daemon = Head_daemon.init dynamic_store parameters head_watcher in + let applied_blocks_watcher = ref Directory.Empty in + let* daemon = + Head_daemon.init + dynamic_store + parameters + head_watcher + applied_blocks_watcher + in let (_ccid : Lwt_exit.clean_up_callback_id) = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> Head_daemon.Daemon.shutdown daemon) in - let* () = init_rpc dynamic_store parameters head_watcher in + let* () = + init_rpc dynamic_store parameters head_watcher applied_blocks_watcher + in (* Send the config ack as synchronisation barrier for the init_rpc phase. 
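The parent process waits for this message in [Rpc_process_worker.run_server] before reporting the RPC process as started.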
*) let* () = Socket.send init_socket_fd Data_encoding.unit () in diff --git a/src/lib_rpc_process/rpc_process_events.ml b/src/lib_rpc_process/rpc_process_events.ml index 83d8b14c5612..fb26989ae639 100644 --- a/src/lib_rpc_process/rpc_process_events.ml +++ b/src/lib_rpc_process/rpc_process_events.ml @@ -71,6 +71,14 @@ let new_head = ~level:Notice ("level", Data_encoding.int32) +let new_applied_block = + declare_1 + ~section + ~name:"new_applied_block" + ~msg:"New applied block received ({level})" + ~level:Notice + ("level", Data_encoding.int32) + let start_synchronization = declare_2 ~section @@ -89,10 +97,18 @@ let shutting_head_daemon = ~level:Info () -let synchronized = +let store_synchronized = + declare_0 + ~section + ~name:"store_synchronized" + ~msg:"Store synchronized" + ~level:Notice + () + +let store_synchronized_on_head = declare_2 ~section - ~name:"synchronized" + ~name:"store_synchronized_on_head" ~msg:"Store synchronized on head {hash} ({level})" ~level:Notice ~pp1:Block_hash.pp_short diff --git a/src/lib_shell/monitor_directory.ml b/src/lib_shell/monitor_directory.ml index 10653c524a9d..9632417b23ed 100644 --- a/src/lib_shell/monitor_directory.ml +++ b/src/lib_shell/monitor_directory.ml @@ -96,6 +96,66 @@ let monitor_head in Tezos_rpc.Answer.return_stream {next; shutdown} +let applied_blocks + ~(applied_blocks_watcher : + (Store.chain_store * Store.Block.t) Lwt_stream.t * Lwt_watcher.stopper) q + = + let open Lwt_syntax in + let block_stream, stopper = applied_blocks_watcher in + let shutdown () = Lwt_watcher.shutdown stopper in + let in_chains (chain_store, _block) = + match q#chains with + | [] -> Lwt.return_true + | chains -> + let that_chain_id = Store.Chain.chain_id chain_store in + List.exists_p + (fun chain -> + let store = Store.Chain.global_store chain_store in + let+ o = Chain_directory.get_chain_id_opt store chain in + match o with + | None -> false + | Some this_chain_id -> Chain_id.equal this_chain_id that_chain_id) + chains + in + let in_protocols (chain_store, block) = + match q#protocols with + | [] -> Lwt.return_true + | protocols -> ( + let* o = Store.Block.read_predecessor_opt chain_store block in + match o with + | None -> Lwt.return_false (* won't happen *) + | Some pred -> + let* context = Store.Block.context_exn chain_store pred in + let* protocol = Context_ops.get_protocol context in + Lwt.return (List.exists (Protocol_hash.equal protocol) protocols)) + in + let in_next_protocols (chain_store, block) = + match q#next_protocols with + | [] -> Lwt.return_true + | protocols -> + let* context = Store.Block.context_exn chain_store block in + let* next_protocol = Context_ops.get_protocol context in + Lwt.return (List.exists (Protocol_hash.equal next_protocol) protocols) + in + let stream = + Lwt_stream.filter_map_s + (fun ((chain_store, block) as elt) -> + let* in_chains = in_chains elt in + let* in_next_protocols = in_next_protocols elt in + let* in_protocols = in_protocols elt in + if in_chains && in_protocols && in_next_protocols then + let chain_id = Store.Chain.chain_id chain_store in + Lwt.return_some + ( chain_id, + Store.Block.hash block, + Store.Block.header block, + Store.Block.operations block ) + else Lwt.return_none) + block_stream + in + let next () = Lwt_stream.get stream in + Tezos_rpc.Answer.return_stream {next; shutdown} + let build_rpc_directory ~(commit_info : Octez_node_version.commit_info) validator mainchain_validator = let open Lwt_syntax in @@ -136,62 +196,9 @@ let build_rpc_directory ~(commit_info : Octez_node_version.commit_info) 
let shutdown () = Lwt_watcher.shutdown stopper in Tezos_rpc.Answer.return_stream {next; shutdown}) ; gen_register0 Monitor_services.S.applied_blocks (fun q () -> - let block_stream, stopper = Store.global_block_watcher store in - let shutdown () = Lwt_watcher.shutdown stopper in - let in_chains (chain_store, _block) = - match q#chains with - | [] -> Lwt.return_true - | chains -> - let that_chain_id = Store.Chain.chain_id chain_store in - List.exists_p - (fun chain -> - let+ o = Chain_directory.get_chain_id_opt store chain in - match o with - | None -> false - | Some this_chain_id -> - Chain_id.equal this_chain_id that_chain_id) - chains - in - let in_protocols (chain_store, block) = - match q#protocols with - | [] -> Lwt.return_true - | protocols -> ( - let* o = Store.Block.read_predecessor_opt chain_store block in - match o with - | None -> Lwt.return_false (* won't happen *) - | Some pred -> - let* context = Store.Block.context_exn chain_store pred in - let* protocol = Context_ops.get_protocol context in - Lwt.return - (List.exists (Protocol_hash.equal protocol) protocols)) - in - let in_next_protocols (chain_store, block) = - match q#next_protocols with - | [] -> Lwt.return_true - | protocols -> - let* context = Store.Block.context_exn chain_store block in - let* next_protocol = Context_ops.get_protocol context in - Lwt.return - (List.exists (Protocol_hash.equal next_protocol) protocols) - in - let stream = - Lwt_stream.filter_map_s - (fun ((chain_store, block) as elt) -> - let* in_chains = in_chains elt in - let* in_next_protocols = in_next_protocols elt in - let* in_protocols = in_protocols elt in - if in_chains && in_protocols && in_next_protocols then - let chain_id = Store.Chain.chain_id chain_store in - Lwt.return_some - ( chain_id, - Store.Block.hash block, - Store.Block.header block, - Store.Block.operations block ) - else Lwt.return_none) - block_stream - in - let next () = Lwt_stream.get stream in - Tezos_rpc.Answer.return_stream {next; shutdown}) ; + applied_blocks + ~applied_blocks_watcher:(Store.global_block_watcher store) + q) ; gen_register0 Monitor_services.S.validated_blocks (fun q () -> let* chains = match q#chains with diff --git a/src/lib_shell/monitor_directory.mli b/src/lib_shell/monitor_directory.mli index ff8ed05a9b71..009ec53cff78 100644 --- a/src/lib_shell/monitor_directory.mli +++ b/src/lib_shell/monitor_directory.mli @@ -33,6 +33,17 @@ val monitor_head : ; .. > -> (Block_hash.t * Block_header.t) Tezos_rpc.Answer.t Lwt.t +val applied_blocks : + applied_blocks_watcher: + (Store.chain_store * Store.Block.t) Lwt_stream.t * Lwt_watcher.stopper -> + < chains : Chain_services.chain trace + ; next_protocols : Protocol_hash.t trace + ; protocols : Protocol_hash.t trace + ; .. 
> -> (Chain_id.t * Block_hash.t * Block_header.t * Operation.t trace trace) Tezos_rpc.Answer.t Lwt.t + val build_rpc_directory : commit_info:Octez_node_version.commit_info -> Validator.t -> diff --git a/src/lib_store/mocked/store.ml b/src/lib_store/mocked/store.ml index 4570a75a2f34..b823548d3932 100644 --- a/src/lib_store/mocked/store.ml +++ b/src/lib_store/mocked/store.ml @@ -1874,8 +1874,7 @@ let init ?patch_context ?commit_genesis ?history_mode ?(readonly = false) ?history_mode ~allow_testchains) -let sync ?last_status:_ ~trigger_hash:_ _store = - Stdlib.failwith "sync: unimplemented" +let sync ?last_status:_ _store = Stdlib.failwith "sync: unimplemented" let close_store global_store = Lwt_watcher.shutdown_input global_store.protocol_watcher ; diff --git a/src/lib_store/store.mli b/src/lib_store/store.mli index 702970a695e6..abb437c0fee1 100644 --- a/src/lib_store/store.mli +++ b/src/lib_store/store.mli @@ -230,13 +230,9 @@ val init : is useful to keep track of a store opened in readonly mode that is updated by another read/write instance. [?last_status] gives a hint regarding the previous synchronization - to speed up the process. - [trigger_hash] corresponds to the last block that aims to be - stored in the store instance to synchronize -- it is typically set - as the last head to synchronize with. *) + to speed up the process. *) val sync : ?last_status:Block_store_status.t -> - trigger_hash:Block_hash.t -> t -> (t * Block_store_status.t * (unit -> unit Lwt.t)) tzresult Lwt.t diff --git a/src/lib_store/unix/store.ml b/src/lib_store/unix/store.ml index 3a28ae4bc138..d3751be22a83 100644 --- a/src/lib_store/unix/store.ml +++ b/src/lib_store/unix/store.ml @@ -2885,75 +2885,71 @@ let get_chain_store store chain_id = in loop chain_store -let sync ?(last_status = Block_store_status.create_idle_status) ~trigger_hash - (store : store) = +let sync ?(last_status = Block_store_status.create_idle_status) (store : store) + = let open Lwt_result_syntax in let*! () = Store_events.(emit start_store_sync) () in let sync_start = Time.System.now () in let main_chain_store = main_chain_store store in - let*! already_synced = Block.is_known main_chain_store trigger_hash in + (*FIXME: https://gitlab.com/tezos/tezos/-/issues/7250 + We should synchronize only when the head is not in sync. + *) let* store, current_status, cleanups = - if already_synced then - (* Nothing to do, the block is already known, and thus, the - store was already synchronized. *) - let*! () = Store_events.(emit store_already_sync) () in - return (store, last_status, fun () -> Lwt.return_unit) - else - let*! head_before_sync = Chain.current_head main_chain_store in - let store_dir = store.store_dir in - let chain_id = Chain_id.of_block_hash (genesis main_chain_store).block in - let chain_dir = Naming.chain_dir store_dir chain_id in - let* current_head_data = - Stored_data.load (Naming.current_head_file chain_dir) - in - let*! current_head_hash, _ = Stored_data.get current_head_data in - (* current_head_hash is availalbe in synchronized store *) - let* new_block_store, current_status, cleanups = - Block_store.sync ~last_status main_chain_store.block_store - in - let* new_chain_state = - if Block_store_status.equal last_status current_status then - (* When no merge occured since the last sync, we only need to - sync: - - current_head - - invalid_blocks - - protocols (if a new protocol is detected) *) - let*!
() = Store_events.(emit store_quick_sync) () in - let* invalid_blocks_data = - Stored_data.load (Naming.invalid_blocks_file chain_dir) - in - let* chain_store = get_chain_store store chain_id in - let* current_head = Block.read_block chain_store current_head_hash in - Shared.use main_chain_store.chain_state (fun chain_state -> - let* protocol_levels_data = - if - Block.proto_level head_before_sync - = Block.proto_level current_head - then return chain_state.protocol_levels_data - else Stored_data.load (Naming.protocol_levels_file chain_dir) - in - return - { - chain_state with - current_head_data; - current_head; - protocol_levels_data; - invalid_blocks_data; - }) - else - (* Status has changed, synchronize everything. *) - let*! () = Store_events.(emit store_full_sync) () in - Chain.load_chain_state chain_dir new_block_store - in - let new_main_chain_store = - { - main_chain_store with - block_store = new_block_store; - chain_state = Shared.create new_chain_state; - } - in - store.main_chain_store <- Some new_main_chain_store ; - return (store, current_status, cleanups) + let*! head_before_sync = Chain.current_head main_chain_store in + let store_dir = store.store_dir in + let chain_id = Chain_id.of_block_hash (genesis main_chain_store).block in + let chain_dir = Naming.chain_dir store_dir chain_id in + let* current_head_data = + Stored_data.load (Naming.current_head_file chain_dir) + in + let*! current_head_hash, _ = Stored_data.get current_head_data in + (* current_head_hash is available in synchronized store *) + let* new_block_store, current_status, cleanups = + Block_store.sync ~last_status main_chain_store.block_store + in + let* new_chain_state = + if Block_store_status.equal last_status current_status then + (* When no merge occurred since the last sync, we only need to + sync: + - current_head + - invalid_blocks + - protocols (if a new protocol is detected) *) + let*! () = Store_events.(emit store_quick_sync) () in + let* invalid_blocks_data = + Stored_data.load (Naming.invalid_blocks_file chain_dir) + in + let* chain_store = get_chain_store store chain_id in + let* current_head = Block.read_block chain_store current_head_hash in + Shared.use main_chain_store.chain_state (fun chain_state -> + let* protocol_levels_data = + if + Block.proto_level head_before_sync + = Block.proto_level current_head + then return chain_state.protocol_levels_data + else Stored_data.load (Naming.protocol_levels_file chain_dir) + in + return + { + chain_state with + current_head_data; + current_head; + protocol_levels_data; + invalid_blocks_data; + }) + else + (* Status has changed, synchronize everything. *) + let*! () = Store_events.(emit store_full_sync) () in + Chain.load_chain_state chain_dir new_block_store + in + let new_main_chain_store = + { + main_chain_store with + block_store = new_block_store; + chain_state = Shared.create new_chain_state; + } + in + store.main_chain_store <- Some new_main_chain_store ; + return (store, current_status, cleanups) in let sync_end = Time.System.now () in let sync_time = Ptime.diff sync_end sync_start in diff --git a/src/lib_store/unix/store.mli b/src/lib_store/unix/store.mli index 3dd88e9e8037..e3746db55d9e 100644 --- a/src/lib_store/unix/store.mli +++ b/src/lib_store/unix/store.mli @@ -228,13 +228,9 @@ val init : is useful to keep track of a store opened in readonly mode that is updated by another read/write instance. [?last_status] gives a hint regarding the previous synchronization - to speed up the process. 
- [trigger_hash] corresponds to the last block that aims to be - stored in the store instance to synchronize -- it is typically set - as the last head to synchronize with. *) + to speed up the process. *) val sync : ?last_status:Block_store_status.t -> - trigger_hash:Block_hash.t -> t -> (t * Block_store_status.t * (unit -> unit Lwt.t)) tzresult Lwt.t -- GitLab From fe405fd72ae83dc60fce035c213d5cbc00f1ae2c Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Tue, 10 Oct 2023 13:37:58 +0200 Subject: [PATCH 04/10] Rpc_process: initialize DAL --- src/lib_rpc_process/main.ml | 3 +++ src/lib_rpc_process/parameters.ml | 24 +++++++++++++++++------ src/lib_rpc_process/parameters.mli | 1 + src/lib_rpc_process/rpc_process_worker.ml | 1 + 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/lib_rpc_process/main.ml b/src/lib_rpc_process/main.ml index 08bb142e0c03..6b9dab809d8e 100644 --- a/src/lib_rpc_process/main.ml +++ b/src/lib_rpc_process/main.ml @@ -203,6 +203,9 @@ let run socket_dir = ~config:parameters.Parameters.internal_events () in + let*? () = + Tezos_crypto_dal.Cryptobox.Config.init_verifier_dal parameters.dal_config + in (* Updater needs to be initialized to be able to read the protocol sources from the store when a protocol is injected and compiled. *) diff --git a/src/lib_rpc_process/parameters.ml b/src/lib_rpc_process/parameters.ml index aeef0b5b25c6..b1399b04fc15 100644 --- a/src/lib_rpc_process/parameters.ml +++ b/src/lib_rpc_process/parameters.ml @@ -28,17 +28,29 @@ type t = { rpc_comm_socket_path : string; internal_events : Tezos_base.Internal_event_config.t; node_version : Tezos_version.Octez_node_version.t; + dal_config : Tezos_crypto_dal.Cryptobox.Config.t; } let parameters_encoding = let open Data_encoding in conv - (fun {config; rpc_comm_socket_path; internal_events; node_version} -> - (config, rpc_comm_socket_path, internal_events, node_version)) - (fun (config, rpc_comm_socket_path, internal_events, node_version) -> - {config; rpc_comm_socket_path; internal_events; node_version}) - (obj4 + (fun { + config; + rpc_comm_socket_path; + internal_events; + node_version; + dal_config; + } -> + (config, rpc_comm_socket_path, internal_events, node_version, dal_config)) + (fun ( config, + rpc_comm_socket_path, + internal_events, + node_version, + dal_config ) -> + {config; rpc_comm_socket_path; internal_events; node_version; dal_config}) + (obj5 (req "config" Config_file.encoding) (req "rpc_comm_socket_path" Data_encoding.string) (req "internal_events" Tezos_base.Internal_event_config.encoding) - (req "node_version" Tezos_version.Octez_node_version.encoding)) + (req "node_version" Tezos_version.Octez_node_version.encoding) + (req "dal_config" Tezos_crypto_dal.Cryptobox.Config.encoding)) diff --git a/src/lib_rpc_process/parameters.mli b/src/lib_rpc_process/parameters.mli index 2ed84390f8ba..9d9e47b6f920 100644 --- a/src/lib_rpc_process/parameters.mli +++ b/src/lib_rpc_process/parameters.mli @@ -29,6 +29,7 @@ type t = { rpc_comm_socket_path : string; internal_events : Tezos_base.Internal_event_config.t; node_version : Tezos_version.Octez_node_version.t; + dal_config : Tezos_crypto_dal.Cryptobox.Config.t; } (** Encoding for parameters type {!t} *) diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index 89389e604936..a166ea4c3ee1 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -130,6 +130,7 @@ let create ~comm_socket_path (config : Config_file.t) node_version 
events_config config; rpc_comm_socket_path = comm_socket_path; node_version; + dal_config = config.blockchain_network.dal_config; }; } -- GitLab From d9a2f1dbf366eaca376bed7337abb141740fec25 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Tue, 31 Oct 2023 17:39:09 +0100 Subject: [PATCH 05/10] Rpc_process: add timeout for slow DAL init --- src/lib_rpc_process/rpc_process_worker.ml | 57 ++++++++++++++--------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index a166ea4c3ee1..0705a86a50f7 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -1,28 +1,27 @@ (*****************************************************************************) (* *) -(* Open Source License *) -(* Copyright (c) 2023 Nomadic Labs, *) -(* *) -(* Permission is hereby granted, free of charge, to any person obtaining a *) -(* copy of this software and associated documentation files (the "Software"),*) -(* to deal in the Software without restriction, including without limitation *) -(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) -(* and/or sell copies of the Software, and to permit persons to whom the *) -(* Software is furnished to do so, subject to the following conditions: *) -(* *) -(* The above copyright notice and this permission notice shall be included *) -(* in all copies or substantial portions of the Software. *) -(* *) -(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) -(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) -(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) -(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) -(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) -(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) -(* DEALINGS IN THE SOFTWARE. *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2023-2024 Nomadic Labs *) (* *) (*****************************************************************************) +type error += RPC_process_init_too_slow + +let () = + register_error_kind + `Permanent + ~id:"rpc_process_worker.RPC_process_init_too_slow" + ~title:"RPC process init too slow" + ~description:"RPC process init too slow" + ~pp:(fun ppf () -> + Format.fprintf + ppf + "RPC process init timeout: too slow to start. This is most likely due to \ the slow DAL initialization.") + Data_encoding.unit + (function RPC_process_init_too_slow -> Some () | _ -> None) + (fun () -> RPC_process_init_too_slow) + module Event = struct include Internal_event.Simple @@ -180,7 +179,23 @@ let run_server t () = Parameters.parameters_encoding t.external_process_parameters in - let* () = Socket.recv init_socket_fd Data_encoding.unit in + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/6579 + Workaround: increase default timeout. If the timeout is still not + enough and an Lwt_unix.Timeout is triggered, we display a + comprehensible message. + *) + let timeout = Ptime.Span.of_int_s 120 in + let* () = + protect + (fun () -> Socket.recv ~timeout init_socket_fd Data_encoding.unit) + ~on_error:(function + | err + when List.exists + (function Exn Lwt_unix.Timeout -> true | _ -> false) + err -> + tzfail RPC_process_init_too_slow + | e -> fail e) + in let*! () = Lwt_unix.close init_socket_fd in let*!
() = Event.(emit rpc_process_started) pid in t.server <- Some process ; -- GitLab From f454c125d1f8fe0662b24582e8d05cf3b6924eb0 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Thu, 13 Jul 2023 13:44:49 +0200 Subject: [PATCH 06/10] Tezt: handle RPC process synchronization --- tezt/lib_tezos/node.ml | 36 ++++++++++++++++++++++++++++-------- tezt/lib_tezos/node.mli | 6 +++++- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/tezt/lib_tezos/node.ml b/tezt/lib_tezos/node.ml index 0ba56e3240f8..122bc77b0726 100644 --- a/tezt/lib_tezos/node.ml +++ b/tezt/lib_tezos/node.ml @@ -599,13 +599,31 @@ let handle_event node {name; value; timestamp = _} = match name with | "node_is_ready.v0" -> set_ready node | "head_increment.v0" | "branch_switch.v0" -> ( + if + (* Consider [head_increment] and [branch_switch] events only + with the local RPC server. *) + not node.persistent_state.rpc_external + then match JSON.(value |-> "level" |> as_int_opt) with | None -> (* Names [head_increment] and [branch_switch] correspond to multiple different events. Some of those events carry a [level], some do not. *) () | Some level -> update_level node level) + | "store_synchronized_on_head.v0" -> ( + if + (* Consider [store_synchronized_on_head] event only with the + local RPC server. *) + node.persistent_state.rpc_external + then + match JSON.(value |-> "level" |> as_int_opt) with + | None -> + (* Ignore the event if it does not carry a [level]. *) + () + | Some level -> update_level node level) | "read_identity.v0" -> update_identity node (JSON.as_string value) | "compilation_error.v0" -> ( match JSON.as_string_opt value with @@ -623,7 +641,9 @@ | "set_head.v0" -> ( match JSON.(value |> geti 1 |> as_int_opt) with | None -> () - | Some level -> update_level node level) + | Some level -> + if not node.persistent_state.rpc_external then update_level node level + ) | _ -> () let check_event ?where node name promise = diff --git a/tezt/lib_tezos/node.mli b/tezt/lib_tezos/node.mli index 4406335bd0f2..4718c40b2079 100644 --- a/tezt/lib_tezos/node.mli +++ b/tezt/lib_tezos/node.mli @@ -141,6 +141,10 @@ type t Default value for [max_active_rpc_connections] is [500]. + [local_rpc_server] specifies whether the RPC server must be run + locally, if true (default), or in an external process. It is not + yet allowed to run both server kinds at the same time. + The argument list is a list of configuration options that the node should run with. It is passed to the first run of [octez-node config init]. It is also passed to all runs of [octez-node run] that occur before @@ -608,7 +612,7 @@ val init : port of [node]. *) val send_raw_data : t -> data:string -> unit Lwt.t -(** [upgrade_storage node] upprades the given [node] storage. *) +(** [upgrade_storage node] upgrades the given [node] storage. *) val upgrade_storage : t -> unit Lwt.t (** Run [octez-node --version] and return the node's version.
*) -- GitLab From 4f5da5bf479527877a8f24712b88b2c650268f7c Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Thu, 13 Jul 2023 15:08:10 +0200 Subject: [PATCH 07/10] Tezt: add RPC process node sync test --- tezt/tests/rpc_process.ml | 48 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/tezt/tests/rpc_process.ml b/tezt/tests/rpc_process.ml index f4b1a7477d23..262656a22f37 100644 --- a/tezt/tests/rpc_process.ml +++ b/tezt/tests/rpc_process.ml @@ -351,6 +351,52 @@ let test_local_and_process_rpc_servers = Log.info "Checking if external RPC servers have been well started" ; Lwt_list.iter_p (fun p -> p) external_event_promises +let wait_for_RPC_process_sync node = + let filter json = JSON.(json |-> "level" |> as_int_opt) in + Node.wait_for node "store_synchronized_on_head.v0" filter + +let wait_for_local_rpc_server node = + Node.wait_for node "starting_local_rpc_server.v0" (fun _ -> Some ()) + +let test_sync_with_node = + Protocol.register_test + ~__FILE__ + ~title:"RPC is sync with node" + ~tags:["rpc"; "process"; "node"; "sync"] + @@ fun protocol -> + let node_arguments = Node.[Synchronisation_threshold 0] in + (* Default node running with the RPC_process *) + let* node_rpc_process = + Node.init ~name:"node_rpc_process" ~rpc_external:true node_arguments + in + let* () = Node.wait_for_ready node_rpc_process in + let node_local_rpc = Node.create ~name:"node_local_rpc" node_arguments in + let waiter_for_local_rpc_server = wait_for_local_rpc_server node_local_rpc in + let* () = Node.run node_local_rpc node_arguments in + let* () = Node.wait_for_ready node_local_rpc in + (* Wait for the event that states that the local RPC server is + used. *) + let* () = waiter_for_local_rpc_server in + let* client = Client.init ~endpoint:(Node node_local_rpc) () in + let* () = Client.Admin.connect_address ~peer:node_rpc_process client in + let waiter_for_RPC_process_sync = + wait_for_RPC_process_sync node_rpc_process + in + let* () = Client.activate_protocol_and_wait ~protocol client in + Log.info + "Wait for RPC_process sync at level 1 on the %s" + (Node.name node_rpc_process) ; + let* (_ : int) = waiter_for_RPC_process_sync in + Log.info "Wait for level 1 on the %s" (Node.name node_rpc_process) ; + (* Node.wait_for_level relies on the RPC_process synchronization + event.*) + let* (_ : int) = Node.wait_for_level node_rpc_process 1 in + (* Node.wait_for_level relies on the set_head/branch_switch + events. 
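+ Those events only update the level when the local RPC server is + used (see the [handle_event] change above).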
*) + let* (_ : int) = Node.wait_for_level node_local_rpc 1 in unit let register ~protocols = test_kill protocols ; - test_forward protocols + test_forward protocols ; + test_sync_with_node protocols -- GitLab From 2143ae183ba9f81aa322204aec46c3d265b2b49d Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Thu, 23 Nov 2023 11:32:12 +0100 Subject: [PATCH 08/10] Tezt: update rpc_process test to suit the new forward policy --- tezt/tests/rpc_process.ml | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/tezt/tests/rpc_process.ml b/tezt/tests/rpc_process.ml index 262656a22f37..01d5d5ae91df 100644 --- a/tezt/tests/rpc_process.ml +++ b/tezt/tests/rpc_process.ml @@ -141,7 +141,7 @@ let test_forward = let* () = test_rpc Handle RPC.get_config ~rpc_prefix:"/config" in let* () = test_rpc - Forward + Handle (RPC.get_chain_chain_id ()) ~rpc_prefix:"/chains/main/chain_id" in let* () = test_rpc - Forward + Handle (RPC.get_chain_chain_id ~chain:"nonexistent" ()) ~rpc_prefix:"/chains/nonexistent/chain_id" ~error:"Cannot parse chain identifier" in let* () = test_rpc - Forward + Forward (* Only the node deals with the mempool. *) (RPC.post_chain_mempool_filter ~data:(Data (Ezjsonm.from_string "{}")) ()) ~rpc_prefix:"/chains/main/mempool/filter" in let* () = test_rpc Forward + (* The path is unknown to the RPC process, so we try to + forward it to the node. *) RPC.nonexistent_path ~rpc_prefix:"/nonexistent/path" ~error:"No service found at this URL" in - let test_streaming_rpc ~rpc_prefix rpc = + let test_streaming_rpc expected_behavior ~rpc_prefix rpc = Log.info "Test streaming RPC: %s" rpc_prefix ; - let waiter = wait_for Forward ~rpc_prefix in + let waiter = wait_for expected_behavior ~rpc_prefix in let url = RPC_core.make_uri (Node.as_rpc_endpoint node) rpc |> Uri.to_string in in let* () = test_streaming_rpc + Forward (* Only the node deals with the mempool. *) (RPC.get_chain_mempool_monitor_operations ()) ~rpc_prefix:"/chains/main/mempool/monitor_operations" in let* () = test_streaming_rpc + Handle (RPC.get_monitor_heads_chain ()) ~rpc_prefix:"/monitor/heads/main" in let* () = test_streaming_rpc + Forward + (* The path is unknown to the RPC process, so we try to + forward it to the node. *) (RPC.get_monitor_heads_chain ~chain:"test" ()) ~rpc_prefix:"/monitor/heads/test" in let* () = test_streaming_rpc + Forward (* FIXME/TODO: Not handled by the RPC process for now. *) RPC.get_monitor_validated_blocks ~rpc_prefix:"/monitor/validated_blocks" in test_streaming_rpc + Handle RPC.get_monitor_applied_blocks ~rpc_prefix:"/monitor/applied_blocks" -- GitLab From c5b9eaf8b548d87e9228fa4a4ae528f6c0679c33 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Fri, 24 Nov 2023 16:21:33 +0100 Subject: [PATCH 09/10] Tezt: restart node to update lazily synchronized store values --- tezt/tests/bootstrap.ml | 18 ++++++++++++++-- tezt/tests/storage_snapshots.ml | 12 ++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/tezt/tests/bootstrap.ml b/tezt/tests/bootstrap.ml index 4c1d7945eac6..3fc3958a77c0 100644 --- a/tezt/tests/bootstrap.ml +++ b/tezt/tests/bootstrap.ml @@ -190,8 +190,10 @@ let check_bootstrap_with_history_modes hmode1 hmode2 = if !last_cycle_being_merged then Some () else None in (* Initialize nodes and client.
*) - let* node_1 = - Node.init [Synchronisation_threshold 0; Connections 1; History_mode hmode1] + let node_1_args = + Node.[Synchronisation_threshold 0; Connections 1; History_mode hmode1] + in + let* node_1 = Node.init node_1_args and* node_2 = Node.init [Connections 1; History_mode hmode2] in let endpoint_1 = Client.(Node node_1) in let* node2_identity = Node.wait_for_identity node_2 in @@ -225,6 +227,18 @@ let check_bootstrap_with_history_modes hmode1 hmode2 = let final_level = 1 + bakes_before_kill + bakes_during_kill in let* _ = Node.wait_for_level node_1 final_level in let* () = wait_for_last_cycle in + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7251 + Restarting node_1 to make sure that the store invariants + (savepoint/checkpoint/…) are well synchronized. Indeed, these + invariants are synchronized with the RPC-process in a best-effort + way and, as the RPC-process synchronization can be faster than a + store merge, it can return an unexpected value (the value before + the last expected update). *) + let* () = + let* () = Node.terminate node_1 in + let* () = Node.run node_1 node_1_args in + Node.wait_for_ready node_1 + in let* () = Node.run node_2 [Synchronisation_threshold 1; Connections 1] in let* _ = Node.wait_for_ready node_2 in (* Register the unknown ancestor event before connecting node 2 to node 1 diff --git a/tezt/tests/storage_snapshots.ml b/tezt/tests/storage_snapshots.ml index 3331d6e5500f..0daefba50904 100644 --- a/tezt/tests/storage_snapshots.ml +++ b/tezt/tests/storage_snapshots.ml @@ -489,6 +489,18 @@ let test_drag_after_rolling_import = in (checkpoint, savepoint, caboose) in + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7251 + Restarting fresh_node to make sure that the store invariants + (savepoint/checkpoint/…) are well synchronized. Indeed, these + invariants are synchronized with the RPC-process in a best-effort + way and, as the RPC-process synchronization can be faster than a + store merge, it can return an unexpected value (the value before + the last expected update). *) + let* () = + let* () = Node.terminate fresh_node in + let* () = Node.run fresh_node node_arguments in + Node.wait_for_ready fresh_node + in let* () = check_consistency_after_import fresh_node -- GitLab From 4bf28fdc2c92a601957fd4d9644cc506a90a8140 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Fri, 24 May 2024 11:02:46 +0200 Subject: [PATCH 10/10] Node: warn external-rpc-process as experimental --- src/bin_node/node_run_command.ml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index 080750afefcc..0ac4c6871e7e 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -196,6 +196,16 @@ module Event = struct ("given_history_mode", History_mode.encoding) ~pp2:History_mode.pp ("stored_history_mode", History_mode.encoding) + + let warn_external_rpc_process_usage = + declare_0 + ~section + ~name:"warn_external_rpc_process_usage" + ~msg: + "the external RPC process is enabled. This is an unstable feature that \ + should be used with care. Please report any issues you encounter." + ~level:Warning + () end open Filename.Infix @@ -649,6 +659,8 @@ let init_rpc (config : Config_file.t) (node : Node.t) internal_events = let* rpc_server = if config.rpc.external_listen_addrs = [] then return No_server else + (* Warn that the feature is experimental. *) + let*!
() = Event.(emit warn_external_rpc_process_usage) () in (* Starts the node's local RPC server that aims to handle the RPCs forwarded by the rpc_process, if they cannot be processed by the rpc_process itself. *) -- GitLab
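For reference, a minimal sketch of how a readonly instance is expected to drive [Store.sync] from a stream of heads, mirroring [Head_daemon.sync_store] above; [follow_heads] and its [head_stream] argument are hypothetical names introduced for illustration, not part of these patches:

  let rec follow_heads store last_status head_stream =
    let open Lwt_result_syntax in
    let*! head = Lwt_stream.get head_stream in
    match head with
    | None -> return_unit
    | Some (_hash, (_header : Block_header.t)) ->
        (* Refresh the readonly view of the on-disk store. *)
        let* store, status, cleanups = Store.sync ~last_status store in
        (* Release the file descriptors held by the previous store state. *)
        let*! () = cleanups () in
        (* [status] is passed back as a hint to speed up the next sync. *)
        follow_heads store status head_stream

Each value received on the stream triggers one synchronization, which is the pattern implemented by the daemon of [PATCH 03/10].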