diff --git a/CHANGES.rst b/CHANGES.rst index 7792e0202fd4c88ebebb76eaf056a85b0c4083d3..60701b3c13770f6634cb0cf09b49799d2fc9dbff 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -83,6 +83,10 @@ Node ``/monitor/commit_hash`` RPC endpoint. Use ``/version`` instead. (MR :gl:`!13879`) +- Adds a new optional query parameter ``force_node_sync`` to ``POST /injection/operation``, + ``POST /private/injection/operation`` and ``POST /private/injection/operations``: when it is set, the operation is injected only if the node is synchronised, and the RPC fails otherwise. + (MR :gl:`!13853`) + Client ------ diff --git a/src/lib_shell/injection_directory.ml b/src/lib_shell/injection_directory.ml index eae255426ae5569bb6d7eaa89639740505580f26..81e57ed878dd71b6cfe35e40778a30484beb883c 100644 --- a/src/lib_shell/injection_directory.ml +++ b/src/lib_shell/injection_directory.ml @@ -45,22 +45,31 @@ let inject_block validator ?force ?chain bytes operations = let* _ = block in return_unit ) -let inject_operation validator ~force ?chain bytes = +let inject_operation validator ~force ?chain ?force_node_sync bytes = let open Lwt_result_syntax in let*! 
chain_id = read_chain_id validator chain in match Data_encoding.Binary.of_bytes_opt Operation.encoding bytes with | None -> failwith "Can't parse the operation" | Some op -> - let t = Validator.inject_operation validator ~force ?chain_id op in + let t = + Validator.inject_operation + validator + ~force + ?chain_id + ?force_node_sync + op + in let hash = Operation.hash op in return (hash, t) -let inject_operations validator ~force ?chain bytes_list = +let inject_operations validator ~force ?force_node_sync ?chain bytes_list = let open Lwt_result_syntax in let* rev_hashes, rev_promises = List.fold_left_es (fun (hashes, promises) bytes -> - let* hash, promise = inject_operation validator ~force ?chain bytes in + let* hash, promise = + inject_operation validator ~force ?force_node_sync ?chain bytes + in return (hash :: hashes, promise :: promises)) ([], []) bytes_list diff --git a/src/lib_shell/validator.ml b/src/lib_shell/validator.ml index 753558b4333ceeb9f79f40aa7ec957dfbca3f59e..12bea687aaeab6b71683039d0246c0249d41adb5 100644 --- a/src/lib_shell/validator.ml +++ b/src/lib_shell/validator.ml @@ -24,6 +24,34 @@ (* *) (*****************************************************************************) +type error += Missing_validator of Chain_id.t | Node_not_synchronised + +let () = + let open Data_encoding in + register_error_kind + `Permanent + ~id:"Missing_validator_error" + ~title:"Missing validator error" + ~description:"Missing validator for the given chain id" + ~pp:(fun ppf chain_id -> + Format.fprintf + ppf + "Could not get validator for the given chain id %a" + Chain_id.pp_short + chain_id) + (obj1 (req "chain_id" Chain_id.encoding)) + (function Missing_validator chain_id -> Some chain_id | _ -> None) + (function chain_id -> Missing_validator chain_id) ; + register_error_kind + `Permanent + ~id:"Node_not_synchronised" + ~title:"Node not synchronised" + ~description:"Node is not synchronised" + ~pp:(fun ppf () -> Format.fprintf ppf "Node is not synchronised") + 
unit + (function Node_not_synchronised -> Some () | _ -> None) + (function () -> Node_not_synchronised) + type t = { state : Store.t; db : Distributed_db.t; @@ -175,7 +203,22 @@ let watcher {valid_block_input; _} = Lwt_watcher.create_stream valid_block_input let chains_watcher {chains_input; _} = Lwt_watcher.create_stream chains_input -let inject_operation v ?chain_id ~force op = +let get_sync_status validator chain_id = + let open Lwt_result_syntax in + match get validator chain_id with + | Error _ -> tzfail (Missing_validator chain_id) + | Ok chain_validator -> return (Chain_validator.sync_status chain_validator) + +let may_check_sync_status ~force_node_sync validator chain_id = + let open Lwt_result_syntax in + if not force_node_sync then return_unit + else + let* sync_status = get_sync_status validator chain_id in + match sync_status with + | Synchronised _ -> return_unit + | Not_synchronised -> tzfail Node_not_synchronised + +let inject_operation v ?chain_id ?(force_node_sync = false) ~force op = let open Lwt_result_syntax in let inject_operation_on nv ~handle_missing_prevalidator = match Chain_validator.prevalidator nv with @@ -192,7 +235,8 @@ let inject_operation v ?chain_id ~force op = | None -> if force then Chain_id.Table.iter_es - (fun _chain_id chain -> + (fun chain_id chain -> + let* () = may_check_sync_status ~force_node_sync v chain_id in inject_operation_on chain ~handle_missing_prevalidator:return_unit) @@ -203,9 +247,11 @@ let inject_operation v ?chain_id ~force op = Block_hash.pp_short op.shell.branch | Some (chain_id, _bh) -> + let* () = may_check_sync_status ~force_node_sync v chain_id in let*? nv = get v chain_id in inject_operation_on nv ~handle_missing_prevalidator) | Some chain_id -> + let* () = may_check_sync_status ~force_node_sync v chain_id in let*? 
nv = get v chain_id in inject_operation_on nv ~handle_missing_prevalidator diff --git a/src/lib_shell/validator.mli b/src/lib_shell/validator.mli index e1a308e0e59ad3954a9125446fc0b1d7fb275a0d..4c196234175068fd93c4c6c1d0975a241e4bf4ce 100644 --- a/src/lib_shell/validator.mli +++ b/src/lib_shell/validator.mli @@ -73,8 +73,15 @@ val chains_watcher : t -> (Chain_id.t * bool) Lwt_stream.t * Lwt_watcher.stopper prevalidator is associated with the [chain_id]. If no [chain_id] is provided, try to recover a chain_id from the branch in the operation data. Fails if no chain_id can be recovered unless [force] is set. - If force is set, notify all the known prevalidator workers. *) + If force is set, notify all the known prevalidator workers. + If [force_node_sync] is set, fails to inject if the node is not synchronised. + *) val inject_operation : - t -> ?chain_id:Chain_id.t -> force:bool -> Operation.t -> unit tzresult Lwt.t + t -> + ?chain_id:Chain_id.t -> + ?force_node_sync:bool -> + force:bool -> + Operation.t -> + unit tzresult Lwt.t val distributed_db : t -> Distributed_db.t diff --git a/src/lib_shell_services/injection_services.ml b/src/lib_shell_services/injection_services.ml index 30e508c17880c8fc3338df072259203003230f90..786a970103a30c7843653bcde2663c1aa71e6153 100644 --- a/src/lib_shell_services/injection_services.ml +++ b/src/lib_shell_services/injection_services.ml @@ -122,29 +122,35 @@ module S = struct let operation_query = let open Tezos_rpc.Query in - query (fun async chain -> + query (fun async chain force_node_sync -> object method async = async method chain = chain + + method force_node_sync = force_node_sync end) |+ flag "async" (fun t -> t#async) |+ opt_field "chain" Chain_services.chain_arg (fun t -> t#chain) + |+ flag "force_node_sync" (fun t -> t#force_node_sync) |> seal let operations_query = let open Tezos_rpc.Query in - query (fun async force chain -> + query (fun async force chain force_node_sync -> object method async = async method force = force 
method chain = chain + + method force_node_sync = force_node_sync end) |+ flag "async" (fun t -> t#async) |+ flag "force" (fun t -> t#force) |+ opt_field "chain" Chain_services.chain_arg (fun t -> t#chain) + |+ flag "force_node_sync" (fun t -> t#force_node_sync) |> seal (* If [private_] is set, the [private/injection/operation] path is used, @@ -164,7 +170,8 @@ module S = struct operation to be (pre-)validated before returning. However, if ?async \ is true, the function returns immediately. The optional ?chain \ parameter can be used to specify whether to inject on the test chain \ - or the main chain." + or the main chain. If [force_node_sync] is [true], this endpoint will \ + return an error if the node is out of sync from the network." ~query:operation_query ~input:bytes ~output:Operation_hash.encoding @@ -189,7 +196,9 @@ module S = struct error is followed by markers for each operation: \ \"injection_operation_succeed\" for success and \ \"injection_operation_error\" for failure (followed by the errors \ - specific to this injection)." + specific to this injection). If [force_node_sync] is [true], this \ + endpoint will return an error if the node is out of sync from the \ + network." 
~query:operations_query ~input:(list (dynamic_size bytes)) ~output:(list Operation_hash.encoding) @@ -236,7 +245,8 @@ let block ctxt ?(async = false) ?(force = false) ?chain raw operations = end) (raw, operations) -let operation_rpc ctxt ~private_rpc ?(async = false) ?chain operation = +let operation_rpc ctxt ~private_rpc ?(async = false) ?(force_node_sync = false) + ?chain operation = make_call (if private_rpc then S.private_operation else S.operation) ctxt @@ -245,14 +255,16 @@ let operation_rpc ctxt ~private_rpc ?(async = false) ?chain operation = method async = async method chain = chain + + method force_node_sync = force_node_sync end) operation -let private_operation ctxt ?async ?chain operation = - operation_rpc ctxt ~private_rpc:true ?async ?chain operation +let private_operation ctxt ?async ?chain ?force_node_sync operation = + operation_rpc ctxt ~private_rpc:true ?async ?chain ?force_node_sync operation -let private_operations ctxt ?(async = false) ?(force = false) ?chain operations - = +let private_operations ctxt ?(async = false) ?(force = false) ?chain + ?(force_node_sync = false) operations = make_call S.private_operations ctxt @@ -263,11 +275,13 @@ let private_operations ctxt ?(async = false) ?(force = false) ?chain operations method chain = chain method force = force + + method force_node_sync = force_node_sync end) operations -let operation ctxt ?async ?chain operation = - operation_rpc ctxt ~private_rpc:false ?async ?chain operation +let operation ctxt ?async ?chain ?force_node_sync operation = + operation_rpc ctxt ~private_rpc:false ?async ?chain ?force_node_sync operation let protocol ctxt ?(async = false) protocol = make_call diff --git a/src/lib_shell_services/injection_services.mli b/src/lib_shell_services/injection_services.mli index 532acb97fe6eed5fc9947eedd9edeed14f153a78..940dffa18d272a7bd96b50b0dff0d87d7cf13e26 100644 --- a/src/lib_shell_services/injection_services.mli +++ b/src/lib_shell_services/injection_services.mli @@ -61,6 +61,7 @@ 
val operation : #simple -> ?async:bool -> ?chain:Chain_services.chain -> + ?force_node_sync:bool -> Bytes.t -> Operation_hash.t tzresult Lwt.t @@ -68,6 +69,7 @@ val private_operation : #simple -> ?async:bool -> ?chain:Chain_services.chain -> + ?force_node_sync:bool -> Bytes.t -> Operation_hash.t tzresult Lwt.t @@ -80,6 +82,7 @@ val private_operations : ?async:bool -> ?force:bool -> ?chain:Chain_services.chain -> + ?force_node_sync:bool -> Bytes.t list -> Operation_hash.t list tzresult Lwt.t @@ -100,7 +103,9 @@ module S : sig ( [`POST], unit, unit, - < async : bool ; chain : Chain_services.chain option >, + < async : bool + ; chain : Chain_services.chain option + ; force_node_sync : bool >, Bytes.t, Operation_hash.t ) Tezos_rpc.Service.t @@ -109,7 +114,9 @@ module S : sig ( [`POST], unit, unit, - < async : bool ; chain : Chain_services.chain option >, + < async : bool + ; chain : Chain_services.chain option + ; force_node_sync : bool >, Bytes.t, Operation_hash.t ) Tezos_rpc.Service.t @@ -118,7 +125,10 @@ module S : sig ( [`POST], unit, unit, - < async : bool ; force : bool ; chain : Chain_services.chain option >, + < async : bool + ; force : bool + ; chain : Chain_services.chain option + ; force_node_sync : bool >, Bytes.t list, Operation_hash.t list ) Tezos_rpc.Service.t diff --git a/tezt/lib_tezos/RPC.ml b/tezt/lib_tezos/RPC.ml index 7aa709ab37a93ffb7789cab3811f1dcddded74f8..7ab6c7bf9a61140d261bf7c638ead1e4eeb61455 100644 --- a/tezt/lib_tezos/RPC.ml +++ b/tezt/lib_tezos/RPC.ml @@ -229,19 +229,23 @@ let delete_network_greylist = make DELETE ["network"; "greylist"] Fun.id let get_version = make GET ["version"] Fun.id -let post_injection_operation ?(async = false) data = +let post_injection_operation ?(async = false) ?force_node_sync data = make POST ["injection"; "operation"] - ~query_string:(Query_arg.switch "async" async) + ~query_string: + Query_arg.( + switch "async" async @ opt_bool "force_node_sync" force_node_sync) ~data Fun.id -let 
post_private_injection_operation ?(async = false) data = +let post_private_injection_operation ?(async = false) ?force_node_sync data = make POST ["private"; "injection"; "operation"] - ~query_string:(Query_arg.switch "async" async) + ~query_string: + Query_arg.( + switch "async" async @ opt_bool "force_node_sync" force_node_sync) ~data Fun.id diff --git a/tezt/lib_tezos/RPC.mli b/tezt/lib_tezos/RPC.mli index d9eb0f5281ae9cefe450714c33aa1fe57933ca67..18b47d0120480f29201707c54f637ec10c98f5f8 100644 --- a/tezt/lib_tezos/RPC.mli +++ b/tezt/lib_tezos/RPC.mli @@ -233,10 +233,12 @@ val post_private_injection_operations : [`OpHash of string] list t (** RPC: [POST /injection/operation] *) -val post_injection_operation : ?async:bool -> data -> JSON.t t +val post_injection_operation : + ?async:bool -> ?force_node_sync:bool -> data -> JSON.t t (** RPC: [POST /private/injection/operation] *) -val post_private_injection_operation : ?async:bool -> data -> JSON.t t +val post_private_injection_operation : + ?async:bool -> ?force_node_sync:bool -> data -> JSON.t t (** RPC: [POST /chains//blocks//helpers/scripts/run_operation] diff --git a/tezt/lib_tezos/client.ml b/tezt/lib_tezos/client.ml index b3a6c847d0b0b78f14d12303feb68e2544ef103e..547acb0a75b374f4566c06245e187aa6c106be97 100644 --- a/tezt/lib_tezos/client.ml +++ b/tezt/lib_tezos/client.ml @@ -881,7 +881,7 @@ let bake_for_and_wait ?endpoint ?protocol ?keys ?minimal_fees in unit -let bake_until_level ~target_level ?keys ?node client = +let bake_until_level ?minimal_timestamp ~target_level ?keys ?node client = Log.info "Bake until level %d." 
target_level ; let node = match node with @@ -896,7 +896,7 @@ let bake_until_level ~target_level ?keys ?node client = Test.fail "bake_until_level(%d): already at level %d" target_level current ; let* () = repeat (target_level - current) (fun () -> - bake_for_and_wait ?keys ~node client) + bake_for_and_wait ?minimal_timestamp ?keys ~node client) in let* final_level = Node.get_level node in Check.((final_level = target_level) ~__LOC__ int) diff --git a/tezt/lib_tezos/client.mli b/tezt/lib_tezos/client.mli index cb7b01b1fe790596a906fd09e4b6bbcea4d610cd..dd03afac6705cba84e9472a8fc7cc72bbaf882d0 100644 --- a/tezt/lib_tezos/client.mli +++ b/tezt/lib_tezos/client.mli @@ -650,7 +650,12 @@ val spawn_bake_for : @param node See {!bake_for_and_wait}. *) val bake_until_level : - target_level:int -> ?keys:string list -> ?node:Node.t -> t -> unit Lwt.t + ?minimal_timestamp:bool -> + target_level:int -> + ?keys:string list -> + ?node:Node.t -> + t -> + unit Lwt.t (** Bake until the node is at [target_cycle], using {!bake_for_and_wait}. This function calls an RPC to know the exact "blocks_per_cycle" value to compute diff --git a/tezt/tests/prevalidator.ml b/tezt/tests/prevalidator.ml index 00c6d63a4c2babdd1c451231f78976bf53a22164..909bdb854d086bf8a8a22a00ebb110124dc55522 100644 --- a/tezt/tests/prevalidator.ml +++ b/tezt/tests/prevalidator.ml @@ -4066,6 +4066,64 @@ let test_request_operations_peer = let* () = wait_mempool in unit +let test_force_sync_node = + let n = 1 in + let blocks_to_bake = 5 in + Protocol.register_test + ~__FILE__ + ~title:"force sync node" + ~tags:[team; "mempool"; "injection"] + @@ fun protocol -> + Log.info "Initialize %d nodes." 
(n + 1) ; + let main_node = Node.create [Synchronisation_threshold 0; Connections 1] in + let other_node = Node.create [Synchronisation_threshold 1; Connections 1] in + let nodes = [main_node; other_node] in + let synchronisation_events = + List.map + (fun node -> Node.wait_for_synchronisation ~statuses:["synced"] node) + nodes + in + Cluster.clique nodes ; + let* () = Cluster.start nodes in + let* client = Client.init ~endpoint:(Node main_node) () in + + Log.info "Activate protocol." ; + let* () = Client.activate_protocol ~protocol client in + + Log.info "Bake %d blocks." blocks_to_bake ; + let* () = + repeat blocks_to_bake (fun () -> + Client.bake_for_and_wait ~minimal_timestamp:true client) + in + + Log.info "Wait for nodes to be synced" ; + let* () = Lwt.join synchronisation_events in + + Log.info "Inject operation" ; + let* _response = + Operation.inject_transfer + ~amount:1 + ~source:Constant.bootstrap1 + ~dest:Constant.bootstrap2 + client + in + let* () = Lwt_list.iter_s Node.terminate nodes in + unit + +(* Log.info "Setup a node that is far into the future" ; + let* leader_node, leader_client = Client.init_with_node `Client () in + let target_cycle = 3 in + let* () = Client.activate_protocol_and_wait ~protocol leader_client in + let* () = + Client.bake_until_cycle ~target_cycle ~node:leader_node leader_client + in + + Log.info "Setup unsynchronized node" ; + let node = Node.create [] in + + let* () = Node.terminate leader_node in + unit *) + let register ~protocols = Revamped.flush_mempool protocols ; Revamped.recycling_branch_refused protocols ; @@ -4101,4 +4159,5 @@ let register ~protocols = test_do_not_reclassify protocols ; force_operation_injection protocols ; injecting_old_operation_fails protocols ; - test_request_operations_peer protocols + test_request_operations_peer protocols ; + test_force_sync_node protocols