diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml
index 7c065c36e057df7303023e07e024fcfe072960a6..8db75c93ad5ef44e69b86b65c43648f63a88eaa4 100644
--- a/src/bin_dal_node/daemon.ml
+++ b/src/bin_dal_node/daemon.ml
@@ -1151,8 +1151,8 @@ let get_proto_plugins cctxt profile_ctxt ~last_processed_level ~first_seen_level
    and we don't detect it if this code starts running just before the
    migration level, and the head changes meanwhile to be above the migration
    level. *)
-let clean_up_store ctxt cctxt ~last_processed_level ~first_seen_level head_level
-    proto_parameters =
+let clean_up_store_and_catch_up ctxt cctxt ~last_processed_level
+    ~first_seen_level head_level proto_parameters =
   let open Lwt_result_syntax in
   let store_skip_list_cells ~level =
     let*? (module Plugin) =
@@ -1171,6 +1171,7 @@ let clean_up_store ctxt cctxt ~last_processed_level ~first_seen_level head_level
       (module Plugin : Dal_plugin.T with type block_info = Plugin.block_info)
   in
   let store = Node_context.get_store ctxt in
+  let last_processed_level_store = Store.last_processed_level store in
   let supports_refutations = Handler.supports_refutations ctxt in
   (* [target_level] identifies the level wrt to head level at which we want to
      start the P2P and process blocks as usual. *)
@@ -1180,21 +1181,26 @@ let clean_up_store ctxt cctxt ~last_processed_level ~first_seen_level head_level
        the plugin. *)
     Int32.(sub level (of_int period))
   in
-  let should_store_skip_list_cells ~head_level ~level =
-    let profile_ctxt = Node_context.get_profile_ctxt ctxt in
-    let period =
-      get_storage_period
-        profile_ctxt
-        proto_parameters
-        head_level
-        first_seen_level
-      + skip_list_offset proto_parameters
-    in
-    supports_refutations
-    && level >= first_level_for_skip_list_storage period head_level
+  let should_store_skip_list_cells ~head_level =
+    if not supports_refutations then fun ~level:_ -> false
+    else
+      let profile_ctxt = Node_context.get_profile_ctxt ctxt in
+      let period =
+        get_storage_period
+          profile_ctxt
+          proto_parameters
+          head_level
+          first_seen_level
+        + skip_list_offset proto_parameters
+      in
+      let first_level = first_level_for_skip_list_storage period head_level in
+      fun ~level -> level >= first_level
   in
   let rec do_clean_up last_processed_level head_level =
     let last_level = target_level head_level in
+    let should_store_skip_list_cells =
+      should_store_skip_list_cells ~head_level
+    in
     let rec clean_up_at_level level =
       if level > last_level then return_unit
       else
@@ -1202,14 +1208,18 @@ let clean_up_store ctxt cctxt ~last_processed_level ~first_seen_level head_level
           Handler.remove_old_level_stored_data proto_parameters ctxt level
         in
         let* () =
-          if should_store_skip_list_cells ~head_level ~level then
+          if should_store_skip_list_cells ~level then
             store_skip_list_cells ~level
           else return_unit
         in
         let* () =
-          let last_processed_level_store = Store.last_processed_level store in
          Store.Last_processed_level.save last_processed_level_store level
        in
+        let*! () =
+          if Int32.to_int level mod 1000 = 0 then
+            Event.emit_catching_up ~current_level:level
+          else Lwt.return_unit
+        in
        clean_up_at_level (Int32.succ level)
    in
    (* Clean up from [last_processed_level] to [last_level]. *)
@@ -1221,9 +1231,23 @@ let clean_up_store ctxt cctxt ~last_processed_level ~first_seen_level head_level
    in
    let new_head_level = header.Block_header.level in
    if new_head_level > head_level then do_clean_up last_level new_head_level
-    else return_unit
+    else
+      let*! () = Event.emit_end_catchup () in
+      return_unit
+  in
+  let*! () =
+    let levels_to_clean_up =
+      Int32.(succ @@ sub head_level last_processed_level)
+    in
+    if levels_to_clean_up > 0l then
+      Event.emit_start_catchup
+        ~start_level:last_processed_level
+        ~end_level:head_level
+        ~levels_to_clean_up
+    else Lwt.return_unit
   in
-  do_clean_up last_processed_level head_level
+  let* () = do_clean_up last_processed_level head_level in
+  return_unit
 
 (* FIXME: https://gitlab.com/tezos/tezos/-/issues/3605
    Improve general architecture, handle L1 disconnection etc
@@ -1500,7 +1524,7 @@ let run ~data_dir ~configuration_override =
       match last_processed_level with
       | None -> (* there's nothing to clean up *) return_unit
       | Some last_processed_level ->
-          clean_up_store
+          clean_up_store_and_catch_up
             ctxt
             cctxt
             ~last_processed_level
diff --git a/src/bin_dal_node/event.ml b/src/bin_dal_node/event.ml
index 2955718e7d4a38297635f9163d3d7bab08bdd268..7e5227e1012491a8fcb5a9fa781587d1878246a2 100644
--- a/src/bin_dal_node/event.ml
+++ b/src/bin_dal_node/event.ml
@@ -885,6 +885,34 @@ open struct
       ("published_level", Data_encoding.int32)
       ("slot_index", Data_encoding.int31)
       ("shard_index", Data_encoding.int31)
+
+  let start_catchup =
+    declare_3
+      ~section
+      ~name:"start_catchup"
+      ~msg:
+        "catching up to level {end_level}, from last processed level \
+         {start_level} (that is, {levels_to_clean_up} levels to process)"
+      ~level:Notice
+      ("start_level", Data_encoding.int32)
+      ("end_level", Data_encoding.int32)
+      ("levels_to_clean_up", Data_encoding.int32)
+
+  let catching_up =
+    declare_1
+      ~section
+      ~name:"catching_up"
+      ~msg:"caught up the store up to level {current_level}"
+      ~level:Notice
+      ("current_level", Data_encoding.int32)
+
+  let end_catchup =
+    declare_0
+      ~section
+      ~name:"end_catchup"
+      ~msg:"done catching up"
+      ~level:Notice
+      ()
 end
 
 (* DAL node event emission functions *)
@@ -1138,3 +1166,10 @@ let emit_dont_wait__register_trap ~delegate ~published_level ~slot_index
   emit__dont_wait__use_with_care
     register_trap
     (delegate, published_level, slot_index, shard_index)
+
+let emit_start_catchup ~start_level ~end_level ~levels_to_clean_up =
+  emit start_catchup (start_level, end_level, levels_to_clean_up)
+
+let emit_catching_up ~current_level = emit catching_up current_level
+
+let emit_end_catchup () = emit end_catchup ()
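
For reviewers unfamiliar with the staging trick in the daemon.ml hunk: `should_store_skip_list_cells` now takes `~head_level` first and returns a closure over `~level`, so the profile-context lookup and the `get_storage_period` computation run once per head rather than once per level of the catch-up loop, and the `supports_refutations = false` case short-circuits to a constant predicate. Below is a minimal, self-contained sketch of the same two patterns (staged closure plus progress output throttled to every 1000 levels) in plain OCaml; the names `catch_up` and `storage_period`, the simplified threshold arithmetic, and the `Printf` stand-ins for the `Event.emit_*` calls are illustrative assumptions, not the node's actual API.

```ocaml
(* Stage 1: everything that depends only on [head_level] (and on
   flags invariant across the loop) is computed here, once per head.
   When refutations are unsupported, return a constant predicate. *)
let should_store_skip_list_cells ~supports_refutations ~storage_period
    ~head_level =
  if not supports_refutations then fun ~level:_ -> false
  else
    let first_level = Int32.sub head_level (Int32.of_int storage_period) in
    (* Stage 2: only a cheap comparison remains in the per-level closure. *)
    fun ~level -> level >= first_level

let catch_up ~supports_refutations ~storage_period ~last_processed_level
    ~head_level =
  (* Build the per-level predicate once, outside the loop, as the
     refactored [do_clean_up] does. *)
  let should_store =
    should_store_skip_list_cells
      ~supports_refutations
      ~storage_period
      ~head_level
  in
  let rec clean_up_at_level level =
    if level > head_level then print_endline "done catching up"
    else begin
      if should_store ~level then
        (* stand-in for the real [store_skip_list_cells ~level] call *)
        ignore level ;
      (* Emit progress every 1000 levels, mirroring the [mod 1000]
         guard around [Event.emit_catching_up]. *)
      if Int32.to_int level mod 1000 = 0 then
        Printf.printf "caught up the store up to level %ld\n" level ;
      clean_up_at_level (Int32.succ level)
    end
  in
  clean_up_at_level last_processed_level

(* Illustrative run: prints a progress line at levels 5000..8000 in
   steps of 1000, then "done catching up". *)
let () =
  catch_up
    ~supports_refutations:true
    ~storage_period:100
    ~last_processed_level:5_000l
    ~head_level:8_000l
```

The throttled progress lines and the surrounding `start_catchup`/`end_catchup` pair are what make a long store catch-up observable; the event.ml hunk adds them with the file's existing `declare_N` declarations and labelled `emit_*` wrappers.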