diff --git a/CHANGES.rst b/CHANGES.rst index f8e0d307e4ef04f7a7f7eea2514347b8cc9fdafe..3add8b87bed561bdc873c375a3a3a3949d29b2fd 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -249,6 +249,9 @@ Smart Rollup node - Introduce the 6th version of the WASM PVM. (MR :gl:`!14493`) +- New RPC ``GET /admin/cancel_gc`` to cancel any on-going garbage collection in + the rollup node. (MR :gl:`!14693`) + Smart Rollup WASM Debugger -------------------------- diff --git a/src/lib_layer2_store/context.ml b/src/lib_layer2_store/context.ml index 6cdd75466af02a72c306b6bdfa9241354f671476..cb160124ec931a3e3a8a514e84e82b4405776f08 100644 --- a/src/lib_layer2_store/context.ml +++ b/src/lib_layer2_store/context.ml @@ -119,6 +119,11 @@ let is_gc_finished [> `Write] t) = Pvm_Context_Impl.is_gc_finished index +let cancel_gc + (Context {pvm_context_impl = (module Pvm_Context_Impl); index; _} : + [> `Write] t) = + Pvm_Context_Impl.cancel_gc index + let split (type a) (Context {pvm_context_impl = (module Pvm_Context_Impl); index; _} : a t) = Pvm_Context_Impl.split index diff --git a/src/lib_layer2_store/context.mli b/src/lib_layer2_store/context.mli index d8365241fc378cb6fb5886542a7b4a0f74d88f54..e6d2986e046e0bfbd06c93876f067f674a67bd37 100644 --- a/src/lib_layer2_store/context.mli +++ b/src/lib_layer2_store/context.mli @@ -108,6 +108,10 @@ val commit : ?message:string -> [`Read | `Write] t -> hash Lwt.t if a GC is running for [index]. *) val is_gc_finished : [`Read | `Write] t -> bool +(** [cancel_gc index] stops the Irmin GC if it is currently running for + [index]. It returns [true] if a GC was canceled. *) +val cancel_gc : [`Read | `Write] t -> bool + (** [split ctxt] creates a new suffix file, also called "chunk", into the context's file hierarchy. This split function is expected to be called after committing a commit that will be a future candidate for a GC target. *) diff --git a/src/lib_layer2_store/context_sigs.ml b/src/lib_layer2_store/context_sigs.ml index b7c8b805be6ccc4863bbb090c300733fa13b7b6f..334a1aec2836df08fae6005236c0c82eade0652c 100644 --- a/src/lib_layer2_store/context_sigs.ml +++ b/src/lib_layer2_store/context_sigs.ml @@ -120,6 +120,10 @@ module type S = sig if a GC is running for [index]. *) val is_gc_finished : [> `Write] index -> bool + (** [cancel_gc index] stops the Irmin GC if it is currently running for + [index]. It returns [true] if a GC was canceled. *) + val cancel_gc : [> `Write] index -> bool + (** [split ctxt] splits the current context in order to chunk the file if the backend supports it. This split function is expected to be called after committing a commit that will be a future candidate for a GC target. *) diff --git a/src/lib_layer2_store/indexed_store.ml b/src/lib_layer2_store/indexed_store.ml index b587db91c580d01a36148b602ca45f5fca87e47c..769182987178bf3329f8a4acf1930918554d1d38 100644 --- a/src/lib_layer2_store/indexed_store.ml +++ b/src/lib_layer2_store/indexed_store.ml @@ -107,6 +107,8 @@ module type INDEXABLE_STORE = sig val wait_gc_completion : 'a t -> unit Lwt.t val is_gc_finished : 'a t -> bool + + val cancel_gc : 'a t -> bool Lwt.t end module type INDEXABLE_REMOVABLE_STORE = sig @@ -158,6 +160,8 @@ module type INDEXED_FILE = sig val wait_gc_completion : 'a t -> unit Lwt.t val is_gc_finished : 'a t -> bool + + val cancel_gc : 'a t -> bool Lwt.t end module type SIMPLE_INDEXED_FILE = sig @@ -252,7 +256,7 @@ module Make_indexable (N : NAME) (K : INDEX_KEY) (V : Index.Value.S) = struct type gc_status = | No_gc - | Ongoing of {tmp_index : internal_index; promise : unit Lwt.t} + | Ongoing of {promise : unit Lwt.t; canceler : Lwt_canceler.t} (** In order to periodically clean up the store with the {!gc} function, each pure index store is split in multiple indexes: one fresh index and @@ -386,16 +390,15 @@ module Make_indexable (N : NAME) (K : INDEX_KEY) (V : Index.Value.S) = struct protect @@ fun () -> Lwt_idle_waiter.force_idle store.scheduler @@ fun () -> let open Lwt_result_syntax in - (match store.gc_status with - | Ongoing {promise; _} -> Lwt.cancel promise - | No_gc -> ()) ; - close_internal_index store.fresh ; - List.iter close_internal_index store.stales ; - let*! () = + let* () = + Lwt_result.map_error (fun tr -> List.map (fun e -> Exn e) tr) + @@ match store.gc_status with - | No_gc -> Lwt.return_unit - | Ongoing {tmp_index; _} -> rm_internal_index tmp_index + | Ongoing {canceler; _} -> Lwt_canceler.cancel canceler + | No_gc -> return_unit in + close_internal_index store.fresh ; + List.iter close_internal_index store.stales ; return_unit let readonly x = (x :> [`Read] t) @@ -428,20 +431,20 @@ module Make_indexable (N : NAME) (K : INDEX_KEY) (V : Index.Value.S) = struct store.fresh <- new_index_fresh ; store.stales <- new_index_stale :: store.stales ; let promise, resolve = Lwt.task () in - store.gc_status <- Ongoing {tmp_index; promise} ; - return (tmp_index, promise, resolve) + let canceler = Lwt_canceler.create () in + store.gc_status <- Ongoing {promise; canceler} ; + return (tmp_index, promise, resolve, canceler) (** If a gc operation fails, reverting simply consists in removing the temporary index. We keep the two stale indexes as is, they will be merged by the next successful gc. *) - let revert_failed_gc store = + let cancel_gc store = let open Lwt_syntax in match store.gc_status with - | No_gc -> return_unit - | Ongoing {tmp_index; promise} -> - Lwt.cancel promise ; - store.gc_status <- No_gc ; - rm_internal_index tmp_index + | No_gc -> return_false + | Ongoing {canceler; _} -> + let* _res = Lwt_canceler.cancel canceler in + return_true (** When the gc operation finishes, i.e. we have copied all elements to retain to the temporary index, we can replace all the stale indexes by the @@ -470,12 +473,15 @@ module Make_indexable (N : NAME) (K : INDEX_KEY) (V : Index.Value.S) = struct [tmp_index]. While this is happening, the stale indexes can still be queried and new bindings can still be added to the store because only the fresh index is modified. *) - let gc_background_task store tmp_index filter resolve = + let gc_background_task store tmp_index filter resolve canceler = let open Lwt_syntax in + Lwt_canceler.on_cancel canceler (fun () -> + store.gc_status <- No_gc ; + rm_internal_index tmp_index) ; Lwt.dont_wait (fun () -> let* res = - trace (Gc_failed N.name) @@ protect + trace (Gc_failed N.name) @@ protect ~canceler @@ fun () -> let open Lwt_result_syntax in let process_key_value (k, v) = @@ -499,7 +505,8 @@ module Make_indexable (N : NAME) (K : INDEX_KEY) (V : Index.Value.S) = struct | Ok () -> return_unit | Error error -> let* () = Store_events.failed_gc N.name error in - revert_failed_gc store + let* (_canceled : bool) = cancel_gc store in + return_unit in Lwt.wakeup_later resolve () ; return_unit) @@ -515,8 +522,8 @@ module Make_indexable (N : NAME) (K : INDEX_KEY) (V : Index.Value.S) = struct match store.gc_status with | Ongoing _ -> Store_events.ignore_gc N.name | No_gc -> - let* tmp_index, promise, resolve = initiate_gc store in - gc_background_task store tmp_index filter resolve ; + let* tmp_index, promise, resolve, canceler = initiate_gc store in + gc_background_task store tmp_index filter resolve canceler ; if async then return_unit else Lwt.catch @@ -771,7 +778,7 @@ struct type gc_status = | No_gc - | Ongoing of {tmp_store : internal_store; promise : unit Lwt.t} + | Ongoing of {promise : unit Lwt.t; canceler : Lwt_canceler.t} (** In order to periodically clean up the store with the {!gc} function, each store is split in multiple stores: one fresh store and multiple stale @@ -994,16 +1001,15 @@ struct protect @@ fun () -> Lwt_idle_waiter.force_idle store.scheduler @@ fun () -> let open Lwt_result_syntax in - (match store.gc_status with - | Ongoing {promise; _} -> Lwt.cancel promise - | No_gc -> ()) ; - let* () = close_internal_store store.fresh - and* () = List.iter_ep close_internal_store store.stales - and* () = + let* () = + Lwt_result.map_error (fun tr -> List.map (fun e -> Exn e) tr) + @@ match store.gc_status with + | Ongoing {canceler; _} -> Lwt_canceler.cancel canceler | No_gc -> return_unit - | Ongoing {tmp_store; _} -> rm_internal_store tmp_store in + let* () = close_internal_store store.fresh + and* () = List.iter_ep close_internal_store store.stales in return_unit let readonly x = (x :> [`Read] t) @@ -1037,22 +1043,20 @@ struct store.fresh <- new_store_fresh ; store.stales <- new_store_stale :: store.stales ; let promise, resolve = Lwt.task () in - store.gc_status <- Ongoing {tmp_store; promise} ; - return (tmp_store, promise, resolve) + let canceler = Lwt_canceler.create () in + store.gc_status <- Ongoing {promise; canceler} ; + return (tmp_store, promise, resolve, canceler) (** If a gc operation fails, reverting simply consists in removing the temporary store. We keep the two stale stores as is, they will be merged by the next successful gc. *) - let revert_failed_gc store = + let cancel_gc store = let open Lwt_syntax in match store.gc_status with - | No_gc -> return_unit - | Ongoing {tmp_store; promise} -> ( - Lwt.cancel promise ; - let+ res = rm_internal_store tmp_store in - match res with - | Ok () -> () - | Error _e -> (* ignore error when reverting *) ()) + | No_gc -> return_false + | Ongoing {canceler; _} -> + let* _res = Lwt_canceler.cancel canceler in + return_true (** When the gc operation finishes, i.e. we have copied all elements to retain to the temporary store, we can replace all the stale stores by the @@ -1082,12 +1086,16 @@ struct [tmp_store]. While this is happening, the stale stores can still be queried and new bindings can still be added to the store because only the fresh store is modified. *) - let gc_background_task store tmp_store filter resolve = + let gc_background_task store tmp_store filter resolve canceler = let open Lwt_result_syntax in + Lwt_canceler.on_cancel canceler (fun () -> + let*! _res = rm_internal_store tmp_store in + store.gc_status <- No_gc ; + Lwt.return_unit) ; Lwt.dont_wait (fun () -> let*! res = - trace (Gc_failed N.name) @@ protect + trace (Gc_failed N.name) @@ protect ~canceler @@ fun () -> let process_key_header internal_store (key, {IHeader.offset; header}) = @@ -1115,7 +1123,8 @@ struct | Ok () -> return_unit | Error error -> let* () = Store_events.failed_gc N.name error in - revert_failed_gc store + let* (_canceled : bool) = cancel_gc store in + return_unit in Lwt.wakeup_later resolve () ; Lwt.return_unit) @@ -1132,8 +1141,8 @@ struct let*! () = Store_events.ignore_gc N.name in return_unit | No_gc -> - let* tmp_store, promise, resolve = initiate_gc store in - gc_background_task store tmp_store filter resolve ; + let* tmp_store, promise, resolve, canceler = initiate_gc store in + gc_background_task store tmp_store filter resolve canceler ; if async then return_unit else Lwt.catch diff --git a/src/lib_layer2_store/indexed_store.mli b/src/lib_layer2_store/indexed_store.mli index f4bb34eb752b07d96ae1b34acaef2f2eebf8e223..f745bd4e395c0ff3c31a046a2c35b40d8219d9b7 100644 --- a/src/lib_layer2_store/indexed_store.mli +++ b/src/lib_layer2_store/indexed_store.mli @@ -116,6 +116,10 @@ module type INDEXABLE_STORE = sig (** [is_gc_finished t] returns [true] if there is no GC running. *) val is_gc_finished : 'a t -> bool + + (** [cancel_gc t] stops the currently ongoing GC if any. It returns [true] if + a GC was canceled. *) + val cancel_gc : 'a t -> bool Lwt.t end (** An index store mapping keys to values. Keys are associated to optional @@ -198,6 +202,10 @@ module type INDEXED_FILE = sig (** [is_gc_finished t] returns [true] if there is no GC running. *) val is_gc_finished : 'a t -> bool + + (** [cancel_gc t] stops the currently ongoing GC if any. It returns [true] if + a GC was canceled. *) + val cancel_gc : 'a t -> bool Lwt.t end (** Same as {!INDEXED_FILE} but where headers are extracted from values. *) diff --git a/src/lib_layer2_store/irmin_context.ml b/src/lib_layer2_store/irmin_context.ml index 57be4e820b2bedc181f46515528435fb014ef398..02545e022c64c71590c11434eea4c0f3d5aaf140 100644 --- a/src/lib_layer2_store/irmin_context.ml +++ b/src/lib_layer2_store/irmin_context.ml @@ -198,6 +198,8 @@ let wait_gc_completion index = let is_gc_finished index = IStore.Gc.is_finished index.repo +let cancel_gc index = IStore.Gc.cancel index.repo + let index context = context.index let export_snapshot {path = _; repo} hash ~path = diff --git a/src/lib_layer2_store/irmin_context.mli b/src/lib_layer2_store/irmin_context.mli index f484557511b362f86d5d3bb2b1ea491ea8fc2d85..f06adc938248b81b1bdf28f8bf265fe04c40d32d 100644 --- a/src/lib_layer2_store/irmin_context.mli +++ b/src/lib_layer2_store/irmin_context.mli @@ -123,6 +123,10 @@ val gc : if a GC is running for [index]. *) val is_gc_finished : [> `Write] index -> bool +(** [cancel_gc index] stops the Irmin GC if it is currently running for + [index]. It returns [true] if a GC was canceled. *) +val cancel_gc : [> `Write] index -> bool + (** [wait_gc_completion index] will return a blocking thread if a GC run is currently ongoing. *) val wait_gc_completion : [> `Write] index -> unit Lwt.t diff --git a/src/lib_layer2_store/riscv_context.ml b/src/lib_layer2_store/riscv_context.ml index bc1ea9d918a40c1739afe28334c8455232d1ccc8..4bb42eff65c3f091996e0f0d8cf0250b193796d8 100644 --- a/src/lib_layer2_store/riscv_context.ml +++ b/src/lib_layer2_store/riscv_context.ml @@ -63,6 +63,8 @@ let commit ?message ctxt = Storage.commit ?message ctxt.index.repo ctxt.tree let is_gc_finished index = Storage.is_gc_finished index.repo +let cancel_gc index = Storage.cancel_gc index.repo + let split index = Storage.split index.repo let gc index ?(callback : unit -> unit Lwt.t = fun () -> Lwt.return ()) diff --git a/src/lib_riscv/pvm/storage.ml b/src/lib_riscv/pvm/storage.ml index 39724eab5f5bc3697c058c64983c8cc8326494ad..abcf9fc3ac56dc72b2ebb09276879db8e0ae2f06 100644 --- a/src/lib_riscv/pvm/storage.ml +++ b/src/lib_riscv/pvm/storage.ml @@ -45,6 +45,8 @@ let commit ?message:_ repo state = let is_gc_finished _repo = true +let cancel_gc _repo = false + let split _repo = () let gc _repo ?callback:_ _key = Lwt.return_unit diff --git a/src/lib_riscv/pvm/storage.mli b/src/lib_riscv/pvm/storage.mli index a947ce63f5556210d0ecccecc3d15c21974a7966..36bb1c7a3760002fec07d89c68b4ca57c3df69b6 100644 --- a/src/lib_riscv/pvm/storage.mli +++ b/src/lib_riscv/pvm/storage.mli @@ -37,6 +37,8 @@ val commit : ?message:string -> Repo.t -> State.t -> Id.t Lwt.t val is_gc_finished : Repo.t -> bool +val cancel_gc : Repo.t -> bool + val split : Repo.t -> unit val gc : Repo.t -> ?callback:(unit -> unit Lwt.t) -> Id.t -> unit Lwt.t diff --git a/src/lib_smart_rollup/rollup_node_services.ml b/src/lib_smart_rollup/rollup_node_services.ml index d246aebb495bc64677aaf39fc27274e38b119881..8657055adc70b138adf584b9769584e12b1efd28 100644 --- a/src/lib_smart_rollup/rollup_node_services.ml +++ b/src/lib_smart_rollup/rollup_node_services.ml @@ -915,4 +915,11 @@ module Admin = struct ~query:Query.operation_tag_query ~output:Data_encoding.unit (path / "injector" / "queues") + + let cancel_gc = + Tezos_rpc.Service.get_service + ~description:"Cancel any ongoing GC" + ~query:Tezos_rpc.Query.empty + ~output:Data_encoding.bool + (path / "cancel_gc") end diff --git a/src/lib_smart_rollup_node/node_context.ml b/src/lib_smart_rollup_node/node_context.ml index 7ebcaced81b093af56b48a3f804e9267c4bd81f4..d21e04ba504f01c356f48b90df99a5fed03921ce 100644 --- a/src/lib_smart_rollup_node/node_context.ml +++ b/src/lib_smart_rollup_node/node_context.ml @@ -1059,14 +1059,16 @@ let gc ?(wait_finished = false) ?(force = false) node_ctxt ~(level : int32) = let* () = Store.gc node_ctxt.store ~level:gc_level in let gc_waiter () = let open Lwt_syntax in - let* () = Context.wait_gc_completion node_ctxt.context - and* () = Store.wait_gc_completion node_ctxt.store in - Metrics.wrap (fun () -> - let stop_timestamp = Time.System.now () in - Metrics.GC.set_process_time - @@ Ptime.diff stop_timestamp start_timestamp) ; - let* () = Event.gc_finished ~gc_level ~head_level:level in - Lwt_lock_file.unlock gc_lockfile + Lwt.finalize + (fun () -> + let* () = Context.wait_gc_completion node_ctxt.context + and* () = Store.wait_gc_completion node_ctxt.store in + Metrics.wrap (fun () -> + let stop_timestamp = Time.System.now () in + Metrics.GC.set_process_time + @@ Ptime.diff stop_timestamp start_timestamp) ; + Event.gc_finished ~gc_level ~head_level:level) + (fun () -> Lwt_lock_file.unlock gc_lockfile) in if wait_finished then let*! () = gc_waiter () in @@ -1076,6 +1078,12 @@ let gc ?(wait_finished = false) ?(force = false) node_ctxt ~(level : int32) = return_unit)) | _ -> return_unit +let cancel_gc node_ctxt = + let open Lwt_syntax in + let canceled_context_gc = Context.cancel_gc node_ctxt.context in + let+ canceled_store_gc = Store.cancel_gc node_ctxt.store in + canceled_context_gc || canceled_store_gc + let check_level_available node_ctxt accessed_level = let open Lwt_result_syntax in let* {first_available_level; _} = get_gc_levels node_ctxt in diff --git a/src/lib_smart_rollup_node/node_context.mli b/src/lib_smart_rollup_node/node_context.mli index 382770f25f15e701b7e5d014de9e3dca2f1a93f7..b7e99411294a82d39343293f032b577f6b50d32d 100644 --- a/src/lib_smart_rollup_node/node_context.mli +++ b/src/lib_smart_rollup_node/node_context.mli @@ -563,6 +563,10 @@ val gc : level:int32 -> unit tzresult Lwt.t +(** [cancel_gc t] stops any currently ongoing GC. It returns [true] if a GC was + canceled. *) +val cancel_gc : rw -> bool Lwt.t + (** [get_gc_levels node_ctxt] returns information about the garbage collected levels. *) val get_gc_levels : _ t -> Store.Gc_levels.levels tzresult Lwt.t diff --git a/src/lib_smart_rollup_node/rpc_directory.ml b/src/lib_smart_rollup_node/rpc_directory.ml index 0ffa3e8fb8c84f1b80a03a159c6da2b456e067f3..fe668ea38b72a533e0ffbc94736728243fe5feba 100644 --- a/src/lib_smart_rollup_node/rpc_directory.ml +++ b/src/lib_smart_rollup_node/rpc_directory.ml @@ -93,10 +93,9 @@ module Admin_directory = Make_directory (struct type context = Node_context.rw - type subcontext = Node_context.ro + type subcontext = Node_context.rw - let context_of_prefix node_ctxt () = - Lwt_result.return (Node_context.readonly node_ctxt) + let context_of_prefix node_ctxt () = Lwt_result.return node_ctxt end) let () = @@ -607,6 +606,13 @@ let () = Admin_directory.register0 Rollup_node_services.Admin.clear_injector_queues @@ fun _node_ctxt tag () -> Injector.clear_queues ?tag () +let () = + Admin_directory.register0 Rollup_node_services.Admin.cancel_gc + @@ fun node_ctxt () () -> + let open Lwt_result_syntax in + let*! canceled = Node_context.cancel_gc node_ctxt in + return canceled + let add_describe dir = Tezos_rpc.Directory.register_describe_directory_service dir diff --git a/src/lib_smart_rollup_node/store_v2.ml b/src/lib_smart_rollup_node/store_v2.ml index 570f413f19d01629b33672508698621206938088..c5ef2a48f551c0a4e80d84820603c30818f51dbf 100644 --- a/src/lib_smart_rollup_node/store_v2.ml +++ b/src/lib_smart_rollup_node/store_v2.ml @@ -601,3 +601,36 @@ let is_gc_finished && Commitments_published_at_level.is_gc_finished commitments_published_at_level && Levels_to_hashes.is_gc_finished levels_to_hashes + +let cancel_gc + ({ + l2_blocks; + messages; + inboxes; + commitments; + commitments_published_at_level; + l2_head = _; + last_finalized_level = _; + lcc = _; + lpc = _; + levels_to_hashes; + irmin_store = _; + protocols = _; + gc_levels = _; + last_context_split_level = _; + history_mode = _; + } : + _ t) = + let open Lwt_syntax in + let+ canceled = + Lwt.all + [ + L2_blocks.cancel_gc l2_blocks; + Messages.cancel_gc messages; + Inboxes.cancel_gc inboxes; + Commitments.cancel_gc commitments; + Commitments_published_at_level.cancel_gc commitments_published_at_level; + Levels_to_hashes.cancel_gc levels_to_hashes; + ] + in + List.exists Fun.id canceled diff --git a/src/lib_smart_rollup_node/store_v2.mli b/src/lib_smart_rollup_node/store_v2.mli index 2baadd3c89a37eccfa6b0b5a779d620d10c958bc..842897d55a576035ed22837db8b0d364d7017371 100644 --- a/src/lib_smart_rollup_node/store_v2.mli +++ b/src/lib_smart_rollup_node/store_v2.mli @@ -139,3 +139,7 @@ include Store_sig.S with type 'a store := 'a store (** [is_gc_finished t] returns [true] if there is no GC running. *) val is_gc_finished : 'a t -> bool + +(** [cancel_gc t] stops any currently ongoing GC. It returns [true] if a GC was + canceled. *) +val cancel_gc : 'a t -> bool Lwt.t