diff --git a/src/proto_alpha/bin_sc_rollup_node/commitment.ml b/src/proto_alpha/bin_sc_rollup_node/commitment.ml
index a246eb5dd4ff80a927d0f56cc46fc020b801a7cb..71c6dafce30c2d202a14dd81764eb98d142f4368 100644
--- a/src/proto_alpha/bin_sc_rollup_node/commitment.ml
+++ b/src/proto_alpha/bin_sc_rollup_node/commitment.ml
@@ -51,6 +51,11 @@ module type Mutable_level_store =
    node, as only finalized heads are processed to build commitments. *)
 
+(* FIXME: #3203
+
+   Using these global variables is fragile considering chain
+   reorganizations and interruptions. We should use a more persistent
+   representation for this piece of information. *)
 module Mutable_counter = struct
   module Make () = struct
     let x = ref Z.zero
diff --git a/src/proto_alpha/bin_sc_rollup_node/daemon.ml b/src/proto_alpha/bin_sc_rollup_node/daemon.ml
index 7be120b5f4f25665686c7fc60744f64dd071c8b3..8a4b50cb1ca03fe3367d12fd2374605145c4d795 100644
--- a/src/proto_alpha/bin_sc_rollup_node/daemon.ml
+++ b/src/proto_alpha/bin_sc_rollup_node/daemon.ml
@@ -89,7 +89,12 @@ module Make (PVM : Pvm.S) = struct
        the current head, but we still need to ensure that the node only
        published one commitment per block. *)
     let* () = Components.Commitment.publish_commitment node_ctxt store in
-    Components.Commitment.cement_commitment_if_possible node_ctxt store head
+    let* () =
+      Components.Commitment.cement_commitment_if_possible node_ctxt store head
+    in
+    when_ finalized (fun () ->
+        let*! () = Layer1.mark_processed_head store head in
+        return ())
 
   (* [on_layer_1_chain_event node_ctxt store chain_event old_heads] processes
      a list of heads, coming from either a list of [old_heads] or from the current
@@ -187,9 +192,12 @@ module Make (PVM : Pvm.S) = struct
     @@ iter_stream layer_1_chain_events
     @@ on_layer_1_chain_event node_ctxt store
 
-  let install_finalizer store rpc_server =
+  let install_finalizer store rpc_server heads stopper =
     let open Lwt_syntax in
     Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status ->
+    stopper () ;
+    let* () = Lwt_stream.closed heads in
+    let* () = Layer1.shutdown store in
     let* () = Components.RPC_server.shutdown rpc_server in
     let* () = Store.close store in
     let* () = Event.shutdown_node exit_status in
@@ -201,13 +209,13 @@ module Make (PVM : Pvm.S) = struct
     let* rpc_server =
       Components.RPC_server.start node_ctxt store configuration
     in
-    let* tezos_heads =
+    let* tezos_heads, stopper =
       Layer1.start configuration node_ctxt.Node_context.cctxt store
     in
     let*! () = Inbox.start () in
     let*! () = Components.Commitment.start () in
-    let _ = install_finalizer store rpc_server in
+    let _ = install_finalizer store rpc_server tezos_heads stopper in
     let*! () =
       Event.node_is_ready
         ~rpc_addr:configuration.rpc_addr
diff --git a/src/proto_alpha/bin_sc_rollup_node/layer1.ml b/src/proto_alpha/bin_sc_rollup_node/layer1.ml
index 75374bc0ffd876117bc155ccc432a0aad120c018..ee02f32af2899e053e252d50d03d8e1e57efecc0 100644
--- a/src/proto_alpha/bin_sc_rollup_node/layer1.ml
+++ b/src/proto_alpha/bin_sc_rollup_node/layer1.ml
@@ -94,9 +94,29 @@ module State = struct
 
       let value_encoding = head_encoding
     end)
-  end
 
-  let already_seen = Store.Blocks.mem
+    module ProcessedHashes = Store.Make_append_only_map (struct
+      let path = ["tezos"; "processed_blocks"]
+
+      let keep_last_n_entries_in_memory = reorganization_window_length
+
+      type key = block_hash
+
+      let string_of_key = Block_hash.to_b58check
+
+      type value = unit
+
+      let value_encoding = Data_encoding.unit
+    end)
+
+    module LastProcessedHead = Store.Make_mutable_value (struct
+      let path = ["tezos"; "processed_head"]
+
+      type value = head
+
+      let value_encoding = head_encoding
+    end)
+  end
 
   let last_seen_head = Store.Head.find
