From 0d79c072279bfd200b5662d10139884aff2d9259 Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Mon, 29 Sep 2025 15:41:37 +0200 Subject: [PATCH 1/9] Baker/Tezt: test with no LB vote --- tezt/tests/liquidity_baking_per_block_votes.ml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tezt/tests/liquidity_baking_per_block_votes.ml b/tezt/tests/liquidity_baking_per_block_votes.ml index ebf615d96263..4eb11b9a3add 100644 --- a/tezt/tests/liquidity_baking_per_block_votes.ml +++ b/tezt/tests/liquidity_baking_per_block_votes.ml @@ -175,6 +175,18 @@ let test_all_per_block_votes = let* baker = Agnostic_baker.init node client in + Log.info "Test no voting file" ; + + let* baker = + Lwt.catch + (fun () -> + let* _ = run_vote_file baker in + Test.fail + "The baker was supposed to fail, it's missing the liquidity baking \ + vote.") + (fun _exn -> return baker) + in + Log.info "Test [off] vote file" ; let* baker = let votefile = Agnostic_baker.liquidity_baking_votefile Off in -- GitLab From f414776eec62f49d1cfc9757c868dfd9e4ac3a8e Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Tue, 30 Sep 2025 11:38:05 +0200 Subject: [PATCH 2/9] Baker/Tezt: regression CLI test --- tezt/lib_tezos/agnostic_baker.ml | 35 ++++++++++++++++++++---- tezt/lib_tezos/agnostic_baker.mli | 12 ++++++++- tezt/tests/agnostic_baker_test.ml | 44 ++++++++++++++++++++++++++++++- 3 files changed, 84 insertions(+), 7 deletions(-) diff --git a/tezt/lib_tezos/agnostic_baker.ml b/tezt/lib_tezos/agnostic_baker.ml index 6fed64b76c65..a71a01cbe08b 100644 --- a/tezt/lib_tezos/agnostic_baker.ml +++ b/tezt/lib_tezos/agnostic_baker.ml @@ -244,8 +244,8 @@ let run_args agnostic_baker = @ dal_node_timeout_percentage @ state_recorder @ node_version_check_bypass @ node_version_allowed @ keep_alive -let run ?env ?event_level ?event_sections_levels ?(extra_arguments = []) - (agnostic_baker : t) = +let raw ?env ?event_level ?event_sections_levels ~arguments (agnostic_baker : t) + = (match agnostic_baker.status with | Not_running -> () | Running _ -> @@ -255,17 +255,42 @@ let run ?env ?event_level ?event_sections_levels ?(extra_arguments = []) trigger_ready agnostic_baker None ; unit in + Log.info + "Starting baker %s with args: %s" + agnostic_baker.name + (String.concat " " arguments) ; run ?env ?event_level ?event_sections_levels agnostic_baker {ready = false} - (run_args agnostic_baker @ extra_arguments) + arguments ~on_terminate ?runner:agnostic_baker.persistent_state.runner -let spawn_run ?env (agnostic_baker : t) = +let run ?env ?event_level ?event_sections_levels ?(extra_arguments = []) + (agnostic_baker : t) = + raw + ?env + ?event_level + ?event_sections_levels + ~arguments:(run_args agnostic_baker @ extra_arguments) + agnostic_baker + +let raw ~arguments (agnostic_baker : t) = raw agnostic_baker ~arguments + +let spawn_raw ~arguments (agnostic_baker : t) = + (match agnostic_baker.status with + | Not_running -> () + | Running _ -> + Test.fail "agnostic_baker %s is already running" agnostic_baker.name) ; + Process.spawn + ?runner:agnostic_baker.persistent_state.runner + agnostic_baker.path + arguments + +let spawn_run ?env ?(extra_arguments = []) (agnostic_baker : t) = (match agnostic_baker.status with | Not_running -> () | Running _ -> @@ -274,7 +299,7 @@ let spawn_run ?env (agnostic_baker : t) = ?env ?runner:agnostic_baker.persistent_state.runner agnostic_baker.path - (run_args agnostic_baker) + (run_args agnostic_baker @ extra_arguments) let check_event ?where agnostic_baker name promise = let* result = promise in diff --git a/tezt/lib_tezos/agnostic_baker.mli b/tezt/lib_tezos/agnostic_baker.mli index a8b95c9e5b90..15921699f036 100644 --- a/tezt/lib_tezos/agnostic_baker.mli +++ b/tezt/lib_tezos/agnostic_baker.mli @@ -57,7 +57,17 @@ val run : unit Lwt.t (** Spawn [octez-baker run] similarly to {!run} but returns the process. *) -val spawn_run : ?env:string String_map.t -> t -> Process.t +val spawn_run : + ?env:string String_map.t -> ?extra_arguments:string list -> t -> Process.t + +(** Spawn [octez-baker]. + + Similar to {!run} but takes all its arguments from [arguments]. *) +val raw : arguments:string list -> t -> unit Lwt.t + +(** Spawn [octez-baker] similarly to {!spawn_run} but doesn't have any arguments + other than [arguments]. *) +val spawn_raw : arguments:string list -> t -> Process.t (** Liquidity baking vote values. *) type liquidity_baking_vote = Off | On | Pass diff --git a/tezt/tests/agnostic_baker_test.ml b/tezt/tests/agnostic_baker_test.ml index 6b57df0768a7..8a2731b6c761 100644 --- a/tezt/tests/agnostic_baker_test.ml +++ b/tezt/tests/agnostic_baker_test.ml @@ -269,7 +269,49 @@ let test_keep_alive = and* () = wait_period_status in unit -let register ~protocols = test_keep_alive protocols +let test_cli = + Protocol.register_test + ~__FILE__ + ~title:"Agnostic baker CLI" + ~tags:[team; "sandbox"; "agnostic"; "baker"; "cli"] + ~uses:(fun _protocol -> [Constant.octez_agnostic_baker]) + @@ fun protocol -> + let* node, client = Client.init_with_protocol `Client ~protocol () in + let baker = Agnostic_baker.create node client in + let check_process_error expected_err p = + let* err = Process.check_and_read_stderr ~expect_failure:true p in + Check.(err =~ rex expected_err) ~error_msg:"Expected error %R but got %L" ; + unit + in + + let arguments = + [ + "--endpoint"; + Node.rpc_endpoint node; + "--base-dir"; + Client.base_dir client; + "run"; + "with"; + "local"; + "node"; + Node.data_dir node; + ] + in + let p = Agnostic_baker.spawn_raw ~arguments baker in + let* () = check_process_error ".*Missing liquidity baking toggle vote.*" p in + + let arguments = arguments @ ["--liquidity-baking-toggle-vote"; "pass"] in + let p = Agnostic_baker.spawn_raw ~arguments baker in + let* () = check_process_error "Please connect a running DAL node using" p in + + let arguments = arguments @ ["--without-dal"] in + let* () = Agnostic_baker.wait_for_ready baker + and* () = Agnostic_baker.raw ~arguments baker in + unit + +let register ~protocols = + test_keep_alive protocols ; + test_cli protocols let register_migration ~migrate_from ~migrate_to = migrate ~migrate_from ~migrate_to ~use_remote_signer:false ; -- GitLab From 50287cc1b511e34918df564de7c926d21ef89353 Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Wed, 15 Oct 2025 14:03:46 +0200 Subject: [PATCH 3/9] Revert "Accuser: Move next_protocol filtering to worker_loop" This reverts commit 1901fcc22e0f24630ab1d7d26f0368df32d3f980. --- .../lib_delegate/client_baking_blocks.ml | 4 +- .../lib_delegate/client_baking_blocks.mli | 1 + .../client_baking_denunciation.ml | 75 +++++++++---------- .../lib_delegate/client_daemon.ml | 6 +- .../lib_delegate/delegate_events.ml | 8 -- .../lib_delegate/client_baking_blocks.ml | 4 +- .../lib_delegate/client_baking_blocks.mli | 1 + .../client_baking_denunciation.ml | 75 +++++++++---------- .../lib_delegate/client_daemon.ml | 6 +- .../lib_delegate/delegate_events.ml | 8 -- .../lib_delegate/client_baking_blocks.ml | 4 +- .../lib_delegate/client_baking_blocks.mli | 1 + .../client_baking_denunciation.ml | 75 +++++++++---------- src/proto_alpha/lib_delegate/client_daemon.ml | 6 +- .../lib_delegate/delegate_events.ml | 8 -- tezt/tests/double_consensus.ml | 60 --------------- tezt/tests/main.ml | 2 - 17 files changed, 135 insertions(+), 209 deletions(-) diff --git a/src/proto_022_PsRiotum/lib_delegate/client_baking_blocks.ml b/src/proto_022_PsRiotum/lib_delegate/client_baking_blocks.ml index 35d43f6d97b7..18f7dd7efc61 100644 --- a/src/proto_022_PsRiotum/lib_delegate/client_baking_blocks.ml +++ b/src/proto_022_PsRiotum/lib_delegate/client_baking_blocks.ml @@ -152,10 +152,10 @@ module Block_seen_event = struct module Event = Internal_event.Make (Definition) end -let monitor_applied_blocks cctxt ?chains ?protocols () = +let monitor_applied_blocks cctxt ?chains ?protocols ~next_protocols () = let open Lwt_result_syntax in let* block_stream, stop = - Monitor_services.applied_blocks cctxt ?chains ?protocols () + Monitor_services.applied_blocks cctxt ?chains ?protocols ?next_protocols () in return ( Lwt_stream.map_s diff --git a/src/proto_022_PsRiotum/lib_delegate/client_baking_blocks.mli b/src/proto_022_PsRiotum/lib_delegate/client_baking_blocks.mli index d9c50296db14..8426c19b8994 100644 --- a/src/proto_022_PsRiotum/lib_delegate/client_baking_blocks.mli +++ b/src/proto_022_PsRiotum/lib_delegate/client_baking_blocks.mli @@ -49,6 +49,7 @@ val monitor_applied_blocks : #Protocol_client_context.rpc_context -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> + next_protocols:Protocol_hash.t list option -> unit -> (block_info tzresult Lwt_stream.t * Tezos_rpc.Context.stopper) tzresult Lwt.t diff --git a/src/proto_022_PsRiotum/lib_delegate/client_baking_denunciation.ml b/src/proto_022_PsRiotum/lib_delegate/client_baking_denunciation.ml index 59a6349541f1..1972f8c4c78f 100644 --- a/src/proto_022_PsRiotum/lib_delegate/client_baking_denunciation.ml +++ b/src/proto_022_PsRiotum/lib_delegate/client_baking_denunciation.ml @@ -536,38 +536,42 @@ let cleanup_old_operations state = - Check that every baker (pre)attested only once at the block's level and round *) let process_new_block (cctxt : #Protocol_client_context.full) state - {hash; chain_id; level; _} = + {hash; chain_id; level; protocol; next_protocol; _} = let open Lwt_result_syntax in - let*! () = Events.(emit accuser_saw_block) (level, hash) in - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - state.highest_level_encountered <- - Raw_level.max level state.highest_level_encountered ; - (* Processing blocks *) - let* () = - let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in - match block_info with - | Ok block_info -> ( - let* () = process_block cctxt state block_info in - (* Processing (pre)attestations in the block *) - match block_info.operations with - | consensus_ops :: _ -> - let packed_op {Alpha_block_services.shell; protocol_data; _} = - {shell; protocol_data} - in - process_operations cctxt state consensus_ops ~packed_op chain_id - | _ -> - (* Should not happen as a block should contain 4 lists of - operations, the first list being dedicated to consensus - operations. *) - let*! () = Events.(emit fetch_operations_error hash) in - return_unit) - | Error errs -> - let*! () = Events.(emit accuser_block_error) (hash, errs) in - return_unit - in - cleanup_old_operations state ; - return_unit + if Protocol_hash.(protocol <> next_protocol) then + let*! () = Events.(emit protocol_change_detected) () in + return_unit + else + let*! () = Events.(emit accuser_saw_block) (level, hash) in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + state.highest_level_encountered <- + Raw_level.max level state.highest_level_encountered ; + (* Processing blocks *) + let* () = + let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in + match block_info with + | Ok block_info -> ( + let* () = process_block cctxt state block_info in + (* Processing (pre)attestations in the block *) + match block_info.operations with + | consensus_ops :: _ -> + let packed_op {Alpha_block_services.shell; protocol_data; _} = + {shell; protocol_data} + in + process_operations cctxt state consensus_ops ~packed_op chain_id + | _ -> + (* Should not happen as a block should contain 4 lists of + operations, the first list being dedicated to consensus + operations. *) + let*! () = Events.(emit fetch_operations_error hash) in + return_unit) + | Error errs -> + let*! () = Events.(emit accuser_block_error) (hash, errs) in + return_unit + in + cleanup_old_operations state ; + return_unit let process_new_block cctxt state bi = let open Lwt_syntax in @@ -659,13 +663,8 @@ let create (cctxt : #Protocol_client_context.full) ?canceler ~preserved_levels tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> last_get_block := None ; - (* When protocol changes, we stop current accuser process. *) - if Protocol_hash.(bi.next_protocol <> Protocol.hash) then - let*! () = B_Events.(emit daemon_stop) name in - return_unit - else - let*! () = process_new_block cctxt state bi in - worker_loop () + let*! () = process_new_block cctxt state bi in + worker_loop () | `Operations None -> (* restart a new operations monitor stream *) last_get_ops := None ; diff --git a/src/proto_022_PsRiotum/lib_delegate/client_daemon.ml b/src/proto_022_PsRiotum/lib_delegate/client_daemon.ml index d589e9dc4d2b..ce41bc0768e2 100644 --- a/src/proto_022_PsRiotum/lib_delegate/client_daemon.ml +++ b/src/proto_022_PsRiotum/lib_delegate/client_daemon.ml @@ -176,7 +176,11 @@ module Accuser = struct Protocol.hash in let* valid_blocks_stream, _ = - Client_baking_blocks.monitor_applied_blocks cctxt ~chains:[chain] () + Client_baking_blocks.monitor_applied_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () in let canceler = Lwt_canceler.create () in let _ = diff --git a/src/proto_022_PsRiotum/lib_delegate/delegate_events.ml b/src/proto_022_PsRiotum/lib_delegate/delegate_events.ml index 8bb0665ea09b..1efa1dd56e68 100644 --- a/src/proto_022_PsRiotum/lib_delegate/delegate_events.ml +++ b/src/proto_022_PsRiotum/lib_delegate/delegate_events.ml @@ -253,12 +253,4 @@ module Baking_scheduling = struct ~name:"daemon_start" ~msg:"starting {worker} daemon" ("worker", Data_encoding.string) - - let daemon_stop = - declare_1 - ~section - ~level:Notice - ~name:"daemon_stop" - ~msg:"stopping {worker} daemon" - ("worker", Data_encoding.string) end diff --git a/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.ml b/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.ml index 35d43f6d97b7..18f7dd7efc61 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.ml @@ -152,10 +152,10 @@ module Block_seen_event = struct module Event = Internal_event.Make (Definition) end -let monitor_applied_blocks cctxt ?chains ?protocols () = +let monitor_applied_blocks cctxt ?chains ?protocols ~next_protocols () = let open Lwt_result_syntax in let* block_stream, stop = - Monitor_services.applied_blocks cctxt ?chains ?protocols () + Monitor_services.applied_blocks cctxt ?chains ?protocols ?next_protocols () in return ( Lwt_stream.map_s diff --git a/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.mli b/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.mli index d9c50296db14..8426c19b8994 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.mli @@ -49,6 +49,7 @@ val monitor_applied_blocks : #Protocol_client_context.rpc_context -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> + next_protocols:Protocol_hash.t list option -> unit -> (block_info tzresult Lwt_stream.t * Tezos_rpc.Context.stopper) tzresult Lwt.t diff --git a/src/proto_023_PtSeouLo/lib_delegate/client_baking_denunciation.ml b/src/proto_023_PtSeouLo/lib_delegate/client_baking_denunciation.ml index d2b1a462feb7..0f8d983e563c 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/client_baking_denunciation.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/client_baking_denunciation.ml @@ -493,38 +493,42 @@ let cleanup_old_operations state = - Check that every baker (pre)attested only once at the block's level and round *) let process_new_block (cctxt : #Protocol_client_context.full) state - {hash; chain_id; level; _} = + {hash; chain_id; level; protocol; next_protocol; _} = let open Lwt_result_syntax in - let*! () = Events.(emit accuser_saw_block) (level, hash) in - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - state.highest_level_encountered <- - Raw_level.max level state.highest_level_encountered ; - (* Processing blocks *) - let* () = - let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in - match block_info with - | Ok block_info -> ( - let* () = process_block cctxt state block_info in - (* Processing (pre)attestations in the block *) - match block_info.operations with - | consensus_ops :: _ -> - let packed_op {Alpha_block_services.shell; protocol_data; _} = - {shell; protocol_data} - in - process_operations cctxt state consensus_ops ~packed_op chain_id - | _ -> - (* Should not happen as a block should contain 4 lists of - operations, the first list being dedicated to consensus - operations. *) - let*! () = Events.(emit fetch_operations_error hash) in - return_unit) - | Error errs -> - let*! () = Events.(emit accuser_block_error) (hash, errs) in - return_unit - in - cleanup_old_operations state ; - return_unit + if Protocol_hash.(protocol <> next_protocol) then + let*! () = Events.(emit protocol_change_detected) () in + return_unit + else + let*! () = Events.(emit accuser_saw_block) (level, hash) in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + state.highest_level_encountered <- + Raw_level.max level state.highest_level_encountered ; + (* Processing blocks *) + let* () = + let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in + match block_info with + | Ok block_info -> ( + let* () = process_block cctxt state block_info in + (* Processing (pre)attestations in the block *) + match block_info.operations with + | consensus_ops :: _ -> + let packed_op {Alpha_block_services.shell; protocol_data; _} = + {shell; protocol_data} + in + process_operations cctxt state consensus_ops ~packed_op chain_id + | _ -> + (* Should not happen as a block should contain 4 lists of + operations, the first list being dedicated to consensus + operations. *) + let*! () = Events.(emit fetch_operations_error hash) in + return_unit) + | Error errs -> + let*! () = Events.(emit accuser_block_error) (hash, errs) in + return_unit + in + cleanup_old_operations state ; + return_unit let process_new_block cctxt state bi = let open Lwt_syntax in @@ -617,13 +621,8 @@ let create (cctxt : #Protocol_client_context.full) ?canceler ~preserved_levels tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> last_get_block := None ; - (* When protocol changes, we stop current accuser process. *) - if Protocol_hash.(bi.next_protocol <> Protocol.hash) then - let*! () = B_Events.(emit daemon_stop) name in - return_unit - else - let*! () = process_new_block cctxt state bi in - worker_loop () + let*! () = process_new_block cctxt state bi in + worker_loop () | `Operations None -> (* restart a new operations monitor stream *) last_get_ops := None ; diff --git a/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml b/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml index 714e0c6b3ce5..b663b248a416 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml @@ -176,7 +176,11 @@ module Accuser = struct Protocol.hash in let* valid_blocks_stream, _ = - Client_baking_blocks.monitor_applied_blocks cctxt ~chains:[chain] () + Client_baking_blocks.monitor_applied_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () in let canceler = Lwt_canceler.create () in let _ = diff --git a/src/proto_023_PtSeouLo/lib_delegate/delegate_events.ml b/src/proto_023_PtSeouLo/lib_delegate/delegate_events.ml index dde15e9acbcb..ff89e553760a 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/delegate_events.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/delegate_events.ml @@ -275,12 +275,4 @@ module Baking_scheduling = struct ~name:"daemon_start" ~msg:"starting {worker} daemon" ("worker", Data_encoding.string) - - let daemon_stop = - declare_1 - ~section - ~level:Notice - ~name:"daemon_stop" - ~msg:"stopping {worker} daemon" - ("worker", Data_encoding.string) end diff --git a/src/proto_alpha/lib_delegate/client_baking_blocks.ml b/src/proto_alpha/lib_delegate/client_baking_blocks.ml index 35d43f6d97b7..18f7dd7efc61 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.ml +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.ml @@ -152,10 +152,10 @@ module Block_seen_event = struct module Event = Internal_event.Make (Definition) end -let monitor_applied_blocks cctxt ?chains ?protocols () = +let monitor_applied_blocks cctxt ?chains ?protocols ~next_protocols () = let open Lwt_result_syntax in let* block_stream, stop = - Monitor_services.applied_blocks cctxt ?chains ?protocols () + Monitor_services.applied_blocks cctxt ?chains ?protocols ?next_protocols () in return ( Lwt_stream.map_s diff --git a/src/proto_alpha/lib_delegate/client_baking_blocks.mli b/src/proto_alpha/lib_delegate/client_baking_blocks.mli index d9c50296db14..8426c19b8994 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.mli +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.mli @@ -49,6 +49,7 @@ val monitor_applied_blocks : #Protocol_client_context.rpc_context -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> + next_protocols:Protocol_hash.t list option -> unit -> (block_info tzresult Lwt_stream.t * Tezos_rpc.Context.stopper) tzresult Lwt.t diff --git a/src/proto_alpha/lib_delegate/client_baking_denunciation.ml b/src/proto_alpha/lib_delegate/client_baking_denunciation.ml index d2b1a462feb7..0f8d983e563c 100644 --- a/src/proto_alpha/lib_delegate/client_baking_denunciation.ml +++ b/src/proto_alpha/lib_delegate/client_baking_denunciation.ml @@ -493,38 +493,42 @@ let cleanup_old_operations state = - Check that every baker (pre)attested only once at the block's level and round *) let process_new_block (cctxt : #Protocol_client_context.full) state - {hash; chain_id; level; _} = + {hash; chain_id; level; protocol; next_protocol; _} = let open Lwt_result_syntax in - let*! () = Events.(emit accuser_saw_block) (level, hash) in - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - state.highest_level_encountered <- - Raw_level.max level state.highest_level_encountered ; - (* Processing blocks *) - let* () = - let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in - match block_info with - | Ok block_info -> ( - let* () = process_block cctxt state block_info in - (* Processing (pre)attestations in the block *) - match block_info.operations with - | consensus_ops :: _ -> - let packed_op {Alpha_block_services.shell; protocol_data; _} = - {shell; protocol_data} - in - process_operations cctxt state consensus_ops ~packed_op chain_id - | _ -> - (* Should not happen as a block should contain 4 lists of - operations, the first list being dedicated to consensus - operations. *) - let*! () = Events.(emit fetch_operations_error hash) in - return_unit) - | Error errs -> - let*! () = Events.(emit accuser_block_error) (hash, errs) in - return_unit - in - cleanup_old_operations state ; - return_unit + if Protocol_hash.(protocol <> next_protocol) then + let*! () = Events.(emit protocol_change_detected) () in + return_unit + else + let*! () = Events.(emit accuser_saw_block) (level, hash) in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + state.highest_level_encountered <- + Raw_level.max level state.highest_level_encountered ; + (* Processing blocks *) + let* () = + let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in + match block_info with + | Ok block_info -> ( + let* () = process_block cctxt state block_info in + (* Processing (pre)attestations in the block *) + match block_info.operations with + | consensus_ops :: _ -> + let packed_op {Alpha_block_services.shell; protocol_data; _} = + {shell; protocol_data} + in + process_operations cctxt state consensus_ops ~packed_op chain_id + | _ -> + (* Should not happen as a block should contain 4 lists of + operations, the first list being dedicated to consensus + operations. *) + let*! () = Events.(emit fetch_operations_error hash) in + return_unit) + | Error errs -> + let*! () = Events.(emit accuser_block_error) (hash, errs) in + return_unit + in + cleanup_old_operations state ; + return_unit let process_new_block cctxt state bi = let open Lwt_syntax in @@ -617,13 +621,8 @@ let create (cctxt : #Protocol_client_context.full) ?canceler ~preserved_levels tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> last_get_block := None ; - (* When protocol changes, we stop current accuser process. *) - if Protocol_hash.(bi.next_protocol <> Protocol.hash) then - let*! () = B_Events.(emit daemon_stop) name in - return_unit - else - let*! () = process_new_block cctxt state bi in - worker_loop () + let*! () = process_new_block cctxt state bi in + worker_loop () | `Operations None -> (* restart a new operations monitor stream *) last_get_ops := None ; diff --git a/src/proto_alpha/lib_delegate/client_daemon.ml b/src/proto_alpha/lib_delegate/client_daemon.ml index 714e0c6b3ce5..b663b248a416 100644 --- a/src/proto_alpha/lib_delegate/client_daemon.ml +++ b/src/proto_alpha/lib_delegate/client_daemon.ml @@ -176,7 +176,11 @@ module Accuser = struct Protocol.hash in let* valid_blocks_stream, _ = - Client_baking_blocks.monitor_applied_blocks cctxt ~chains:[chain] () + Client_baking_blocks.monitor_applied_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () in let canceler = Lwt_canceler.create () in let _ = diff --git a/src/proto_alpha/lib_delegate/delegate_events.ml b/src/proto_alpha/lib_delegate/delegate_events.ml index dde15e9acbcb..ff89e553760a 100644 --- a/src/proto_alpha/lib_delegate/delegate_events.ml +++ b/src/proto_alpha/lib_delegate/delegate_events.ml @@ -275,12 +275,4 @@ module Baking_scheduling = struct ~name:"daemon_start" ~msg:"starting {worker} daemon" ("worker", Data_encoding.string) - - let daemon_stop = - declare_1 - ~section - ~level:Notice - ~name:"daemon_stop" - ~msg:"stopping {worker} daemon" - ("worker", Data_encoding.string) end diff --git a/tezt/tests/double_consensus.ml b/tezt/tests/double_consensus.ml index 472d55984ed4..26855adfe26b 100644 --- a/tezt/tests/double_consensus.ml +++ b/tezt/tests/double_consensus.ml @@ -777,63 +777,6 @@ let accuser_processed_block accuser = let daemon_stop accuser = Accuser.wait_for accuser "daemon_stop.v0" (fun _json -> Some ()) -let accusers_migration_test ~migrate_from ~migrate_to = - let parameters = JSON.parse_file (Protocol.parameter_file migrate_to) in - (* Migration level is set arbitrarily *) - let migration_level = JSON.(get "blocks_per_cycle" parameters |> as_int) in - Test.register - ~__FILE__ - ~title: - (Format.asprintf - "accuser works correctly under migration from %s to %s" - (Protocol.tag migrate_from) - (Protocol.tag migrate_to)) - ~tags: - [team; "migration"; Protocol.tag migrate_from; Protocol.tag migrate_to] - ~uses:[Constant.octez_accuser] - @@ fun () -> - let* client, node = - Protocol_migration.user_migratable_node_init ~migration_level ~migrate_to () - in - let* () = Client.activate_protocol ~protocol:migrate_from client in - - Log.info "Initialise accuser" ; - let* accuser = Accuser.init ~event_level:`Debug node in - let accuser_processed_block = accuser_processed_block accuser in - let accuser_stop = daemon_stop accuser in - - Log.info "Bake %d levels" (migration_level - 1) ; - let* () = - repeat (migration_level - 1) (fun () -> - let* () = Client.bake_for_and_wait client in - accuser_processed_block) - in - - Log.info - "Bake one more level to migrate from %s to %s" - (Protocol.tag migrate_from) - (Protocol.tag migrate_to) ; - let* () = Client.bake_for_and_wait client in - - Log.info "After migration, old protocol accuser should have stopped" ; - let* () = accuser_stop in - - Log.info "Bake a few more levels into the new protocol" ; - let* () = - repeat 5 (fun () -> - let* () = - Client.attest_for - ~protocol:migrate_to - ~force:true - ~key:[Constant.bootstrap1.alias] - client - in - let* () = Client.bake_for_and_wait client in - accuser_processed_block) - in - let* () = Accuser.terminate accuser in - unit - let fetch_round ?block client = Client.RPC.call client @@ RPC.get_chain_block_helper_round ?block () @@ -1156,9 +1099,6 @@ let double_preattestation_aggregation_wrong_payload_hash = in unit -let register_migration ~migrate_from ~migrate_to = - accusers_migration_test ~migrate_from ~migrate_to - let register ~protocols = double_attestation_wrong_block_payload_hash protocols ; double_preattestation_wrong_block_payload_hash protocols ; diff --git a/tezt/tests/main.ml b/tezt/tests/main.ml index 8568527d1dc2..3d2ca7be83a2 100644 --- a/tezt/tests/main.ml +++ b/tezt/tests/main.ml @@ -69,7 +69,6 @@ let register_protocol_independent_tests () = let register_protocol_migration_tests () = let migrate_from = Option.get @@ Protocol.previous_protocol migrate_to in Agnostic_baker_test.register_migration ~migrate_from ~migrate_to ; - Double_consensus.register_migration ~migrate_from ~migrate_to ; Mockup.register_constant_migration ~migrate_from ~migrate_to ; Protocol_migration.register ~migrate_from ~migrate_to ; Weeklynet.register () ; @@ -102,7 +101,6 @@ let register_old_protocol_migration_tests () = | None, _ -> () | Some migrate_from, migrate_to -> Agnostic_baker_test.register_migration ~migrate_from ~migrate_to ; - Double_consensus.register_migration ~migrate_from ~migrate_to ; Sc_rollup_migration.register ~migrate_from ~migrate_to ; Dal.register_migration ~migrate_from ~migrate_to) Protocol.all -- GitLab From f097cee64a630b3510b0637301d8a27f752190e4 Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Wed, 15 Oct 2025 15:52:05 +0200 Subject: [PATCH 4/9] Agnostic/Baker: current thread is not optional --- src/lib_agnostic_baker/daemon.ml | 100 ++++++++++++++----------------- 1 file changed, 44 insertions(+), 56 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 118bacb3ca5f..6e2526efba4d 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -113,7 +113,7 @@ module Make_daemon (Agent : AGENT) : type state = { node_endpoint : string; - mutable current_baker : baker option; + mutable current_baker : baker; mutable old_baker : baker_to_kill option; keep_alive : bool; command : command; @@ -128,7 +128,7 @@ module Make_daemon (Agent : AGENT) : let open Lwt_result_syntax in let*! result = f () in match result with - | Ok () -> return_unit + | Ok res -> return res | Error (Lost_node_connection :: _ | Cannot_connect_to_node _ :: _) -> let* _level = Utils.retry @@ -143,11 +143,12 @@ module Make_daemon (Agent : AGENT) : retry_on_disconnection ~emit node_addr f | Error trace -> fail trace - (** [run_thread ~protocol_hash ~cancel_promise ~init_sapling_params state] returns the - main running thread for the baker given its protocol [~protocol_hash] and - cancellation [~cancel_promise]. It can optionally initialise sapling parameters, - as requested by [~init_sapling_params]. *) - let run_thread ~protocol_hash ~cancel_promise ~init_sapling_params state = + (** [run_thread ~protocol_hash ~cancel_promise ~init_sapling_params cctxt + command] returns the main running thread for the baker given its protocol + [~protocol_hash] and cancellation [~cancel_promise]. It can optionally + initialise sapling parameters, as requested by [~init_sapling_params]. *) + let run_thread ~protocol_hash ~cancel_promise ~init_sapling_params cctxt + command = let plugin = match Protocol_plugins.proto_plugin_for_protocol @@ -177,11 +178,12 @@ module Make_daemon (Agent : AGENT) : Tezos_sapling.Core.Validator.init_params () else () ; - let agent_thread = Agent.run_command plugin state.cctxt state.command in + let agent_thread = Agent.run_command plugin cctxt command in Lwt.pick [agent_thread; cancel_promise] - (** [spawn_baker protocol_hash] spawns a new baker process for the given [protocol_hash]. *) - let spawn_baker state protocol_hash = + (** [spawn_baker cctxt command protocol_hash] spawns a new baker process for + the given [protocol_hash]. *) + let spawn_baker cctxt command protocol_hash = let open Lwt_result_syntax in let*! () = Events.(emit starting_agent) (Agent.name, protocol_hash) in let cancel_promise, canceller = Lwt.wait () in @@ -190,7 +192,8 @@ module Make_daemon (Agent : AGENT) : ~protocol_hash ~cancel_promise ~init_sapling_params:Agent.init_sapling_params - state + cctxt + command in let*! () = Events.(emit agent_running) (Agent.name, protocol_hash) in return {protocol_hash; process = {thread; canceller}} @@ -201,11 +204,6 @@ module Make_daemon (Agent : AGENT) : let hot_swap_baker ~state ~current_protocol_hash ~next_protocol_hash ~level_to_kill_old_baker = let open Lwt_result_syntax in - let* current_baker = - match state.current_baker with - | Some baker -> return baker - | None -> tzfail (Missing_current_agent Agent.name) - in let next_proto_status = Parameters.protocol_status next_protocol_hash in let*! () = Events.(emit protocol_encountered) (next_proto_status, next_protocol_hash) @@ -215,10 +213,10 @@ module Make_daemon (Agent : AGENT) : (Agent.name, current_protocol_hash, level_to_kill_old_baker) in state.old_baker <- - Some {baker = current_baker; level_to_kill = level_to_kill_old_baker} ; - state.current_baker <- None ; - let* new_baker = spawn_baker state next_protocol_hash in - state.current_baker <- Some new_baker ; + Some + {baker = state.current_baker; level_to_kill = level_to_kill_old_baker} ; + let* new_baker = spawn_baker state.cctxt state.command next_protocol_hash in + state.current_baker <- new_baker ; return_unit (** [maybe_kill_old_baker state node_addr] checks whether the [old_baker] process @@ -313,11 +311,7 @@ module Make_daemon (Agent : AGENT) : ~node_addr [@profiler.record_s {verbosity = Notice} "get_next_protocol_hash"]) in - let* current_protocol_hash = - match state.current_baker with - | None -> tzfail (Missing_current_agent Agent.name) - | Some baker -> return baker.protocol_hash - in + let current_protocol_hash = state.current_baker.protocol_hash in let* () = if not (Protocol_hash.equal current_protocol_hash next_protocol_hash) @@ -344,26 +338,21 @@ module Make_daemon (Agent : AGENT) : it propagates any error that can appear. *) let baker_thread ~state = let open Lwt_result_syntax in - let*! res = - match state.current_baker with - | Some baker -> baker.process.thread - | None -> return_unit - in + let*! res = state.current_baker.process.thread in match res with | Ok () -> return_unit | Error err -> fail (Agent_process_error Agent.name :: err) (* ---- Agnostic Baker Bootstrap ---- *) - (** [may_start_initial_baker state] recursively waits for an [active] protocol - and spawns a baker for it. If the protocol is [frozen] (not [active] anymore), it - waits for a head with an [active] protocol. *) - let may_start_initial_baker state = + (** [may_start_initial_baker cctxt command ~node_addr] recursively waits + for an [active] protocol and spawns a baker for it. If the protocol is + [frozen] (not [active] anymore), it waits for a head with an [active] + protocol. *) + let may_start_initial_baker cctxt command ~node_addr = let open Lwt_result_syntax in let rec may_start ?last_known_proto ~head_stream () = - let* protocol_hash = - Rpc_services.get_next_protocol_hash ~node_addr:state.node_endpoint - in + let* protocol_hash = Rpc_services.get_next_protocol_hash ~node_addr in let proto_status = Parameters.protocol_status protocol_hash in let*! () = match last_known_proto with @@ -375,9 +364,8 @@ module Make_daemon (Agent : AGENT) : in match proto_status with | Active -> - let* current_baker = spawn_baker state protocol_hash in - state.current_baker <- Some current_baker ; - return_unit + let* current_baker = spawn_baker cctxt command protocol_hash in + return current_baker | Frozen -> ( let* head_stream = match head_stream with @@ -388,7 +376,7 @@ module Make_daemon (Agent : AGENT) : (proto_status, protocol_hash) in let*! () = Events.(emit waiting_for_active_protocol) () in - monitor_heads ~node_addr:state.node_endpoint + monitor_heads ~node_addr in let*! v = Lwt_stream.get head_stream in match v with @@ -422,31 +410,31 @@ module Make_daemon (Agent : AGENT) : let run ~keep_alive ~command cctxt = let open Lwt_result_syntax in - let state = - { - node_endpoint = Uri.to_string cctxt#base; - current_baker = None; - old_baker = None; - keep_alive; - command; - cctxt; - } - in () [@profiler.overwrite may_start_profiler cctxt#get_base_dir] ; - let node_addr = state.node_endpoint in + let node_addr = Uri.to_string cctxt#base in let*! () = Events.(emit starting_daemon) Agent.name in let _ccid = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> let*! () = Events.(emit stopping_daemon) Agent.name in Lwt.return_unit) in - let* () = - if state.keep_alive then + let* current_baker = + if keep_alive then retry_on_disconnection ~emit:(Events.emit Events.cannot_connect) node_addr - (fun () -> may_start_initial_baker state) - else may_start_initial_baker state + (fun () -> may_start_initial_baker cctxt command ~node_addr) + else may_start_initial_baker cctxt command ~node_addr + in + let state = + { + node_endpoint = node_addr; + current_baker; + old_baker = None; + keep_alive; + command; + cctxt; + } in let monitor_voting_periods () = let* head_stream = monitor_heads ~node_addr in -- GitLab From 3df2831ba3dd90df45233c6bcc3c7e22ee29fb49 Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Wed, 15 Oct 2025 16:43:33 +0200 Subject: [PATCH 5/9] Agnostic/Baker: main scheduler --- src/lib_agnostic_baker/daemon.ml | 198 ++++++++++++++++--------------- 1 file changed, 105 insertions(+), 93 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 6e2526efba4d..b61c7425b2c9 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -118,6 +118,7 @@ module Make_daemon (Agent : AGENT) : keep_alive : bool; command : command; cctxt : Tezos_client_base.Client_context.full; + head_stream : string Lwt_stream.t; } type t = state @@ -276,72 +277,47 @@ module Make_daemon (Agent : AGENT) : - Shut down an old baker it its time has come; - Spawn and "hot-swap" to a new baker if the next protocol hash is different. The voting period information is used for logging purposes. *) - let monitor_voting_periods ~state head_stream = + let monitor_voting_periods ~state = let open Lwt_result_syntax in let node_addr = state.node_endpoint in - let rec loop () = - let*! v = Lwt_stream.get head_stream in - match v with - | None -> tzfail Lost_node_connection - | Some _tick -> - let* block_hash = - (Rpc_services.get_block_hash - ~node_addr - [@profiler.record_s {verbosity = Notice} "get_block_hash"]) - in - () - [@profiler.reset_block_section - {profiler_module = Profiler} block_hash] ; - let* period_kind, remaining = - (Rpc_services.get_current_period - ~node_addr - [@profiler.record_s {verbosity = Notice} "get_current_period"]) - in - let*! () = - Events.(emit period_status) (block_hash, period_kind, remaining) - in - let* () = - (maybe_kill_old_baker - state - node_addr - [@profiler.record_s {verbosity = Notice} "maybe_kill_old_baker"]) - in - let* next_protocol_hash = - (Rpc_services.get_next_protocol_hash - ~node_addr - [@profiler.record_s {verbosity = Notice} "get_next_protocol_hash"]) - in - let current_protocol_hash = state.current_baker.protocol_hash in - let* () = - if - not (Protocol_hash.equal current_protocol_hash next_protocol_hash) - then - let* head_level = - (Rpc_services.get_level - ~node_addr - [@profiler.record_s {verbosity = Notice} "get_level"]) - in - (hot_swap_baker - ~state - ~current_protocol_hash - ~next_protocol_hash - ~level_to_kill_old_baker: - (head_level + Parameters.extra_levels_for_old_baker) - [@profiler.record_s {verbosity = Notice} "hot_swap_baker"]) - else return_unit - in - loop () + let* block_hash = + (Rpc_services.get_block_hash + ~node_addr [@profiler.record_s {verbosity = Notice} "get_block_hash"]) in - loop () - - (** [baker_thread ~state] monitors the current baker thread for any potential error, and - it propagates any error that can appear. *) - let baker_thread ~state = - let open Lwt_result_syntax in - let*! res = state.current_baker.process.thread in - match res with - | Ok () -> return_unit - | Error err -> fail (Agent_process_error Agent.name :: err) + () [@profiler.reset_block_section {profiler_module = Profiler} block_hash] ; + let* period_kind, remaining = + (Rpc_services.get_current_period + ~node_addr + [@profiler.record_s {verbosity = Notice} "get_current_period"]) + in + let*! () = + Events.(emit period_status) (block_hash, period_kind, remaining) + in + let* () = + (maybe_kill_old_baker + state + node_addr + [@profiler.record_s {verbosity = Notice} "maybe_kill_old_baker"]) + in + let* next_protocol_hash = + (Rpc_services.get_next_protocol_hash + ~node_addr + [@profiler.record_s {verbosity = Notice} "get_next_protocol_hash"]) + in + let current_protocol_hash = state.current_baker.protocol_hash in + if not (Protocol_hash.equal current_protocol_hash next_protocol_hash) then + let* head_level = + (Rpc_services.get_level + ~node_addr [@profiler.record_s {verbosity = Notice} "get_level"]) + in + (hot_swap_baker + ~state + ~current_protocol_hash + ~next_protocol_hash + ~level_to_kill_old_baker: + (head_level + Parameters.extra_levels_for_old_baker) + [@profiler.record_s {verbosity = Notice} "hot_swap_baker"]) + else return_unit (* ---- Agnostic Baker Bootstrap ---- *) @@ -408,6 +384,55 @@ module Make_daemon (Agent : AGENT) : backends | None -> () + type event = + | New_head + | Head_stream_ended + | Old_baker_stopped + | Current_baker_stopped + + let rec main_loop state = + let open Lwt_result_syntax in + let current_baker = + Lwt_result.map + (fun () -> Current_baker_stopped) + state.current_baker.process.thread + in + let old_baker = + match state.old_baker with + | Some old_baker -> + Lwt_result.map + (fun () -> Old_baker_stopped) + old_baker.baker.process.thread + | None -> Lwt_utils.never_ending () + in + let head_stream = + Lwt.map + (function None -> Ok Head_stream_ended | Some _head -> Ok New_head) + (Lwt_stream.get state.head_stream) + in + let* pick = Lwt.choose [current_baker; old_baker; head_stream] in + match pick with + | New_head -> + let* () = monitor_voting_periods ~state in + main_loop state + | Head_stream_ended -> tzfail Lost_node_connection + | Old_baker_stopped -> ( + match state.old_baker with + | None -> assert false + | Some old_baker -> + let* head_level = + (Rpc_services.get_level + ~node_addr:state.node_endpoint + [@profiler.record_s {verbosity = Notice} "get_level"]) + in + if head_level >= old_baker.level_to_kill then + (* The old baker is expired, it is expected. *) + main_loop state + else return_unit) + | Current_baker_stopped -> + (* It's not supposed to stop. *) + return_unit + let run ~keep_alive ~command cctxt = let open Lwt_result_syntax in () [@profiler.overwrite may_start_profiler cctxt#get_base_dir] ; @@ -426,36 +451,23 @@ module Make_daemon (Agent : AGENT) : (fun () -> may_start_initial_baker cctxt command ~node_addr) else may_start_initial_baker cctxt command ~node_addr in - let state = - { - node_endpoint = node_addr; - current_baker; - old_baker = None; - keep_alive; - command; - cctxt; - } - in - let monitor_voting_periods () = - let* head_stream = monitor_heads ~node_addr in - monitor_voting_periods ~state head_stream - in - (* Monitoring voting periods through heads monitoring to avoid - missing UAUs. *) - let* () = - Lwt.pick - [ - (* We do not care if --keep-alive is provided, if the baker thread doesn't - have the argument it'll abort the process anyway. *) - retry_on_disconnection - ~emit:(fun _ -> Lwt.return_unit) - node_addr - monitor_voting_periods; - baker_thread ~state; - ] - in - let*! () = Lwt_utils.never_ending () in - return_unit + retry_on_disconnection + ~emit:(fun _ -> Lwt.return_unit) + node_addr + (fun () -> + let* head_stream = monitor_heads ~node_addr in + let state = + { + node_endpoint = node_addr; + current_baker; + old_baker = None; + keep_alive; + command; + cctxt; + head_stream; + } + in + main_loop state) end module Baker : AGNOSTIC_DAEMON with type command = Baker_agent.command = -- GitLab From 0df3d8731a19afd3ed522ac65cc4a939e1bbf754 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Mon, 27 Oct 2025 14:59:39 +0100 Subject: [PATCH 6/9] Agnostic_baker: use a state and remove mutable from daemon main loop --- src/lib_agnostic_baker/daemon.ml | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index b61c7425b2c9..8ea4841e7248 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -113,8 +113,8 @@ module Make_daemon (Agent : AGENT) : type state = { node_endpoint : string; - mutable current_baker : baker; - mutable old_baker : baker_to_kill option; + current_baker : baker; + old_baker : baker_to_kill option; keep_alive : bool; command : command; cctxt : Tezos_client_base.Client_context.full; @@ -213,12 +213,12 @@ module Make_daemon (Agent : AGENT) : Events.(emit become_old_agent) (Agent.name, current_protocol_hash, level_to_kill_old_baker) in - state.old_baker <- + let old_baker = Some - {baker = state.current_baker; level_to_kill = level_to_kill_old_baker} ; + {baker = state.current_baker; level_to_kill = level_to_kill_old_baker} + in let* new_baker = spawn_baker state.cctxt state.command next_protocol_hash in - state.current_baker <- new_baker ; - return_unit + return {state with old_baker; current_baker = new_baker} (** [maybe_kill_old_baker state node_addr] checks whether the [old_baker] process from the [state] of the agnostic baker has surpassed its lifetime and it stops @@ -226,7 +226,7 @@ module Make_daemon (Agent : AGENT) : let maybe_kill_old_baker state node_addr = let open Lwt_result_syntax in match state.old_baker with - | None -> return_unit + | None -> return state | Some {baker; level_to_kill} -> let* head_level = (Rpc_services.get_level @@ -239,9 +239,8 @@ module Make_daemon (Agent : AGENT) : Lwt.wakeup baker.process.canceller (Ok ()) [@profiler.record_f {verbosity = Notice} "kill old baker"] ; - state.old_baker <- None ; - return_unit) - else return_unit + return {state with old_baker = None}) + else return state (* ---- Baker and Chain Monitoring ---- *) @@ -293,7 +292,7 @@ module Make_daemon (Agent : AGENT) : let*! () = Events.(emit period_status) (block_hash, period_kind, remaining) in - let* () = + let* state = (maybe_kill_old_baker state node_addr @@ -317,7 +316,7 @@ module Make_daemon (Agent : AGENT) : ~level_to_kill_old_baker: (head_level + Parameters.extra_levels_for_old_baker) [@profiler.record_s {verbosity = Notice} "hot_swap_baker"]) - else return_unit + else return state (* ---- Agnostic Baker Bootstrap ---- *) @@ -413,7 +412,7 @@ module Make_daemon (Agent : AGENT) : let* pick = Lwt.choose [current_baker; old_baker; head_stream] in match pick with | New_head -> - let* () = monitor_voting_periods ~state in + let* state = monitor_voting_periods ~state in main_loop state | Head_stream_ended -> tzfail Lost_node_connection | Old_baker_stopped -> ( -- GitLab From f9941f82a70ce0b9f27b40af79511c3e61ea369f Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Mon, 27 Oct 2025 15:09:01 +0100 Subject: [PATCH 7/9] Agnostic_baker: rework retry_on_disconnection --- src/lib_agnostic_baker/daemon.ml | 38 +++++++++++++++++--------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 8ea4841e7248..186454538be3 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -118,16 +118,16 @@ module Make_daemon (Agent : AGENT) : keep_alive : bool; command : command; cctxt : Tezos_client_base.Client_context.full; - head_stream : string Lwt_stream.t; + head_stream : string Lwt_stream.t option; } type t = state (* ---- Baker Process Management ---- *) - let rec retry_on_disconnection ~emit node_addr f = + let rec retry_on_disconnection ~emit node_addr f arg = let open Lwt_result_syntax in - let*! result = f () in + let*! result = f arg in match result with | Ok res -> return res | Error (Lost_node_connection :: _ | Cannot_connect_to_node _ :: _) -> @@ -141,7 +141,7 @@ module Make_daemon (Agent : AGENT) : (fun node_addr -> Rpc_services.get_level ~node_addr) node_addr in - retry_on_disconnection ~emit node_addr f + retry_on_disconnection ~emit node_addr f arg | Error trace -> fail trace (** [run_thread ~protocol_hash ~cancel_promise ~init_sapling_params cctxt @@ -407,7 +407,7 @@ module Make_daemon (Agent : AGENT) : let head_stream = Lwt.map (function None -> Ok Head_stream_ended | Some _head -> Ok New_head) - (Lwt_stream.get state.head_stream) + (Lwt_stream.get head_stream) in let* pick = Lwt.choose [current_baker; old_baker; head_stream] in match pick with @@ -448,25 +448,27 @@ module Make_daemon (Agent : AGENT) : ~emit:(Events.emit Events.cannot_connect) node_addr (fun () -> may_start_initial_baker cctxt command ~node_addr) + () else may_start_initial_baker cctxt command ~node_addr in + let state = + { + node_endpoint = node_addr; + current_baker; + old_baker = None; + keep_alive; + command; + cctxt; + head_stream = None; + } + in retry_on_disconnection ~emit:(fun _ -> Lwt.return_unit) node_addr - (fun () -> + (fun state -> let* head_stream = monitor_heads ~node_addr in - let state = - { - node_endpoint = node_addr; - current_baker; - old_baker = None; - keep_alive; - command; - cctxt; - head_stream; - } - in - main_loop state) + main_loop {state with head_stream = Some head_stream}) + state end module Baker : AGNOSTIC_DAEMON with type command = Baker_agent.command = -- GitLab From 885e54806f8299fb5f204cfca5f5697bc89dd001 Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Tue, 28 Oct 2025 10:56:28 +0100 Subject: [PATCH 8/9] tezt: add baker reconnect migration test --- tezt/tests/agnostic_baker_test.ml | 140 +++++++++++++++++++++++++++++- 1 file changed, 136 insertions(+), 4 deletions(-) diff --git a/tezt/tests/agnostic_baker_test.ml b/tezt/tests/agnostic_baker_test.ml index 8a2731b6c761..b56f6433c1f9 100644 --- a/tezt/tests/agnostic_baker_test.ml +++ b/tezt/tests/agnostic_baker_test.ml @@ -33,6 +33,15 @@ let wait_for_become_old_baker agnostic_baker = Agnostic_baker.wait_for agnostic_baker "become_old_agent.v0" (fun _json -> Some ()) +let wait_for_become_old_accuser accuser = + Lwt.pick + [ + (let* () = Lwt_unix.sleep 60. in + Test.fail + "[wait_for_become_old_baker] has waited for 60 seconds, exiting"); + Accuser.wait_for accuser "become_old_agent.v0" (fun _json -> Some ()); + ] + let wait_for_stopping_baker agnostic_baker = Agnostic_baker.wait_for agnostic_baker "stopping_agent.v0" (fun _json -> Some ()) @@ -51,8 +60,9 @@ let remote_sign client = (* Performs a protocol migration thanks to a UAU and the agnostic baker. *) let perform_protocol_migration ?node_name ?client_name ?parameter_file - ?(use_remote_signer = false) ~blocks_per_cycle ~migration_level - ~migrate_from ~migrate_to ~baked_blocks_after_migration () = + ?(use_remote_signer = false) ?(extra_run_arguments = []) ?on_new_baker_ready + ~blocks_per_cycle ~migration_level ~migrate_from ~migrate_to + ~baked_blocks_after_migration () = assert (migration_level >= blocks_per_cycle) ; Log.info "Node starting" ; let* client, node = @@ -66,11 +76,14 @@ let perform_protocol_migration ?node_name ?client_name ?parameter_file let* () = if use_remote_signer then remote_sign client else unit in Log.info "Node %s initialized" (Node.name node) ; let baker = Agnostic_baker.create node client in + let accuser = Accuser.create node in let wait_for_active_protocol_waiting = wait_for_active_protocol_waiting baker in Log.info "Starting agnostic baker" ; - let* () = Agnostic_baker.run baker in + let* () = Agnostic_baker.run ~extra_arguments:extra_run_arguments baker in + Log.info "Starting agnostic accuser" ; + let* () = Accuser.run accuser in let* () = wait_for_active_protocol_waiting in let* () = Agnostic_baker.wait_for_ready baker in let* () = @@ -84,10 +97,17 @@ let perform_protocol_migration ?node_name ?client_name ?parameter_file Log.info "Baking at least %d blocks to trigger migration" migration_level ; let* _level = Node.wait_for_level node migration_level in let wait_for_become_old_baker = wait_for_become_old_baker baker in + let wait_for_become_old_accuser = wait_for_become_old_accuser accuser in Log.info "Check that the baking process for %s is not killed" (Protocol.tag migrate_from) ; let* () = wait_for_become_old_baker in + let* () = wait_for_become_old_accuser in + let* () = + match on_new_baker_ready with + | None -> Lwt.return_unit + | Some f -> f ~node ~client ~baker + in (* Ensure that the block before migration is consistent *) Log.info "Checking migration block consistency" ; let* () = @@ -150,7 +170,8 @@ let migrate ~migrate_from ~migrate_to ~use_remote_signer = Protocol.tag migrate_from; Protocol.tag migrate_to; ] - ~uses:([Constant.octez_agnostic_baker] @ remote_signer) + ~uses: + ([Constant.octez_agnostic_baker; Constant.octez_accuser] @ remote_signer) @@ fun () -> let blocks_per_cycle = JSON.(get "blocks_per_cycle" parameters |> as_int) in let consensus_rights_delay = @@ -171,6 +192,116 @@ let migrate ~migrate_from ~migrate_to ~use_remote_signer = in unit +let reconnect_after_migration ~migrate_from ~migrate_to = + let parameters = JSON.parse_file (Protocol.parameter_file migrate_to) in + Test.register + ~__FILE__ + ~title: + (Format.asprintf + "Protocol migration reconnect from %s to %s" + (Protocol.tag migrate_from) + (Protocol.tag migrate_to)) + ~tags: + [ + "protocol"; + "migration"; + "agnostic"; + "baker"; + "reconnect"; + Protocol.tag migrate_from; + Protocol.tag migrate_to; + "flaky"; + ] + ~uses:[Constant.octez_agnostic_baker] + @@ fun () -> + let blocks_per_cycle = JSON.(get "blocks_per_cycle" parameters |> as_int) in + let consensus_rights_delay = + JSON.(get "consensus_rights_delay" parameters |> as_int) + in + let baked_blocks_after_migration = + 2 * consensus_rights_delay * blocks_per_cycle + in + let migration_level = blocks_per_cycle in + + Log.info "Node starting" ; + let* client, node = + Protocol_migration.user_migratable_node_init ~migration_level ~migrate_to () + in + Log.info "Node %s initialized" (Node.name node) ; + let baker = Agnostic_baker.create node client in + + let wait_for_active_protocol_waiting = + wait_for_active_protocol_waiting baker + in + let wait_for_ready () = Agnostic_baker.wait_for_ready baker in + let wait_for_period_status () = + Agnostic_baker.wait_for baker "period_status.v0" (fun _ -> Some ()) + in + let wait_for_become_old_baker = wait_for_become_old_baker baker in + let wait_for_stopping_baker = wait_for_stopping_baker baker in + + let timeout message seconds = + let* () = Lwt_unix.sleep seconds in + Test.fail message + in + + Log.info "Starting agnostic baker" ; + let* () = Agnostic_baker.run ~extra_arguments:["--keep-alive"] baker in + let* () = wait_for_active_protocol_waiting in + let* () = wait_for_ready () in + let* () = + Client.activate_protocol ~protocol:migrate_from client ~timestamp:Client.Now + in + Log.info "Protocol %s activated" (Protocol.hash migrate_from) ; + Log.info "Baking at least %d blocks to trigger migration" migration_level ; + let* _level = Node.wait_for_level node migration_level in + Log.info + "Check that the baking process for %s is not killed" + (Protocol.tag migrate_from) ; + let* () = wait_for_become_old_baker in + + (* Prepare watchers for the restart before it happens. *) + let wait_ready_after_restart = wait_for_ready () in + let wait_period_status_after_restart = wait_for_period_status () in + + Log.info "Restarting node to check baker reconnection" ; + let* () = Node.terminate node in + let* () = Lwt_unix.sleep 1. in + let* () = Node.run node [] in + let* () = Node.wait_for_ready node in + let* () = + Lwt.pick + [ + (let* () = wait_ready_after_restart in + Log.info "Agnostic baker reported ready after node restart" ; + Lwt.return_unit); + timeout "Baker did not report ready after node restart" 60.; + ] + in + let* () = + Lwt.pick + [ + (let* () = wait_period_status_after_restart in + Log.info "Agnostic baker emitted period_status after node restart" ; + Lwt.return_unit); + timeout "Baker did not emit period_status after node restart" 120.; + ] + in + + Log.info + "Check that baker for protocol %s is killed after %d levels" + (Protocol.tag migrate_from) + Agnostic_baker.extra_levels_for_old_baker ; + let kill_level = + migration_level + Agnostic_baker.extra_levels_for_old_baker + in + let* _level = Node.wait_for_level node kill_level in + let* () = wait_for_stopping_baker in + + let* _level = Node.wait_for_level node baked_blocks_after_migration in + let* () = Agnostic_baker.terminate baker in + unit + let test_start_and_stop () = Test.register ~__FILE__ @@ -315,6 +446,7 @@ let register ~protocols = let register_migration ~migrate_from ~migrate_to = migrate ~migrate_from ~migrate_to ~use_remote_signer:false ; + reconnect_after_migration ~migrate_from ~migrate_to ; migrate ~migrate_from ~migrate_to ~use_remote_signer:true let register_protocol_independent () = -- GitLab From c41660c5108398b9bd2e1f32333566d2d81e57c2 Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Tue, 28 Oct 2025 10:52:59 +0100 Subject: [PATCH 9/9] agnostic_baker: keep scheduler state on reconnect --- src/lib_agnostic_baker/daemon.ml | 59 ++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 186454538be3..d436594cb0a5 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -118,7 +118,7 @@ module Make_daemon (Agent : AGENT) : keep_alive : bool; command : command; cctxt : Tezos_client_base.Client_context.full; - head_stream : string Lwt_stream.t option; + head_stream : string Lwt_stream.t; } type t = state @@ -271,8 +271,8 @@ module Make_daemon (Agent : AGENT) : ignore (loop () : unit Lwt.t) ; return stream - (** [monitor_voting_periods ~state head_stream] continuously monitors [heads_stream] - to detect protocol changes. It will: + (** [monitor_voting_periods ~state] continuously monitors chain data to detect + protocol changes. It will: - Shut down an old baker it its time has come; - Spawn and "hot-swap" to a new baker if the next protocol hash is different. The voting period information is used for logging purposes. *) @@ -389,7 +389,7 @@ module Make_daemon (Agent : AGENT) : | Old_baker_stopped | Current_baker_stopped - let rec main_loop state = + let main_iteration state = let open Lwt_result_syntax in let current_baker = Lwt_result.map @@ -402,19 +402,24 @@ module Make_daemon (Agent : AGENT) : Lwt_result.map (fun () -> Old_baker_stopped) old_baker.baker.process.thread - | None -> Lwt_utils.never_ending () + | None -> + (* If there is no [old_baker], this promise is not expected to resolve + anytime. *) + Lwt_utils.never_ending () in let head_stream = Lwt.map (function None -> Ok Head_stream_ended | Some _head -> Ok New_head) - (Lwt_stream.get head_stream) + (Lwt_stream.get state.head_stream) in let* pick = Lwt.choose [current_baker; old_baker; head_stream] in match pick with | New_head -> let* state = monitor_voting_periods ~state in - main_loop state - | Head_stream_ended -> tzfail Lost_node_connection + return (Some state) + | Head_stream_ended -> + let* head_stream = monitor_heads ~node_addr:state.node_endpoint in + return (Some {state with head_stream}) | Old_baker_stopped -> ( match state.old_baker with | None -> assert false @@ -426,11 +431,24 @@ module Make_daemon (Agent : AGENT) : in if head_level >= old_baker.level_to_kill then (* The old baker is expired, it is expected. *) - main_loop state - else return_unit) + return (Some {state with old_baker = None}) + else failwith "Old baker was killed unexpectedly") | Current_baker_stopped -> (* It's not supposed to stop. *) - return_unit + failwith "Current baker stopped unexpectedly" + + let rec main_loop state = + let open Lwt_result_syntax in + let* result = + retry_on_disconnection + ~emit:(fun _ -> Lwt.return_unit) + state.node_endpoint + main_iteration + state + in + match result with + | None -> return_unit + | Some next_state -> main_loop next_state let run ~keep_alive ~command cctxt = let open Lwt_result_syntax in @@ -451,6 +469,15 @@ module Make_daemon (Agent : AGENT) : () else may_start_initial_baker cctxt command ~node_addr in + let* head_stream = + (* Useful if the baker is started before the node or restarted while the + node is down. *) + retry_on_disconnection + ~emit:(fun _ -> Lwt.return_unit) + node_addr + (fun node_addr -> monitor_heads ~node_addr) + node_addr + in let state = { node_endpoint = node_addr; @@ -459,16 +486,10 @@ module Make_daemon (Agent : AGENT) : keep_alive; command; cctxt; - head_stream = None; + head_stream; } in - retry_on_disconnection - ~emit:(fun _ -> Lwt.return_unit) - node_addr - (fun state -> - let* head_stream = monitor_heads ~node_addr in - main_loop {state with head_stream = Some head_stream}) - state + main_loop state end module Baker : AGNOSTIC_DAEMON with type command = Baker_agent.command = -- GitLab