From 0e71172726a29fdce7dabd755a3e09feb6632309 Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Mon, 29 Sep 2025 15:41:37 +0200 Subject: [PATCH 1/9] Baker/Tezt: test with no LB vote --- tezt/tests/liquidity_baking_per_block_votes.ml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tezt/tests/liquidity_baking_per_block_votes.ml b/tezt/tests/liquidity_baking_per_block_votes.ml index ebf615d96263..4eb11b9a3add 100644 --- a/tezt/tests/liquidity_baking_per_block_votes.ml +++ b/tezt/tests/liquidity_baking_per_block_votes.ml @@ -175,6 +175,18 @@ let test_all_per_block_votes = let* baker = Agnostic_baker.init node client in + Log.info "Test no voting file" ; + + let* run_failed = + (* [Test.fail] raises, so keep it outside [Lwt.catch]: the catch-all handler would otherwise swallow the failure. *) + Lwt.catch + (fun () -> let* _ = run_vote_file baker in return false) + (fun _exn -> return true) + in + if not run_failed then + Test.fail "The baker was supposed to fail, it's missing the liquidity \ + baking vote." ; + Log.info "Test [off] vote file" ; let* baker = let votefile = Agnostic_baker.liquidity_baking_votefile Off in -- GitLab From c9b011460e04b493a79e307625a8e7698c667de1 Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Tue, 30 Sep 2025 11:38:05 +0200 Subject: [PATCH 2/9] Baker/Tezt: regression CLI test --- tezt/lib_tezos/agnostic_baker.ml | 29 +++++++++++++++++--- tezt/lib_tezos/agnostic_baker.mli | 9 +++++++ tezt/tests/agnostic_baker_test.ml | 44 ++++++++++++++++++++++++++++++- 3 files changed, 77 insertions(+), 5 deletions(-) diff --git a/tezt/lib_tezos/agnostic_baker.ml b/tezt/lib_tezos/agnostic_baker.ml index 564e254c99e8..1fb5227d7b9c 100644 --- a/tezt/lib_tezos/agnostic_baker.ml +++ b/tezt/lib_tezos/agnostic_baker.ml @@ -266,8 +266,8 @@ let run_args agnostic_baker = @ dal_node_timeout_percentage @ state_recorder @ node_version_check_bypass @ node_version_allowed @ keep_alive @ allow_signing_delay -let run ?env ?event_level ?event_sections_levels ?(extra_arguments = []) - (agnostic_baker : t) = +let raw ?env ?event_level ?event_sections_levels ~arguments (agnostic_baker : t) + = (match 
agnostic_baker.status with | Not_running -> () | Running _ -> @@ -281,17 +281,38 @@ let run ?env ?event_level ?event_sections_levels ?(extra_arguments = []) Log.info "Starting baker %s with args: %s" agnostic_baker.name - (String.concat " " (run_args agnostic_baker @ extra_arguments)) ; + (String.concat " " arguments) ; run ?env ?event_level ?event_sections_levels agnostic_baker {ready = false} - (run_args agnostic_baker @ extra_arguments) + arguments ~on_terminate ?runner:agnostic_baker.persistent_state.runner +let run ?env ?event_level ?event_sections_levels ?(extra_arguments = []) + (agnostic_baker : t) = + raw + ?env + ?event_level + ?event_sections_levels + ~arguments:(run_args agnostic_baker @ extra_arguments) + agnostic_baker + +let raw ~arguments (agnostic_baker : t) = raw agnostic_baker ~arguments + +let spawn_raw ~arguments (agnostic_baker : t) = + (match agnostic_baker.status with + | Not_running -> () + | Running _ -> + Test.fail "agnostic_baker %s is already running" agnostic_baker.name) ; + Process.spawn + ?runner:agnostic_baker.persistent_state.runner + agnostic_baker.path + arguments + let spawn_run ?env ?(extra_arguments = []) (agnostic_baker : t) = (match agnostic_baker.status with | Not_running -> () diff --git a/tezt/lib_tezos/agnostic_baker.mli b/tezt/lib_tezos/agnostic_baker.mli index 103e3e12d641..a401f5a36c92 100644 --- a/tezt/lib_tezos/agnostic_baker.mli +++ b/tezt/lib_tezos/agnostic_baker.mli @@ -63,6 +63,15 @@ val run : val spawn_run : ?env:string String_map.t -> ?extra_arguments:string list -> t -> Process.t +(** Spawn [octez-baker]. + + Similar to {!run} but takes all its arguments from [arguments]. *) +val raw : arguments:string list -> t -> unit Lwt.t + +(** Spawn [octez-baker] similarly to {!spawn_run} but doesn't have any arguments + other than [arguments]. *) +val spawn_raw : arguments:string list -> t -> Process.t + (** Liquidity baking vote values. 
*) type liquidity_baking_vote = Off | On | Pass diff --git a/tezt/tests/agnostic_baker_test.ml b/tezt/tests/agnostic_baker_test.ml index f7394e242614..38a2fca09467 100644 --- a/tezt/tests/agnostic_baker_test.ml +++ b/tezt/tests/agnostic_baker_test.ml @@ -290,7 +290,49 @@ let test_keep_alive = and* () = wait_period_status in unit -let register ~protocols = test_keep_alive protocols +let test_cli = + Protocol.register_test + ~__FILE__ + ~title:"Agnostic baker CLI" + ~tags:[team; "sandbox"; "agnostic"; "baker"; "cli"] + ~uses:(fun _protocol -> [Constant.octez_agnostic_baker]) + @@ fun protocol -> + let* node, client = Client.init_with_protocol `Client ~protocol () in + let baker = Agnostic_baker.create node client in + let check_process_error expected_err p = + let* err = Process.check_and_read_stderr ~expect_failure:true p in + Check.(err =~ rex expected_err) ~error_msg:"Expected error %R but got %L" ; + unit + in + + let arguments = + [ + "--endpoint"; + Node.rpc_endpoint node; + "--base-dir"; + Client.base_dir client; + "run"; + "with"; + "local"; + "node"; + Node.data_dir node; + ] + in + let p = Agnostic_baker.spawn_raw ~arguments baker in + let* () = check_process_error ".*Missing liquidity baking toggle vote.*" p in + + let arguments = arguments @ ["--liquidity-baking-toggle-vote"; "pass"] in + let p = Agnostic_baker.spawn_raw ~arguments baker in + let* () = check_process_error "Please connect a running DAL node using" p in + + let arguments = arguments @ ["--without-dal"] in + let* () = Agnostic_baker.wait_for_ready baker + and* () = Agnostic_baker.raw ~arguments baker in + unit + +let register ~protocols = + test_keep_alive protocols ; + test_cli protocols let register_migration ~migrate_from ~migrate_to = migrate ~migrate_from ~migrate_to ~use_remote_signer:false ; -- GitLab From 21e6e99a411b8cb1585e6a7e5cfac67254a6aed0 Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Wed, 15 Oct 2025 14:03:46 +0200 Subject: [PATCH 3/9] Revert "Accuser: Move 
next_protocol filtering to worker_loop" This reverts commit 1901fcc22e0f24630ab1d7d26f0368df32d3f980. --- .../lib_delegate/client_baking_blocks.ml | 4 +- .../lib_delegate/client_baking_blocks.mli | 1 + .../client_baking_denunciation.ml | 75 +++++++++---------- .../lib_delegate/client_daemon.ml | 6 +- .../lib_delegate/delegate_events.ml | 8 -- .../lib_delegate/client_baking_blocks.ml | 4 +- .../lib_delegate/client_baking_blocks.mli | 1 + .../client_baking_denunciation.ml | 75 +++++++++---------- .../lib_delegate/client_daemon.ml | 6 +- .../lib_delegate/delegate_events.ml | 8 -- .../lib_delegate/client_baking_blocks.ml | 4 +- .../lib_delegate/client_baking_blocks.mli | 1 + .../client_baking_denunciation.ml | 75 +++++++++---------- src/proto_alpha/lib_delegate/client_daemon.ml | 6 +- .../lib_delegate/delegate_events.ml | 8 -- tezt/tests/double_consensus.ml | 60 --------------- tezt/tests/main.ml | 2 - 17 files changed, 135 insertions(+), 209 deletions(-) diff --git a/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.ml b/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.ml index 35d43f6d97b7..18f7dd7efc61 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.ml @@ -152,10 +152,10 @@ module Block_seen_event = struct module Event = Internal_event.Make (Definition) end -let monitor_applied_blocks cctxt ?chains ?protocols () = +let monitor_applied_blocks cctxt ?chains ?protocols ~next_protocols () = let open Lwt_result_syntax in let* block_stream, stop = - Monitor_services.applied_blocks cctxt ?chains ?protocols () + Monitor_services.applied_blocks cctxt ?chains ?protocols ?next_protocols () in return ( Lwt_stream.map_s diff --git a/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.mli b/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.mli index d9c50296db14..8426c19b8994 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.mli +++ 
b/src/proto_023_PtSeouLo/lib_delegate/client_baking_blocks.mli @@ -49,6 +49,7 @@ val monitor_applied_blocks : #Protocol_client_context.rpc_context -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> + next_protocols:Protocol_hash.t list option -> unit -> (block_info tzresult Lwt_stream.t * Tezos_rpc.Context.stopper) tzresult Lwt.t diff --git a/src/proto_023_PtSeouLo/lib_delegate/client_baking_denunciation.ml b/src/proto_023_PtSeouLo/lib_delegate/client_baking_denunciation.ml index d73cc45b9424..3071abb73838 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/client_baking_denunciation.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/client_baking_denunciation.ml @@ -494,38 +494,42 @@ let cleanup_old_operations state = - Check that every baker (pre)attested only once at the block's level and round *) let process_new_block (cctxt : #Protocol_client_context.full) state - {hash; chain_id; level; _} = + {hash; chain_id; level; protocol; next_protocol; _} = let open Lwt_result_syntax in - let*! () = Events.(emit accuser_saw_block) (level, hash) in - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - state.highest_level_encountered <- - Raw_level.max level state.highest_level_encountered ; - (* Processing blocks *) - let* () = - let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in - match block_info with - | Ok block_info -> ( - let* () = process_block cctxt state block_info in - (* Processing (pre)attestations in the block *) - match block_info.operations with - | consensus_ops :: _ -> - let packed_op {Alpha_block_services.shell; protocol_data; _} = - {shell; protocol_data} - in - process_operations cctxt state consensus_ops ~packed_op chain_id - | _ -> - (* Should not happen as a block should contain 4 lists of - operations, the first list being dedicated to consensus - operations. *) - let*! () = Events.(emit fetch_operations_error hash) in - return_unit) - | Error errs -> - let*! 
() = Events.(emit accuser_block_error) (hash, errs) in - return_unit - in - cleanup_old_operations state ; - return_unit + if Protocol_hash.(protocol <> next_protocol) then + let*! () = Events.(emit protocol_change_detected) () in + return_unit + else + let*! () = Events.(emit accuser_saw_block) (level, hash) in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + state.highest_level_encountered <- + Raw_level.max level state.highest_level_encountered ; + (* Processing blocks *) + let* () = + let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in + match block_info with + | Ok block_info -> ( + let* () = process_block cctxt state block_info in + (* Processing (pre)attestations in the block *) + match block_info.operations with + | consensus_ops :: _ -> + let packed_op {Alpha_block_services.shell; protocol_data; _} = + {shell; protocol_data} + in + process_operations cctxt state consensus_ops ~packed_op chain_id + | _ -> + (* Should not happen as a block should contain 4 lists of + operations, the first list being dedicated to consensus + operations. *) + let*! () = Events.(emit fetch_operations_error hash) in + return_unit) + | Error errs -> + let*! () = Events.(emit accuser_block_error) (hash, errs) in + return_unit + in + cleanup_old_operations state ; + return_unit let process_new_block cctxt state bi = let open Lwt_syntax in @@ -618,13 +622,8 @@ let create (cctxt : #Protocol_client_context.full) ?canceler ~preserved_levels tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> last_get_block := None ; - (* When protocol changes, we stop current accuser process. *) - if Protocol_hash.(bi.next_protocol <> Protocol.hash) then - let*! () = B_Events.(emit daemon_stop) name in - return_unit - else - let*! () = process_new_block cctxt state bi in - worker_loop () + let*! 
() = process_new_block cctxt state bi in + worker_loop () | `Operations None -> (* restart a new operations monitor stream *) last_get_ops := None ; diff --git a/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml b/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml index a22f5d631d2c..ac1438c06f91 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml @@ -176,7 +176,11 @@ module Accuser = struct Protocol.hash in let* valid_blocks_stream, _ = - Client_baking_blocks.monitor_applied_blocks cctxt ~chains:[chain] () + Client_baking_blocks.monitor_applied_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () in let canceler = Lwt_canceler.create () in let _ = diff --git a/src/proto_023_PtSeouLo/lib_delegate/delegate_events.ml b/src/proto_023_PtSeouLo/lib_delegate/delegate_events.ml index dde15e9acbcb..ff89e553760a 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/delegate_events.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/delegate_events.ml @@ -275,12 +275,4 @@ module Baking_scheduling = struct ~name:"daemon_start" ~msg:"starting {worker} daemon" ("worker", Data_encoding.string) - - let daemon_stop = - declare_1 - ~section - ~level:Notice - ~name:"daemon_stop" - ~msg:"stopping {worker} daemon" - ("worker", Data_encoding.string) end diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_blocks.ml b/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_blocks.ml index 35d43f6d97b7..18f7dd7efc61 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_blocks.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_blocks.ml @@ -152,10 +152,10 @@ module Block_seen_event = struct module Event = Internal_event.Make (Definition) end -let monitor_applied_blocks cctxt ?chains ?protocols () = +let monitor_applied_blocks cctxt ?chains ?protocols ~next_protocols () = let open Lwt_result_syntax in let* block_stream, stop = - Monitor_services.applied_blocks cctxt ?chains 
?protocols () + Monitor_services.applied_blocks cctxt ?chains ?protocols ?next_protocols () in return ( Lwt_stream.map_s diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_blocks.mli b/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_blocks.mli index d9c50296db14..8426c19b8994 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_blocks.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_blocks.mli @@ -49,6 +49,7 @@ val monitor_applied_blocks : #Protocol_client_context.rpc_context -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> + next_protocols:Protocol_hash.t list option -> unit -> (block_info tzresult Lwt_stream.t * Tezos_rpc.Context.stopper) tzresult Lwt.t diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_denunciation.ml b/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_denunciation.ml index b4db64cd265a..6a373fba9b46 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_denunciation.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/client_baking_denunciation.ml @@ -496,38 +496,42 @@ let cleanup_old_operations state = - Check that every baker (pre)attested only once at the block's level and round *) let process_new_block (cctxt : #Protocol_client_context.full) state - {hash; chain_id; level; _} = + {hash; chain_id; level; protocol; next_protocol; _} = let open Lwt_result_syntax in - let*! () = Events.(emit accuser_saw_block) (level, hash) in - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - state.highest_level_encountered <- - Raw_level.max level state.highest_level_encountered ; - (* Processing blocks *) - let* () = - let*! 
block_info = Alpha_block_services.info cctxt ~chain ~block () in - match block_info with - | Ok block_info -> ( - let* () = process_block cctxt state block_info in - (* Processing (pre)attestations in the block *) - match block_info.operations with - | consensus_ops :: _ -> - let packed_op {Alpha_block_services.shell; protocol_data; _} = - {shell; protocol_data} - in - process_operations cctxt state consensus_ops ~packed_op chain_id - | _ -> - (* Should not happen as a block should contain 4 lists of - operations, the first list being dedicated to consensus - operations. *) - let*! () = Events.(emit fetch_operations_error hash) in - return_unit) - | Error errs -> - let*! () = Events.(emit accuser_block_error) (hash, errs) in - return_unit - in - cleanup_old_operations state ; - return_unit + if Protocol_hash.(protocol <> next_protocol) then + let*! () = Events.(emit protocol_change_detected) () in + return_unit + else + let*! () = Events.(emit accuser_saw_block) (level, hash) in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + state.highest_level_encountered <- + Raw_level.max level state.highest_level_encountered ; + (* Processing blocks *) + let* () = + let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in + match block_info with + | Ok block_info -> ( + let* () = process_block cctxt state block_info in + (* Processing (pre)attestations in the block *) + match block_info.operations with + | consensus_ops :: _ -> + let packed_op {Alpha_block_services.shell; protocol_data; _} = + {shell; protocol_data} + in + process_operations cctxt state consensus_ops ~packed_op chain_id + | _ -> + (* Should not happen as a block should contain 4 lists of + operations, the first list being dedicated to consensus + operations. *) + let*! () = Events.(emit fetch_operations_error hash) in + return_unit) + | Error errs -> + let*! 
() = Events.(emit accuser_block_error) (hash, errs) in + return_unit + in + cleanup_old_operations state ; + return_unit let process_new_block cctxt state bi = let open Lwt_syntax in @@ -620,13 +624,8 @@ let create (cctxt : #Protocol_client_context.full) ?canceler ~preserved_levels tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> last_get_block := None ; - (* When protocol changes, we stop current accuser process. *) - if Protocol_hash.(bi.next_protocol <> Protocol.hash) then - let*! () = B_Events.(emit daemon_stop) name in - return_unit - else - let*! () = process_new_block cctxt state bi in - worker_loop () + let*! () = process_new_block cctxt state bi in + worker_loop () | `Operations None -> (* restart a new operations monitor stream *) last_get_ops := None ; diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/client_daemon.ml b/src/proto_024_PsD5wVTJ/lib_delegate/client_daemon.ml index a22f5d631d2c..ac1438c06f91 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/client_daemon.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/client_daemon.ml @@ -176,7 +176,11 @@ module Accuser = struct Protocol.hash in let* valid_blocks_stream, _ = - Client_baking_blocks.monitor_applied_blocks cctxt ~chains:[chain] () + Client_baking_blocks.monitor_applied_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () in let canceler = Lwt_canceler.create () in let _ = diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/delegate_events.ml b/src/proto_024_PsD5wVTJ/lib_delegate/delegate_events.ml index dde15e9acbcb..ff89e553760a 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/delegate_events.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/delegate_events.ml @@ -275,12 +275,4 @@ module Baking_scheduling = struct ~name:"daemon_start" ~msg:"starting {worker} daemon" ("worker", Data_encoding.string) - - let daemon_stop = - declare_1 - ~section - ~level:Notice - ~name:"daemon_stop" - ~msg:"stopping {worker} daemon" - ("worker", Data_encoding.string) end diff 
--git a/src/proto_alpha/lib_delegate/client_baking_blocks.ml b/src/proto_alpha/lib_delegate/client_baking_blocks.ml index 35d43f6d97b7..18f7dd7efc61 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.ml +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.ml @@ -152,10 +152,10 @@ module Block_seen_event = struct module Event = Internal_event.Make (Definition) end -let monitor_applied_blocks cctxt ?chains ?protocols () = +let monitor_applied_blocks cctxt ?chains ?protocols ~next_protocols () = let open Lwt_result_syntax in let* block_stream, stop = - Monitor_services.applied_blocks cctxt ?chains ?protocols () + Monitor_services.applied_blocks cctxt ?chains ?protocols ?next_protocols () in return ( Lwt_stream.map_s diff --git a/src/proto_alpha/lib_delegate/client_baking_blocks.mli b/src/proto_alpha/lib_delegate/client_baking_blocks.mli index d9c50296db14..8426c19b8994 100644 --- a/src/proto_alpha/lib_delegate/client_baking_blocks.mli +++ b/src/proto_alpha/lib_delegate/client_baking_blocks.mli @@ -49,6 +49,7 @@ val monitor_applied_blocks : #Protocol_client_context.rpc_context -> ?chains:Chain_services.chain list -> ?protocols:Protocol_hash.t list -> + next_protocols:Protocol_hash.t list option -> unit -> (block_info tzresult Lwt_stream.t * Tezos_rpc.Context.stopper) tzresult Lwt.t diff --git a/src/proto_alpha/lib_delegate/client_baking_denunciation.ml b/src/proto_alpha/lib_delegate/client_baking_denunciation.ml index b4db64cd265a..6a373fba9b46 100644 --- a/src/proto_alpha/lib_delegate/client_baking_denunciation.ml +++ b/src/proto_alpha/lib_delegate/client_baking_denunciation.ml @@ -496,38 +496,42 @@ let cleanup_old_operations state = - Check that every baker (pre)attested only once at the block's level and round *) let process_new_block (cctxt : #Protocol_client_context.full) state - {hash; chain_id; level; _} = + {hash; chain_id; level; protocol; next_protocol; _} = let open Lwt_result_syntax in - let*! 
() = Events.(emit accuser_saw_block) (level, hash) in - let chain = `Hash chain_id in - let block = `Hash (hash, 0) in - state.highest_level_encountered <- - Raw_level.max level state.highest_level_encountered ; - (* Processing blocks *) - let* () = - let*! block_info = Alpha_block_services.info cctxt ~chain ~block () in - match block_info with - | Ok block_info -> ( - let* () = process_block cctxt state block_info in - (* Processing (pre)attestations in the block *) - match block_info.operations with - | consensus_ops :: _ -> - let packed_op {Alpha_block_services.shell; protocol_data; _} = - {shell; protocol_data} - in - process_operations cctxt state consensus_ops ~packed_op chain_id - | _ -> - (* Should not happen as a block should contain 4 lists of - operations, the first list being dedicated to consensus - operations. *) - let*! () = Events.(emit fetch_operations_error hash) in - return_unit) - | Error errs -> - let*! () = Events.(emit accuser_block_error) (hash, errs) in - return_unit - in - cleanup_old_operations state ; - return_unit + if Protocol_hash.(protocol <> next_protocol) then + let*! () = Events.(emit protocol_change_detected) () in + return_unit + else + let*! () = Events.(emit accuser_saw_block) (level, hash) in + let chain = `Hash chain_id in + let block = `Hash (hash, 0) in + state.highest_level_encountered <- + Raw_level.max level state.highest_level_encountered ; + (* Processing blocks *) + let* () = + let*! 
block_info = Alpha_block_services.info cctxt ~chain ~block () in + match block_info with + | Ok block_info -> ( + let* () = process_block cctxt state block_info in + (* Processing (pre)attestations in the block *) + match block_info.operations with + | consensus_ops :: _ -> + let packed_op {Alpha_block_services.shell; protocol_data; _} = + {shell; protocol_data} + in + process_operations cctxt state consensus_ops ~packed_op chain_id + | _ -> + (* Should not happen as a block should contain 4 lists of + operations, the first list being dedicated to consensus + operations. *) + let*! () = Events.(emit fetch_operations_error hash) in + return_unit) + | Error errs -> + let*! () = Events.(emit accuser_block_error) (hash, errs) in + return_unit + in + cleanup_old_operations state ; + return_unit let process_new_block cctxt state bi = let open Lwt_syntax in @@ -620,13 +624,8 @@ let create (cctxt : #Protocol_client_context.full) ?canceler ~preserved_levels tzfail Baking_errors.Node_connection_lost | `Block (Some (Ok bi)) -> last_get_block := None ; - (* When protocol changes, we stop current accuser process. *) - if Protocol_hash.(bi.next_protocol <> Protocol.hash) then - let*! () = B_Events.(emit daemon_stop) name in - return_unit - else - let*! () = process_new_block cctxt state bi in - worker_loop () + let*! 
() = process_new_block cctxt state bi in + worker_loop () | `Operations None -> (* restart a new operations monitor stream *) last_get_ops := None ; diff --git a/src/proto_alpha/lib_delegate/client_daemon.ml b/src/proto_alpha/lib_delegate/client_daemon.ml index a22f5d631d2c..ac1438c06f91 100644 --- a/src/proto_alpha/lib_delegate/client_daemon.ml +++ b/src/proto_alpha/lib_delegate/client_daemon.ml @@ -176,7 +176,11 @@ module Accuser = struct Protocol.hash in let* valid_blocks_stream, _ = - Client_baking_blocks.monitor_applied_blocks cctxt ~chains:[chain] () + Client_baking_blocks.monitor_applied_blocks + ~next_protocols:(Some [Protocol.hash]) + cctxt + ~chains:[chain] + () in let canceler = Lwt_canceler.create () in let _ = diff --git a/src/proto_alpha/lib_delegate/delegate_events.ml b/src/proto_alpha/lib_delegate/delegate_events.ml index dde15e9acbcb..ff89e553760a 100644 --- a/src/proto_alpha/lib_delegate/delegate_events.ml +++ b/src/proto_alpha/lib_delegate/delegate_events.ml @@ -275,12 +275,4 @@ module Baking_scheduling = struct ~name:"daemon_start" ~msg:"starting {worker} daemon" ("worker", Data_encoding.string) - - let daemon_stop = - declare_1 - ~section - ~level:Notice - ~name:"daemon_stop" - ~msg:"stopping {worker} daemon" - ("worker", Data_encoding.string) end diff --git a/tezt/tests/double_consensus.ml b/tezt/tests/double_consensus.ml index c4e3ee6cdfe8..b8672f5bb684 100644 --- a/tezt/tests/double_consensus.ml +++ b/tezt/tests/double_consensus.ml @@ -809,63 +809,6 @@ let accuser_processed_block accuser = let daemon_stop accuser = Accuser.wait_for accuser "daemon_stop.v0" (fun _json -> Some ()) -let accusers_migration_test ~migrate_from ~migrate_to = - let parameters = JSON.parse_file (Protocol.parameter_file migrate_to) in - (* Migration level is set arbitrarily *) - let migration_level = JSON.(get "blocks_per_cycle" parameters |> as_int) in - Test.register - ~__FILE__ - ~title: - (Format.asprintf - "accuser works correctly under migration from %s to %s" - 
(Protocol.tag migrate_from) - (Protocol.tag migrate_to)) - ~tags: - [team; "migration"; Protocol.tag migrate_from; Protocol.tag migrate_to] - ~uses:[Constant.octez_accuser] - @@ fun () -> - let* client, node = - Protocol_migration.user_migratable_node_init ~migration_level ~migrate_to () - in - let* () = Client.activate_protocol ~protocol:migrate_from client in - - Log.info "Initialise accuser" ; - let* accuser = Accuser.init ~event_level:`Debug node in - let accuser_processed_block = accuser_processed_block accuser in - let accuser_stop = daemon_stop accuser in - - Log.info "Bake %d levels" (migration_level - 1) ; - let* () = - repeat (migration_level - 1) (fun () -> - let* () = Client.bake_for_and_wait client in - accuser_processed_block) - in - - Log.info - "Bake one more level to migrate from %s to %s" - (Protocol.tag migrate_from) - (Protocol.tag migrate_to) ; - let* () = Client.bake_for_and_wait client in - - Log.info "After migration, old protocol accuser should have stopped" ; - let* () = accuser_stop in - - Log.info "Bake a few more levels into the new protocol" ; - let* () = - repeat 5 (fun () -> - let* () = - Client.attest_for - ~protocol:migrate_to - ~force:true - ~key:[Constant.bootstrap1.alias] - client - in - let* () = Client.bake_for_and_wait client in - accuser_processed_block) - in - let* () = Accuser.terminate accuser in - unit - let fetch_round ?block client = Client.RPC.call client @@ RPC.get_chain_block_helper_round ?block () @@ -1211,9 +1154,6 @@ let double_preattestation_aggregation_wrong_payload_hash = in unit -let register_migration ~migrate_from ~migrate_to = - accusers_migration_test ~migrate_from ~migrate_to - let register ~protocols = double_attestation_wrong_block_payload_hash protocols ; double_preattestation_wrong_block_payload_hash protocols ; diff --git a/tezt/tests/main.ml b/tezt/tests/main.ml index 9262222814b5..07fd853b27f2 100644 --- a/tezt/tests/main.ml +++ b/tezt/tests/main.ml @@ -70,7 +70,6 @@ let 
register_protocol_independent_tests () = let register_protocol_migration_tests () = let migrate_from = Option.get @@ Protocol.previous_protocol migrate_to in Agnostic_baker_test.register_migration ~migrate_from ~migrate_to ; - Double_consensus.register_migration ~migrate_from ~migrate_to ; Mockup.register_constant_migration ~migrate_from ~migrate_to ; Protocol_migration.register ~migrate_from ~migrate_to ; Weeklynet.register () ; @@ -103,7 +102,6 @@ let register_old_protocol_migration_tests () = | None, _ -> () | Some migrate_from, migrate_to -> Agnostic_baker_test.register_migration ~migrate_from ~migrate_to ; - Double_consensus.register_migration ~migrate_from ~migrate_to ; Sc_rollup_migration.register ~migrate_from ~migrate_to ; Dal.register_migration ~migrate_from ~migrate_to) Protocol.all -- GitLab From 3282028f27e341871d11eaba86bf2f41eb01e97e Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Wed, 15 Oct 2025 15:52:05 +0200 Subject: [PATCH 4/9] Agnostic/Baker: current thread is not optional --- src/lib_agnostic_baker/daemon.ml | 100 ++++++++++++++----------------- 1 file changed, 44 insertions(+), 56 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 649e35c9808f..44e4a3138a82 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -113,7 +113,7 @@ module Make_daemon (Agent : AGENT) : type state = { node_endpoint : string; - mutable current_baker : baker option; + mutable current_baker : baker; mutable old_baker : baker_to_kill option; keep_alive : bool; command : command; @@ -128,7 +128,7 @@ module Make_daemon (Agent : AGENT) : let open Lwt_result_syntax in let*! 
result = f () in match result with - | Ok () -> return_unit + | Ok res -> return res | Error (Lost_node_connection :: _ | Cannot_connect_to_node _ :: _) -> let* _level = Utils.retry @@ -143,11 +143,12 @@ module Make_daemon (Agent : AGENT) : retry_on_disconnection ~emit node_addr f | Error trace -> fail trace - (** [run_thread ~protocol_hash ~cancel_promise ~init_sapling_params state] returns the - main running thread for the baker given its protocol [~protocol_hash] and - cancellation [~cancel_promise]. It can optionally initialise sapling parameters, - as requested by [~init_sapling_params]. *) - let run_thread ~protocol_hash ~cancel_promise ~init_sapling_params state = + (** [run_thread ~protocol_hash ~cancel_promise ~init_sapling_params cctxt + command] returns the main running thread for the baker given its protocol + [~protocol_hash] and cancellation [~cancel_promise]. It can optionally + initialise sapling parameters, as requested by [~init_sapling_params]. *) + let run_thread ~protocol_hash ~cancel_promise ~init_sapling_params cctxt + command = let plugin = match Protocol_plugins.proto_plugin_for_protocol @@ -177,11 +178,12 @@ module Make_daemon (Agent : AGENT) : Tezos_sapling.Core.Validator.init_params () else () ; - let agent_thread = Agent.run_command plugin state.cctxt state.command in + let agent_thread = Agent.run_command plugin cctxt command in Lwt.pick [agent_thread; cancel_promise] - (** [spawn_baker protocol_hash] spawns a new baker process for the given [protocol_hash]. *) - let spawn_baker state protocol_hash = + (** [spawn_baker cctxt command protocol_hash] spawns a new baker process for + the given [protocol_hash]. *) + let spawn_baker cctxt command protocol_hash = let open Lwt_result_syntax in let*! 
() = Events.(emit starting_agent) (Agent.name, protocol_hash) in let cancel_promise, canceller = Lwt.wait () in @@ -190,7 +192,8 @@ module Make_daemon (Agent : AGENT) : ~protocol_hash ~cancel_promise ~init_sapling_params:Agent.init_sapling_params - state + cctxt + command in let*! () = Events.(emit agent_running) (Agent.name, protocol_hash) in return {protocol_hash; process = {thread; canceller}} @@ -201,11 +204,6 @@ module Make_daemon (Agent : AGENT) : let hot_swap_baker ~state ~current_protocol_hash ~next_protocol_hash ~level_to_kill_old_baker = let open Lwt_result_syntax in - let* current_baker = - match state.current_baker with - | Some baker -> return baker - | None -> tzfail (Missing_current_agent Agent.name) - in let next_proto_status = Parameters.protocol_status next_protocol_hash in let*! () = Events.(emit protocol_encountered) (next_proto_status, next_protocol_hash) @@ -215,10 +213,10 @@ module Make_daemon (Agent : AGENT) : (Agent.name, current_protocol_hash, level_to_kill_old_baker) in state.old_baker <- - Some {baker = current_baker; level_to_kill = level_to_kill_old_baker} ; - state.current_baker <- None ; - let* new_baker = spawn_baker state next_protocol_hash in - state.current_baker <- Some new_baker ; + Some + {baker = state.current_baker; level_to_kill = level_to_kill_old_baker} ; + let* new_baker = spawn_baker state.cctxt state.command next_protocol_hash in + state.current_baker <- new_baker ; return_unit (** [maybe_kill_old_baker state node_addr] checks whether the [old_baker] process @@ -313,11 +311,7 @@ module Make_daemon (Agent : AGENT) : ~node_addr [@profiler.record_s {verbosity = Notice} "get_next_protocol_hash"]) in - let* current_protocol_hash = - match state.current_baker with - | None -> tzfail (Missing_current_agent Agent.name) - | Some baker -> return baker.protocol_hash - in + let current_protocol_hash = state.current_baker.protocol_hash in let* () = if not (Protocol_hash.equal current_protocol_hash next_protocol_hash) @@ -344,26 
+338,21 @@ module Make_daemon (Agent : AGENT) : it propagates any error that can appear. *) let baker_thread ~state = let open Lwt_result_syntax in - let*! res = - match state.current_baker with - | Some baker -> baker.process.thread - | None -> return_unit - in + let*! res = state.current_baker.process.thread in match res with | Ok () -> return_unit | Error err -> fail (Agent_process_error Agent.name :: err) (* ---- Agnostic Baker Bootstrap ---- *) - (** [may_start_initial_baker state] recursively waits for an [active] protocol - and spawns a baker for it. If the protocol is [frozen] (not [active] anymore), it - waits for a head with an [active] protocol. *) - let may_start_initial_baker state = + (** [may_start_initial_baker cctxt command ~node_addr] recursively waits + for an [active] protocol and spawns a baker for it. If the protocol is + [frozen] (not [active] anymore), it waits for a head with an [active] + protocol. *) + let may_start_initial_baker cctxt command ~node_addr = let open Lwt_result_syntax in let rec may_start ?last_known_proto ~head_stream () = - let* protocol_hash = - Rpc_services.get_next_protocol_hash ~node_addr:state.node_endpoint - in + let* protocol_hash = Rpc_services.get_next_protocol_hash ~node_addr in let proto_status = Parameters.protocol_status protocol_hash in let*! () = match last_known_proto with @@ -375,9 +364,8 @@ module Make_daemon (Agent : AGENT) : in match proto_status with | Active -> - let* current_baker = spawn_baker state protocol_hash in - state.current_baker <- Some current_baker ; - return_unit + let* current_baker = spawn_baker cctxt command protocol_hash in + return current_baker | Frozen -> ( let* head_stream = match head_stream with @@ -388,7 +376,7 @@ module Make_daemon (Agent : AGENT) : (proto_status, protocol_hash) in let*! () = Events.(emit waiting_for_active_protocol) () in - monitor_heads ~node_addr:state.node_endpoint + monitor_heads ~node_addr in let*! 
v = Lwt_stream.get head_stream in match v with @@ -422,31 +410,31 @@ module Make_daemon (Agent : AGENT) : let run ~keep_alive ~command cctxt = let open Lwt_result_syntax in - let state = - { - node_endpoint = Uri.to_string cctxt#base; - current_baker = None; - old_baker = None; - keep_alive; - command; - cctxt; - } - in () [@profiler.overwrite may_start_profiler cctxt#get_base_dir] ; - let node_addr = state.node_endpoint in + let node_addr = Uri.to_string cctxt#base in let*! () = Events.(emit starting_daemon) Agent.name in let _ccid = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> let*! () = Events.(emit stopping_daemon) Agent.name in Lwt.return_unit) in - let* () = - if state.keep_alive then + let* current_baker = + if keep_alive then retry_on_disconnection ~emit:(Events.emit Events.cannot_connect) node_addr - (fun () -> may_start_initial_baker state) - else may_start_initial_baker state + (fun () -> may_start_initial_baker cctxt command ~node_addr) + else may_start_initial_baker cctxt command ~node_addr + in + let state = + { + node_endpoint = node_addr; + current_baker; + old_baker = None; + keep_alive; + command; + cctxt; + } in let monitor_voting_periods () = let* head_stream = monitor_heads ~node_addr in -- GitLab From 703a71aab7d7c9a5e4f494a30db535bd72d8811f Mon Sep 17 00:00:00 2001 From: Valentin Chaboche Date: Wed, 15 Oct 2025 16:43:33 +0200 Subject: [PATCH 5/9] Agnostic/Baker: main scheduler --- src/lib_agnostic_baker/daemon.ml | 195 +++++++++++++++++-------------- 1 file changed, 105 insertions(+), 90 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 44e4a3138a82..b61c7425b2c9 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -118,6 +118,7 @@ module Make_daemon (Agent : AGENT) : keep_alive : bool; command : command; cctxt : Tezos_client_base.Client_context.full; + head_stream : string Lwt_stream.t; } type t = state @@ -276,72 +277,47 @@ module Make_daemon 
(Agent : AGENT) : - Shut down an old baker it its time has come; - Spawn and "hot-swap" to a new baker if the next protocol hash is different. The voting period information is used for logging purposes. *) - let monitor_voting_periods ~state head_stream = + let monitor_voting_periods ~state = let open Lwt_result_syntax in let node_addr = state.node_endpoint in - let rec loop () = - let*! v = Lwt_stream.get head_stream in - match v with - | None -> tzfail Lost_node_connection - | Some _tick -> - let* block_hash = - (Rpc_services.get_block_hash - ~node_addr - [@profiler.record_s {verbosity = Notice} "get_block_hash"]) - in - () - [@profiler.reset_block_section - {profiler_module = Profiler} block_hash] ; - let* period_kind, remaining = - (Rpc_services.get_current_period - ~node_addr - [@profiler.record_s {verbosity = Notice} "get_current_period"]) - in - let*! () = - Events.(emit period_status) (block_hash, period_kind, remaining) - in - let* () = - (maybe_kill_old_baker - state - node_addr - [@profiler.record_s {verbosity = Notice} "maybe_kill_old_baker"]) - in - let* next_protocol_hash = - (Rpc_services.get_next_protocol_hash - ~node_addr - [@profiler.record_s {verbosity = Notice} "get_next_protocol_hash"]) - in - let current_protocol_hash = state.current_baker.protocol_hash in - let* () = - if - not (Protocol_hash.equal current_protocol_hash next_protocol_hash) - then - let* head_level = - (Rpc_services.get_level - ~node_addr - [@profiler.record_s {verbosity = Notice} "get_level"]) - in - (hot_swap_baker - ~state - ~current_protocol_hash - ~next_protocol_hash - ~level_to_kill_old_baker: - (head_level + Parameters.extra_levels_for_old_baker) - [@profiler.record_s {verbosity = Notice} "hot_swap_baker"]) - else return_unit - in - loop () + let* block_hash = + (Rpc_services.get_block_hash + ~node_addr [@profiler.record_s {verbosity = Notice} "get_block_hash"]) in - loop () - - (** [baker_thread ~state] monitors the current baker thread for any potential error, and - 
it propagates any error that can appear. *) - let baker_thread ~state = - let open Lwt_result_syntax in - let*! res = state.current_baker.process.thread in - match res with - | Ok () -> return_unit - | Error err -> fail (Agent_process_error Agent.name :: err) + () [@profiler.reset_block_section {profiler_module = Profiler} block_hash] ; + let* period_kind, remaining = + (Rpc_services.get_current_period + ~node_addr + [@profiler.record_s {verbosity = Notice} "get_current_period"]) + in + let*! () = + Events.(emit period_status) (block_hash, period_kind, remaining) + in + let* () = + (maybe_kill_old_baker + state + node_addr + [@profiler.record_s {verbosity = Notice} "maybe_kill_old_baker"]) + in + let* next_protocol_hash = + (Rpc_services.get_next_protocol_hash + ~node_addr + [@profiler.record_s {verbosity = Notice} "get_next_protocol_hash"]) + in + let current_protocol_hash = state.current_baker.protocol_hash in + if not (Protocol_hash.equal current_protocol_hash next_protocol_hash) then + let* head_level = + (Rpc_services.get_level + ~node_addr [@profiler.record_s {verbosity = Notice} "get_level"]) + in + (hot_swap_baker + ~state + ~current_protocol_hash + ~next_protocol_hash + ~level_to_kill_old_baker: + (head_level + Parameters.extra_levels_for_old_baker) + [@profiler.record_s {verbosity = Notice} "hot_swap_baker"]) + else return_unit (* ---- Agnostic Baker Bootstrap ---- *) @@ -408,6 +384,55 @@ module Make_daemon (Agent : AGENT) : backends | None -> () + type event = + | New_head + | Head_stream_ended + | Old_baker_stopped + | Current_baker_stopped + + let rec main_loop state = + let open Lwt_result_syntax in + let current_baker = + Lwt_result.map + (fun () -> Current_baker_stopped) + state.current_baker.process.thread + in + let old_baker = + match state.old_baker with + | Some old_baker -> + Lwt_result.map + (fun () -> Old_baker_stopped) + old_baker.baker.process.thread + | None -> Lwt_utils.never_ending () + in + let head_stream = + Lwt.map + (function None 
-> Ok Head_stream_ended | Some _head -> Ok New_head) + (Lwt_stream.get state.head_stream) + in + let* pick = Lwt.choose [current_baker; old_baker; head_stream] in + match pick with + | New_head -> + let* () = monitor_voting_periods ~state in + main_loop state + | Head_stream_ended -> tzfail Lost_node_connection + | Old_baker_stopped -> ( + match state.old_baker with + | None -> assert false + | Some old_baker -> + let* head_level = + (Rpc_services.get_level + ~node_addr:state.node_endpoint + [@profiler.record_s {verbosity = Notice} "get_level"]) + in + if head_level >= old_baker.level_to_kill then + (* The old baker is expired, it is expected. *) + main_loop state + else return_unit) + | Current_baker_stopped -> + (* It's not supposed to stop. *) + return_unit + let run ~keep_alive ~command cctxt = let open Lwt_result_syntax in () [@profiler.overwrite may_start_profiler cctxt#get_base_dir] ; @@ -426,33 +451,23 @@ module Make_daemon (Agent : AGENT) : (fun () -> may_start_initial_baker cctxt command ~node_addr) else may_start_initial_baker cctxt command ~node_addr in - let state = - { - node_endpoint = node_addr; - current_baker; - old_baker = None; - keep_alive; - command; - cctxt; - } - in - let monitor_voting_periods () = - let* head_stream = monitor_heads ~node_addr in - monitor_voting_periods ~state head_stream - in - (* Monitoring voting periods through heads monitoring to avoid - missing UAUs. *) - let* (), () = - tzboth - (* We do not care if --keep-alive is provided, if the baker thread doesn't - have the argument it'll abort the process anyway. 
*) - (retry_on_disconnection - ~emit:(fun _ -> Lwt.return_unit) - node_addr - monitor_voting_periods) - (baker_thread ~state) - in - return_unit + retry_on_disconnection + ~emit:(fun _ -> Lwt.return_unit) + node_addr + (fun () -> + let* head_stream = monitor_heads ~node_addr in + let state = + { + node_endpoint = node_addr; + current_baker; + old_baker = None; + keep_alive; + command; + cctxt; + head_stream; + } + in + main_loop state) end module Baker : AGNOSTIC_DAEMON with type command = Baker_agent.command = -- GitLab From ca37d927bd8e1b6eac2ea18e3708c966a8b4101a Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Mon, 27 Oct 2025 14:59:39 +0100 Subject: [PATCH 6/9] Agnostic_baker: use a state and remove mutable from daemon main loop --- src/lib_agnostic_baker/daemon.ml | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index b61c7425b2c9..8ea4841e7248 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -113,8 +113,8 @@ module Make_daemon (Agent : AGENT) : type state = { node_endpoint : string; - mutable current_baker : baker; - mutable old_baker : baker_to_kill option; + current_baker : baker; + old_baker : baker_to_kill option; keep_alive : bool; command : command; cctxt : Tezos_client_base.Client_context.full; @@ -213,12 +213,12 @@ module Make_daemon (Agent : AGENT) : Events.(emit become_old_agent) (Agent.name, current_protocol_hash, level_to_kill_old_baker) in - state.old_baker <- + let old_baker = Some - {baker = state.current_baker; level_to_kill = level_to_kill_old_baker} ; + {baker = state.current_baker; level_to_kill = level_to_kill_old_baker} + in let* new_baker = spawn_baker state.cctxt state.command next_protocol_hash in - state.current_baker <- new_baker ; - return_unit + return {state with old_baker; current_baker = new_baker} (** [maybe_kill_old_baker state node_addr] checks whether the [old_baker] 
process from the [state] of the agnostic baker has surpassed its lifetime and it stops @@ -226,7 +226,7 @@ module Make_daemon (Agent : AGENT) : let maybe_kill_old_baker state node_addr = let open Lwt_result_syntax in match state.old_baker with - | None -> return_unit + | None -> return state | Some {baker; level_to_kill} -> let* head_level = (Rpc_services.get_level @@ -239,9 +239,8 @@ module Make_daemon (Agent : AGENT) : Lwt.wakeup baker.process.canceller (Ok ()) [@profiler.record_f {verbosity = Notice} "kill old baker"] ; - state.old_baker <- None ; - return_unit) - else return_unit + return {state with old_baker = None}) + else return state (* ---- Baker and Chain Monitoring ---- *) @@ -293,7 +292,7 @@ module Make_daemon (Agent : AGENT) : let*! () = Events.(emit period_status) (block_hash, period_kind, remaining) in - let* () = + let* state = (maybe_kill_old_baker state node_addr @@ -317,7 +316,7 @@ module Make_daemon (Agent : AGENT) : ~level_to_kill_old_baker: (head_level + Parameters.extra_levels_for_old_baker) [@profiler.record_s {verbosity = Notice} "hot_swap_baker"]) - else return_unit + else return state (* ---- Agnostic Baker Bootstrap ---- *) @@ -413,7 +412,7 @@ module Make_daemon (Agent : AGENT) : let* pick = Lwt.choose [current_baker; old_baker; head_stream] in match pick with | New_head -> - let* () = monitor_voting_periods ~state in + let* state = monitor_voting_periods ~state in main_loop state | Head_stream_ended -> tzfail Lost_node_connection | Old_baker_stopped -> ( -- GitLab From d46e08b86228633ca4020c729a6d8dfda43b71b3 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Mon, 27 Oct 2025 15:09:01 +0100 Subject: [PATCH 7/9] Agnostic_baker: rework retry_on_disconnection --- src/lib_agnostic_baker/daemon.ml | 38 +++++++++++++++++--------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 8ea4841e7248..186454538be3 100644 --- 
a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -118,16 +118,16 @@ module Make_daemon (Agent : AGENT) : keep_alive : bool; command : command; cctxt : Tezos_client_base.Client_context.full; - head_stream : string Lwt_stream.t; + head_stream : string Lwt_stream.t option; } type t = state (* ---- Baker Process Management ---- *) - let rec retry_on_disconnection ~emit node_addr f = + let rec retry_on_disconnection ~emit node_addr f arg = let open Lwt_result_syntax in - let*! result = f () in + let*! result = f arg in match result with | Ok res -> return res | Error (Lost_node_connection :: _ | Cannot_connect_to_node _ :: _) -> @@ -141,7 +141,7 @@ module Make_daemon (Agent : AGENT) : (fun node_addr -> Rpc_services.get_level ~node_addr) node_addr in - retry_on_disconnection ~emit node_addr f + retry_on_disconnection ~emit node_addr f arg | Error trace -> fail trace (** [run_thread ~protocol_hash ~cancel_promise ~init_sapling_params cctxt @@ -407,7 +407,7 @@ module Make_daemon (Agent : AGENT) : let head_stream = Lwt.map (function None -> Ok Head_stream_ended | Some _head -> Ok New_head) - (Lwt_stream.get state.head_stream) + (Lwt_stream.get head_stream) in let* pick = Lwt.choose [current_baker; old_baker; head_stream] in match pick with @@ -448,25 +448,27 @@ module Make_daemon (Agent : AGENT) : ~emit:(Events.emit Events.cannot_connect) node_addr (fun () -> may_start_initial_baker cctxt command ~node_addr) + () else may_start_initial_baker cctxt command ~node_addr in + let state = + { + node_endpoint = node_addr; + current_baker; + old_baker = None; + keep_alive; + command; + cctxt; + head_stream = None; + } + in retry_on_disconnection ~emit:(fun _ -> Lwt.return_unit) node_addr - (fun () -> + (fun state -> let* head_stream = monitor_heads ~node_addr in - let state = - { - node_endpoint = node_addr; - current_baker; - old_baker = None; - keep_alive; - command; - cctxt; - head_stream; - } - in - main_loop state) + main_loop {state with 
head_stream = Some head_stream}) + state end module Baker : AGNOSTIC_DAEMON with type command = Baker_agent.command = -- GitLab From a99372767f1d5c4cf557ad79f95a02b57bf965f7 Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Tue, 28 Oct 2025 10:56:28 +0100 Subject: [PATCH 8/9] tezt: add baker reconnect migration test --- tezt/tests/agnostic_baker_test.ml | 123 +++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 3 deletions(-) diff --git a/tezt/tests/agnostic_baker_test.ml b/tezt/tests/agnostic_baker_test.ml index 38a2fca09467..795da1707456 100644 --- a/tezt/tests/agnostic_baker_test.ml +++ b/tezt/tests/agnostic_baker_test.ml @@ -63,8 +63,9 @@ let remote_sign client = (* Performs a protocol migration thanks to a UAU and the agnostic baker. *) let perform_protocol_migration ?node_name ?client_name ?parameter_file - ?(use_remote_signer = false) ~blocks_per_cycle ~migration_level - ~migrate_from ~migrate_to ~baked_blocks_after_migration () = + ?(use_remote_signer = false) ?(extra_run_arguments = []) ?on_new_baker_ready + ~blocks_per_cycle ~migration_level ~migrate_from ~migrate_to + ~baked_blocks_after_migration () = assert (migration_level >= blocks_per_cycle) ; Log.info "Node starting" ; let* client, node = @@ -83,7 +84,7 @@ let perform_protocol_migration ?node_name ?client_name ?parameter_file wait_for_active_protocol_waiting baker in Log.info "Starting agnostic baker" ; - let* () = Agnostic_baker.run baker in + let* () = Agnostic_baker.run ~extra_arguments:extra_run_arguments baker in Log.info "Starting agnostic accuser" ; let* () = Accuser.run accuser in @@ -106,6 +107,11 @@ let perform_protocol_migration ?node_name ?client_name ?parameter_file (Protocol.tag migrate_from) ; let* () = wait_for_become_old_baker in let* () = wait_for_become_old_accuser in + let* () = + match on_new_baker_ready with + | None -> Lwt.return_unit + | Some f -> f ~node ~client ~baker + in (* Ensure that the block before migration is consistent *) Log.info "Checking 
migration block consistency" ; let* () = @@ -192,6 +198,116 @@ let migrate ~migrate_from ~migrate_to ~use_remote_signer = in unit +let reconnect_after_migration ~migrate_from ~migrate_to = + let parameters = JSON.parse_file (Protocol.parameter_file migrate_to) in + Test.register + ~__FILE__ + ~title: + (Format.asprintf + "Protocol migration reconnect from %s to %s" + (Protocol.tag migrate_from) + (Protocol.tag migrate_to)) + ~tags: + [ + "protocol"; + "migration"; + "agnostic"; + "baker"; + "reconnect"; + Protocol.tag migrate_from; + Protocol.tag migrate_to; + "flaky"; + ] + ~uses:[Constant.octez_agnostic_baker] + @@ fun () -> + let blocks_per_cycle = JSON.(get "blocks_per_cycle" parameters |> as_int) in + let consensus_rights_delay = + JSON.(get "consensus_rights_delay" parameters |> as_int) + in + let baked_blocks_after_migration = + 2 * consensus_rights_delay * blocks_per_cycle + in + let migration_level = blocks_per_cycle in + + Log.info "Node starting" ; + let* client, node = + Protocol_migration.user_migratable_node_init ~migration_level ~migrate_to () + in + Log.info "Node %s initialized" (Node.name node) ; + let baker = Agnostic_baker.create node client in + + let wait_for_active_protocol_waiting = + wait_for_active_protocol_waiting baker + in + let wait_for_ready () = Agnostic_baker.wait_for_ready baker in + let wait_for_period_status () = + Agnostic_baker.wait_for baker "period_status.v0" (fun _ -> Some ()) + in + let wait_for_become_old_baker = wait_for_become_old_baker baker in + let wait_for_stopping_baker = wait_for_stopping_baker baker in + + let timeout message seconds = + let* () = Lwt_unix.sleep seconds in + Test.fail message + in + + Log.info "Starting agnostic baker" ; + let* () = Agnostic_baker.run ~extra_arguments:["--keep-alive"] baker in + let* () = wait_for_active_protocol_waiting in + let* () = wait_for_ready () in + let* () = + Client.activate_protocol ~protocol:migrate_from client ~timestamp:Client.Now + in + Log.info "Protocol %s 
activated" (Protocol.hash migrate_from) ; + Log.info "Baking at least %d blocks to trigger migration" migration_level ; + let* _level = Node.wait_for_level node migration_level in + Log.info + "Check that the baking process for %s is not killed" + (Protocol.tag migrate_from) ; + let* () = wait_for_become_old_baker in + + (* Prepare watchers for the restart before it happens. *) + let wait_ready_after_restart = wait_for_ready () in + let wait_period_status_after_restart = wait_for_period_status () in + + Log.info "Restarting node to check baker reconnection" ; + let* () = Node.terminate node in + let* () = Lwt_unix.sleep 1. in + let* () = Node.run node [] in + let* () = Node.wait_for_ready node in + let* () = + Lwt.pick + [ + (let* () = wait_ready_after_restart in + Log.info "Agnostic baker reported ready after node restart" ; + Lwt.return_unit); + timeout "Baker did not report ready after node restart" 60.; + ] + in + let* () = + Lwt.pick + [ + (let* () = wait_period_status_after_restart in + Log.info "Agnostic baker emitted period_status after node restart" ; + Lwt.return_unit); + timeout "Baker did not emit period_status after node restart" 120.; + ] + in + + Log.info + "Check that baker for protocol %s is killed after %d levels" + (Protocol.tag migrate_from) + Agnostic_baker.extra_levels_for_old_baker ; + let kill_level = + migration_level + Agnostic_baker.extra_levels_for_old_baker + in + let* _level = Node.wait_for_level node kill_level in + let* () = wait_for_stopping_baker in + + let* _level = Node.wait_for_level node baked_blocks_after_migration in + let* () = Agnostic_baker.terminate baker in + unit + let test_start_and_stop () = Test.register ~__FILE__ @@ -336,6 +452,7 @@ let register ~protocols = let register_migration ~migrate_from ~migrate_to = migrate ~migrate_from ~migrate_to ~use_remote_signer:false ; + reconnect_after_migration ~migrate_from ~migrate_to ; migrate ~migrate_from ~migrate_to ~use_remote_signer:true let register_protocol_independent () 
= -- GitLab From 31934dbc71d0987d6f5f09c1983ed7c2d5303a78 Mon Sep 17 00:00:00 2001 From: Mathias Bourgoin Date: Tue, 28 Oct 2025 10:52:59 +0100 Subject: [PATCH 9/9] agnostic_baker: keep scheduler state on reconnect --- src/lib_agnostic_baker/daemon.ml | 59 ++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 186454538be3..67aa4cf30565 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -118,7 +118,7 @@ module Make_daemon (Agent : AGENT) : keep_alive : bool; command : command; cctxt : Tezos_client_base.Client_context.full; - head_stream : string Lwt_stream.t option; + head_stream : string Lwt_stream.t; } type t = state @@ -271,8 +271,8 @@ module Make_daemon (Agent : AGENT) : ignore (loop () : unit Lwt.t) ; return stream - (** [monitor_voting_periods ~state head_stream] continuously monitors [heads_stream] - to detect protocol changes. It will: + (** [monitor_voting_periods ~state] continuously monitors chain data to detect + protocol changes. It will: - Shut down an old baker it its time has come; - Spawn and "hot-swap" to a new baker if the next protocol hash is different. The voting period information is used for logging purposes. *) @@ -389,7 +389,7 @@ module Make_daemon (Agent : AGENT) : | Old_baker_stopped | Current_baker_stopped - let rec main_loop state = + let main_iteration state = let open Lwt_result_syntax in let current_baker = Lwt_result.map @@ -402,19 +402,24 @@ module Make_daemon (Agent : AGENT) : Lwt_result.map (fun () -> Old_baker_stopped) old_baker.baker.process.thread - | None -> Lwt_utils.never_ending () + | None -> + (* If there is no [old_baker], this promise is not expected to resolve + anytime. 
*) + Lwt_utils.never_ending () in let head_stream = Lwt.map (function None -> Ok Head_stream_ended | Some _head -> Ok New_head) - (Lwt_stream.get head_stream) + (Lwt_stream.get state.head_stream) in let* pick = Lwt.choose [current_baker; old_baker; head_stream] in match pick with | New_head -> let* state = monitor_voting_periods ~state in - main_loop state - | Head_stream_ended -> tzfail Lost_node_connection + return (Some state) + | Head_stream_ended -> + let* head_stream = monitor_heads ~node_addr:state.node_endpoint in + return (Some {state with head_stream}) | Old_baker_stopped -> ( match state.old_baker with | None -> assert false @@ -426,11 +431,24 @@ module Make_daemon (Agent : AGENT) : in if head_level >= old_baker.level_to_kill then (* The old baker is expired, it is expected. *) - main_loop state - else return_unit) + return (Some {state with old_baker = None}) + else failwith "Old baker was killed unexpectedly") | Current_baker_stopped -> (* It's not supposed to stop. *) - return_unit + failwith "Current baker stopped unexpectedly" + + let rec main_loop state = + let open Lwt_result_syntax in + let* result = + retry_on_disconnection + ~emit:(fun _ -> Lwt.return_unit) + state.node_endpoint + main_iteration + state + in + match result with + | None -> return_unit + | Some next_state -> main_loop next_state let run ~keep_alive ~command cctxt = let open Lwt_result_syntax in @@ -451,6 +469,15 @@ module Make_daemon (Agent : AGENT) : () else may_start_initial_baker cctxt command ~node_addr in + let* head_stream = + (* Useful if the baker is started before the node or restarted while the + node is down. 
*) + retry_on_disconnection + ~emit:(fun _ -> Lwt.return_unit) + node_addr + (fun node_addr -> monitor_heads ~node_addr) + node_addr + in let state = { node_endpoint = node_addr; @@ -459,16 +486,10 @@ module Make_daemon (Agent : AGENT) : keep_alive; command; cctxt; - head_stream = None; + head_stream; } in - retry_on_disconnection - ~emit:(fun _ -> Lwt.return_unit) - node_addr - (fun state -> - let* head_stream = monitor_heads ~node_addr in - main_loop {state with head_stream = Some head_stream}) - state + main_loop state end module Baker : AGNOSTIC_DAEMON with type command = Baker_agent.command = -- GitLab