@@ -105,6 +125,15 @@ module State = struct
   let store_block = Store.Blocks.add
 
   let block_of_hash = Store.Blocks.get
+
+  let mark_processed_head store (Head {hash; _} as head) =
+    let open Lwt_syntax in
+    let* () = Store.ProcessedHashes.add store hash () in
+    Store.LastProcessedHead.set store head
+
+  let is_processed = Store.ProcessedHashes.mem
+
+  let last_processed_head = Store.LastProcessedHead.find
 end
 
 (**
@@ -236,7 +265,7 @@ let catch_up cctxt store chain last_seen_head new_head =
       (* We have reconnected to the last seen head. *)
       Lwt.return (ancestor_hash, [same_branch new_head heads])
     else
-      State.already_seen store ancestor_hash >>= function
+      State.is_processed store ancestor_hash >>= function
      | true ->
          (* We have reconnected to a previously known head. [new_head]
             and [last_seen_head] are not the same branch. *)
@@ -274,8 +303,10 @@ let chain_events cctxt store chain =
     let*! () = List.iter_s (store_chain_event store base) events in
     Lwt.return events
   in
-  let+ heads, _ = Tezos_shell_services.Monitor_services.heads cctxt chain in
-  Lwt_stream.map_list_s on_head heads
+  let+ heads, stopper =
+    Tezos_shell_services.Monitor_services.heads cctxt chain
+  in
+  (Lwt_stream.map_list_s on_head heads, stopper)
 
 let check_sc_rollup_address_exists sc_rollup_address
     (cctxt : Protocol_client_context.full) =
@@ -324,9 +355,9 @@ let start configuration (cctxt : Protocol_client_context.full) store =
   let* () =
     check_sc_rollup_address_exists configuration.sc_rollup_address cctxt
   in
-  let* event_stream = chain_events cctxt store `Main in
+  let* event_stream, stopper = chain_events cctxt store `Main in
   let* info = gather_info cctxt configuration.sc_rollup_address in
-  return (discard_pre_origination_blocks info event_stream)
+  return (discard_pre_origination_blocks info event_stream, stopper)
 
 let current_head_hash store =
   let open Lwt_syntax in
@@ -350,3 +381,23 @@ let processed = function
   | SameBranch {new_head; intermediate_heads} ->
       List.iter_s processed_head (intermediate_heads @ [new_head])
   | Rollback {new_head} -> processed_head new_head
+
+let mark_processed_head store head = State.mark_processed_head store head
+
+(* We forget about the last seen heads that are not processed so that
+   the rollup node can process them when restarted. Notice that this
+   does not prevent skipping heads when the node is interrupted in a
+   bad way. *)
+
+(* FIXME: https://gitlab.com/tezos/tezos/-/issues/3205
+
+   More generally, the rollup node should be able to restart properly
+   after an abnormal interruption at every point of its process.
+   Currently, the state is not persistent enough and the processing is
+   not idempotent enough to achieve that property. *)
+let shutdown store =
+  let open Lwt_syntax in
+  let* last_processed_head = State.last_processed_head store in
+  match last_processed_head with
+  | None -> return_unit
+  | Some head -> State.set_new_head store head
diff --git a/src/proto_alpha/bin_sc_rollup_node/layer1.mli b/src/proto_alpha/bin_sc_rollup_node/layer1.mli
index 27100c6a4030d65e41a1a81d29df2ac6a64b5afe..8ea49d73bcde3c5485be795557335bea38cc813d 100644
--- a/src/proto_alpha/bin_sc_rollup_node/layer1.mli
+++ b/src/proto_alpha/bin_sc_rollup_node/layer1.mli
@@ -54,7 +54,7 @@ val start :
   Configuration.t ->
   Protocol_client_context.full ->
   Store.t ->
-  chain_event Lwt_stream.t tzresult Lwt.t
+  (chain_event Lwt_stream.t * RPC_context.stopper) tzresult Lwt.t
 
 (** [current_head_hash store] is the current hash of the head of the
     Tezos chain as far as the smart-contract rollup node knows from the
@@ -77,3 +77,11 @@ val genesis_hash : Block_hash.t
 (** [processed chain_event] emits a log event to officialize the
     processing of some layer 1 [chain_event]. *)
 val processed : chain_event -> unit Lwt.t
+
+(** [mark_processed_head store head] remembers that the [head]
+    is processed. The system should not have to come back to
+    it. *)
+val mark_processed_head : Store.t -> head -> unit Lwt.t
+
+(** [shutdown store] properly shuts the layer 1 down. *)
+val shutdown : Store.t -> unit Lwt.t
diff --git a/src/proto_alpha/bin_sc_rollup_node/store.ml b/src/proto_alpha/bin_sc_rollup_node/store.ml
index e49a8b3d31e57615da1d07e679813e6e1662bb0d..b902beb9bd17fb596ebe1b9122e223a568a9d610 100644
--- a/src/proto_alpha/bin_sc_rollup_node/store.ml
+++ b/src/proto_alpha/bin_sc_rollup_node/store.ml
@@ -43,6 +43,8 @@ let load configuration =
   let* repo = IStore.Repo.v (Irmin_pack.config configuration.data_dir) in
   IStore.main repo
 
+let flush store = IStore.flush (IStore.repo store)
+
 let close store = IStore.Repo.close (IStore.repo store)
 
 let info message =
@@ -107,15 +109,25 @@ struct
 
   let add store key value =
     let open Lwt_syntax in
-    let* already_exists = mem store key in
+    let* existing_value = find store key in
     let full_path = String.concat "/" (P.path @ [P.string_of_key key]) in
-    if already_exists then
-      Stdlib.failwith (Printf.sprintf "Key %s already exists" full_path) ;
-    let encoded_value =
-      Data_encoding.Binary.to_bytes_exn P.value_encoding value
-    in
-    let info () = info full_path in
-    IStore.set_exn ~info store (make_key key) encoded_value
+    let encode v = Data_encoding.Binary.to_bytes_exn P.value_encoding v in
+    let encoded_value = encode value in
+    match existing_value with
+    | None ->
+        let info () = info full_path in
+        IStore.set_exn ~info store (make_key key) encoded_value
+    | Some existing_value ->
+        (* To be robust to interruption in the middle of processes,
+           we accept to redo some work when we restart the node.
+           Hence, it is fine to insert twice the same value for a
+           given key. *)
+        if not (Bytes.equal (encode existing_value) encoded_value) then
+          Stdlib.failwith
+            (Printf.sprintf
+               "Key %s already exists with a different value"
+               full_path)
+        else return_unit
 end
 
 module Make_mutable_value (P : sig
diff --git a/src/proto_alpha/lib_protocol/test/pbt/test_refutation_game.ml b/src/proto_alpha/lib_protocol/test/pbt/test_refutation_game.ml
index 91d787b235d817f25438da593bdb20aa4b98f91f..229943e855e5f859ec24217f1105031870c1a83a 100644
--- a/src/proto_alpha/lib_protocol/test/pbt/test_refutation_game.ml
+++ b/src/proto_alpha/lib_protocol/test/pbt/test_refutation_game.ml
@@ -574,25 +574,16 @@ module Strategies (PVM : TestPVM with type hash = State_hash.t) = struct
     in
     let* outcome =
       let rec loop game refuter_move =
-        let player = if refuter_move then "refuter" else "defender" in
         let* move =
          if refuter_move then refuter_client.next_move game
          else defender_client.next_move game
        in
        match move with
-        | None ->
-            Printf.eprintf "@[No move from %s@]" player ;
-            return (if refuter_move then Defender_wins else Refuter_wins)
+        | None -> return (if refuter_move then Defender_wins else Refuter_wins)
        | Some move -> (
-            Format.eprintf
-              "@[Move from %s is %a@]@."
-              player
-              Game.pp_refutation
-              move ;
            let* game_result = Game.play game move in
            match game_result with
            | Either.Left outcome ->
-                Format.eprintf "@[%a@]@." Game.pp_outcome outcome ;
                return
                  (loser_to_outcome_for_tests outcome.loser alice_is_refuter)
            | Either.Right game -> loop game (not refuter_move))
diff --git a/tezt/tests/sc_rollup.ml b/tezt/tests/sc_rollup.ml
index 60d0509e9789d61bf5e14024f672dc43143acb75..87f9a48766e72407c3405003d0ed125cf1c3352e 100644
--- a/tezt/tests/sc_rollup.ml
+++ b/tezt/tests/sc_rollup.ml
@@ -1159,7 +1159,7 @@ let commitment_stored _protocol sc_rollup_node sc_rollup_address _node client =
        (Check.option Check.int)
        ~error_msg:
          "Number of messages processed by commitment is different from the \
-          number of messages expected (%L = %R)") ;
+          number of messages expected (%L expected <> %R processed)") ;
   let* published_commitment =
     Sc_rollup_client.last_published_commitment ~hooks sc_rollup_client
   in