diff --git a/devtools/testnet_experiment_tools/sigs.ml b/devtools/testnet_experiment_tools/sigs.ml index 83b17a9bfc9380d76ea8812edd9a43dcd156cf10..44fcdc0fba8921de04e52622527bdd6e9b2b97e7 100644 --- a/devtools/testnet_experiment_tools/sigs.ml +++ b/devtools/testnet_experiment_tools/sigs.ml @@ -25,7 +25,10 @@ module type PROTO_TOOL = sig val start_injector : - Client_context.full -> operations_file_path:string -> unit tzresult Lwt.t + Client_context.full -> + op_per_mempool:int -> + operations_file_path:string -> + unit tzresult Lwt.t val sync_node : Client_context.full -> diff --git a/devtools/testnet_experiment_tools/simulation_scenario.ml b/devtools/testnet_experiment_tools/simulation_scenario.ml index 9bab7c3bbe034635c9d0c0ee6b71b63a5751f44f..55612b7df5d51fe6f43fff036fe1dd57f9f830b4 100644 --- a/devtools/testnet_experiment_tools/simulation_scenario.ml +++ b/devtools/testnet_experiment_tools/simulation_scenario.ml @@ -86,6 +86,21 @@ let round_duration_arg = | Some i when i > 0 -> return i | _ -> failwith "Parameter should be a positive integer literal" ) +let op_per_mempool_arg = + let open Lwt_result_syntax in + default_arg + ~doc: + "Number of operations that the injector will try to maintain in the \ + mempool at all times." + ~short:'n' + ~long:"op-per-mempool" + ~default:"2500" + ~placeholder:"integer" + ( parameter @@ fun _ s -> + match int_of_string_opt s with + | Some i when i > 0 -> return i + | _ -> failwith "Parameter should be a positive integer literal" ) + let pp_spaced_int ppf i = let s = Format.sprintf "%d" i |> String.to_seq |> List.of_seq |> List.rev in List.fold_left @@ -238,10 +253,11 @@ let sync_node (cctxt : Client_context.full) round_duration_target = let*! () = cctxt#message "Synchronizing the node to a low round time." 
in Tool.sync_node cctxt ?round_duration_target () -let run_injector (cctxt : Client_context.full) ~operations_file_path = +let run_injector (cctxt : Client_context.full) ~op_per_mempool + ~operations_file_path = let open Lwt_result_syntax in let* (module Tool) = find_proto_tool cctxt in - Tool.start_injector cctxt ~operations_file_path + Tool.start_injector cctxt ~op_per_mempool ~operations_file_path let commands = let open Lwt_result_syntax in @@ -291,11 +307,11 @@ let commands = ~desc: "Run a simulation scenario on a yes-node with the given operation file \ by injecting operations in the node's mempool. This tool will try to \ - target 1000 manager operations present in the mempool at all time." - no_options + target --op-per-mempool (default: 2500) manager operations in the mempool at all times." + (args1 op_per_mempool_arg) (prefixes ["run"; "simulation"] @@ operations_file_param @@ stop) - (fun () operations_file_path (cctxt : Client_context.full) -> - let* () = run_injector cctxt ~operations_file_path in + (fun op_per_mempool operations_file_path (cctxt : Client_context.full) -> + let* () = run_injector cctxt ~op_per_mempool ~operations_file_path in return_unit); ] diff --git a/devtools/testnet_experiment_tools/tool_017_PtNairob.ml b/devtools/testnet_experiment_tools/tool_017_PtNairob.ml index 842848a2dc0993359936d795afe7ef8c4735a745..099a4f994cd64acab485e33d74c4d3decc104ff4 100644 --- a/devtools/testnet_experiment_tools/tool_017_PtNairob.ml +++ b/devtools/testnet_experiment_tools/tool_017_PtNairob.ml @@ -363,8 +363,6 @@ type injected_operation = { modified_hash : Operation_hash.t; } -let op_per_mempool = 1000 - type t = { last_injected_op_per_manager : injected_operation ManagerMap.t; operation_queues : (Operation_hash.t * packed_operation) Queue.t ManagerMap.t; @@ -373,7 +371,7 @@ type t = { let pp_state fmt {last_injected_op_per_manager; operation_queues} = Format.fprintf fmt - "%d injected operations pending, %d queues left" + "%d injected operations pending, %d manager 
queues left" (ManagerMap.cardinal last_injected_op_per_manager) (ManagerMap.cardinal operation_queues) @@ -381,7 +379,7 @@ let pp_initial_state fmt {operation_queues; _} = Format.( fprintf fmt - "@[%d queues:@ %a@]@." + "@[%d manager queues:@ %a@]@." (ManagerMap.cardinal operation_queues) (pp_print_list ~pp_sep:pp_print_cut (fun fmt (manager, queue) -> Format.fprintf @@ -565,59 +563,94 @@ let choose_and_inject_operations cctxt state prohibited_managers n = | End -> return (!cpt, !errors, !updated_state) | exn -> Lwt.fail exn) in Format.printf - "%d new manager operations injected, %d errorneous operation queues \ + "%d new manager operations injected, %d erroneous operation manager queues \ discarded@." nb_injected nb_erroneous ; return new_state -let start_injector cctxt ~operations_file_path = +let start_injector cctxt ~op_per_mempool ~operations_file_path = let* state = init ~operations_file_path in Format.printf "Starting injector@." ; - let* applied_stream, _stopper = Monitor_services.applied_blocks cctxt () in - let*! current_head_opt = Lwt_stream.get applied_stream in - let ((_chain, _bh, header, _opll) as _current_head) = + let* head_stream, _stopper = Monitor_services.heads cctxt `Main in + let block_stream = + Lwt_stream.map_s + (fun (bh, header) -> + let*! opl = + Protocol_client_context.Alpha_block_services.Operations + .operations_in_pass + cctxt + ~metadata:`Always + ~block:(`Hash (bh, 0)) + Operation_repr.manager_pass + in + let opl = WithExceptions.Result.get_ok ~loc:__LOC__ opl in + Lwt.return (header, opl)) + head_stream + in + let*! current_head_opt = Lwt_stream.get block_stream in + let ((header, _mopl) as _current_head) = WithExceptions.Option.get ~loc:__LOC__ current_head_opt in let current_level = header.shell.level in let rec loop state current_level = - let*! r = Lwt_stream.get applied_stream in + let*! r = Lwt_stream.get block_stream in match r with | None -> failwith "Head stream ended: lost connection with node?" 
- | Some (_chain, _bh, header, _opll) + | Some (header, _opll) when Compare.Int32.(header.shell.level <= current_level) -> (* reorg *) + Format.printf "New head with non-increasing level: ignoring@." ; loop state current_level - | Some (_chain, _bh, _header, opll) as _new_head -> - let included_manager_hashes = - Stdlib.List.nth opll Operation_repr.manager_pass - |> List.map Tezos_base.Operation.hash - |> Operation_hash.Set.of_list + | Some (_header, mopl) as _new_head -> + Format.printf + "New increasing head received with %d included operations@." + (List.length mopl) ; + let* mempool = + Protocol_client_context.Alpha_block_services.Mempool + .pending_operations + cctxt + ~validated:true + ~refused:false + ~outdated:false + ~branch_refused:false + ~branch_delayed:false + ~validation_passes:[Operation_repr.manager_pass] + () + in + let live_operations = + Operation_hash.Set.( + union + (of_list + (List.map + fst + (Operation_hash.Map.bindings mempool.unprocessed))) + (of_list (List.map fst mempool.validated))) in - let last_injected_op_per_manager = state.last_injected_op_per_manager in + Format.printf + "%d manager operations still live in the mempool@." 
+ (Operation_hash.Set.cardinal live_operations) ; let new_last_injected, prohibited_managers = + let last_injected_op_per_manager = + state.last_injected_op_per_manager + in ManagerMap.fold (fun manager {modified_hash; _} (new_last_injected, acc) -> - if Operation_hash.Set.mem modified_hash included_manager_hashes - then (ManagerMap.remove manager new_last_injected, acc) - else (new_last_injected, ManagerSet.add manager acc)) + if Operation_hash.Set.mem modified_hash live_operations then + (new_last_injected, ManagerSet.add manager acc) + else (ManagerMap.remove manager new_last_injected, acc)) last_injected_op_per_manager (last_injected_op_per_manager, ManagerSet.empty) in let state = {state with last_injected_op_per_manager = new_last_injected} in - let nb_included_operations = - Operation_hash.Set.cardinal included_manager_hashes - in let nb_missing_operations = - if nb_included_operations = 0 then op_per_mempool - else min op_per_mempool nb_included_operations + op_per_mempool + - ManagerMap.cardinal state.last_injected_op_per_manager in Format.printf - "New increasing head received with %d manager operations: injecting \ - %d new operations...@." - nb_included_operations + "Injecting %d new manager operations...@." 
nb_missing_operations ; let* state = choose_and_inject_operations diff --git a/devtools/testnet_experiment_tools/tool_018_Proxford.ml b/devtools/testnet_experiment_tools/tool_018_Proxford.ml index 49404c4a6444e298b2cc1c78c29c639840b34cbc..fb5888a6bed5aea889c711ed109c5a7566cb7b82 100644 --- a/devtools/testnet_experiment_tools/tool_018_Proxford.ml +++ b/devtools/testnet_experiment_tools/tool_018_Proxford.ml @@ -363,8 +363,6 @@ type injected_operation = { modified_hash : Operation_hash.t; } -let op_per_mempool = 1000 - type t = { last_injected_op_per_manager : injected_operation ManagerMap.t; operation_queues : (Operation_hash.t * packed_operation) Queue.t ManagerMap.t; @@ -373,7 +371,7 @@ type t = { let pp_state fmt {last_injected_op_per_manager; operation_queues} = Format.fprintf fmt - "%d injected operations pending, %d queues left" + "%d injected operations pending, %d manager queues left" (ManagerMap.cardinal last_injected_op_per_manager) (ManagerMap.cardinal operation_queues) @@ -381,7 +379,7 @@ let pp_initial_state fmt {operation_queues; _} = Format.( fprintf fmt - "@[%d queues:@ %a@]@." + "@[%d manager queues:@ %a@]@." (ManagerMap.cardinal operation_queues) (pp_print_list ~pp_sep:pp_print_cut (fun fmt (manager, queue) -> Format.fprintf @@ -565,59 +563,94 @@ let choose_and_inject_operations cctxt state prohibited_managers n = | End -> return (!cpt, !errors, !updated_state) | exn -> Lwt.fail exn) in Format.printf - "%d new manager operations injected, %d errorneous operation queues \ + "%d new manager operations injected, %d erroneous operation manager queues \ discarded@." nb_injected nb_erroneous ; return new_state -let start_injector cctxt ~operations_file_path = +let start_injector cctxt ~op_per_mempool ~operations_file_path = let* state = init ~operations_file_path in Format.printf "Starting injector@." ; - let* applied_stream, _stopper = Monitor_services.applied_blocks cctxt () in - let*! 
current_head_opt = Lwt_stream.get applied_stream in - let ((_chain, _bh, header, _opll) as _current_head) = + let* head_stream, _stopper = Monitor_services.heads cctxt `Main in + let block_stream = + Lwt_stream.map_s + (fun (bh, header) -> + let*! opl = + Protocol_client_context.Alpha_block_services.Operations + .operations_in_pass + cctxt + ~metadata:`Always + ~block:(`Hash (bh, 0)) + Operation_repr.manager_pass + in + let opl = WithExceptions.Result.get_ok ~loc:__LOC__ opl in + Lwt.return (header, opl)) + head_stream + in + let*! current_head_opt = Lwt_stream.get block_stream in + let ((header, _mopl) as _current_head) = WithExceptions.Option.get ~loc:__LOC__ current_head_opt in let current_level = header.shell.level in let rec loop state current_level = - let*! r = Lwt_stream.get applied_stream in + let*! r = Lwt_stream.get block_stream in match r with | None -> failwith "Head stream ended: lost connection with node?" - | Some (_chain, _bh, header, _opll) + | Some (header, _opll) when Compare.Int32.(header.shell.level <= current_level) -> (* reorg *) + Format.printf "New head with non-increasing level: ignoring@." ; loop state current_level - | Some (_chain, _bh, _header, opll) as _new_head -> - let included_manager_hashes = - Stdlib.List.nth opll Operation_repr.manager_pass - |> List.map Tezos_base.Operation.hash - |> Operation_hash.Set.of_list + | Some (_header, mopl) as _new_head -> + Format.printf + "New increasing head received with %d included operations@." 
+ (List.length mopl) ; + let* mempool = + Protocol_client_context.Alpha_block_services.Mempool + .pending_operations + cctxt + ~validated:true + ~refused:false + ~outdated:false + ~branch_refused:false + ~branch_delayed:false + ~validation_passes:[Operation_repr.manager_pass] + () + in + let live_operations = + Operation_hash.Set.( + union + (of_list + (List.map + fst + (Operation_hash.Map.bindings mempool.unprocessed))) + (of_list (List.map fst mempool.validated))) in - let last_injected_op_per_manager = state.last_injected_op_per_manager in + Format.printf + "%d manager operations still live in the mempool@." + (Operation_hash.Set.cardinal live_operations) ; let new_last_injected, prohibited_managers = + let last_injected_op_per_manager = + state.last_injected_op_per_manager + in ManagerMap.fold (fun manager {modified_hash; _} (new_last_injected, acc) -> - if Operation_hash.Set.mem modified_hash included_manager_hashes - then (ManagerMap.remove manager new_last_injected, acc) - else (new_last_injected, ManagerSet.add manager acc)) + if Operation_hash.Set.mem modified_hash live_operations then + (new_last_injected, ManagerSet.add manager acc) + else (ManagerMap.remove manager new_last_injected, acc)) last_injected_op_per_manager (last_injected_op_per_manager, ManagerSet.empty) in let state = {state with last_injected_op_per_manager = new_last_injected} in - let nb_included_operations = - Operation_hash.Set.cardinal included_manager_hashes - in let nb_missing_operations = - if nb_included_operations = 0 then op_per_mempool - else min op_per_mempool nb_included_operations + op_per_mempool + - ManagerMap.cardinal state.last_injected_op_per_manager in Format.printf - "New increasing head received with %d manager operations: injecting \ - %d new operations...@." - nb_included_operations + "Injecting %d new manager operations...@." 
nb_missing_operations ; let* state = choose_and_inject_operations diff --git a/devtools/testnet_experiment_tools/tool_alpha.ml b/devtools/testnet_experiment_tools/tool_alpha.ml index ca5c989d83d89105f98b729ae02b2ab89b81fec4..51e4c1bb2b16092edc7c7c1296d0e8e77aee7a5b 100644 --- a/devtools/testnet_experiment_tools/tool_alpha.ml +++ b/devtools/testnet_experiment_tools/tool_alpha.ml @@ -363,8 +363,6 @@ type injected_operation = { modified_hash : Operation_hash.t; } -let op_per_mempool = 1000 - type t = { last_injected_op_per_manager : injected_operation ManagerMap.t; operation_queues : (Operation_hash.t * packed_operation) Queue.t ManagerMap.t; @@ -373,7 +371,7 @@ type t = { let pp_state fmt {last_injected_op_per_manager; operation_queues} = Format.fprintf fmt - "%d injected operations pending, %d queues left" + "%d injected operations pending, %d manager queues left" (ManagerMap.cardinal last_injected_op_per_manager) (ManagerMap.cardinal operation_queues) @@ -381,7 +379,7 @@ let pp_initial_state fmt {operation_queues; _} = Format.( fprintf fmt - "@[%d queues:@ %a@]@." + "@[%d manager queues:@ %a@]@." (ManagerMap.cardinal operation_queues) (pp_print_list ~pp_sep:pp_print_cut (fun fmt (manager, queue) -> Format.fprintf @@ -565,59 +563,94 @@ let choose_and_inject_operations cctxt state prohibited_managers n = | End -> return (!cpt, !errors, !updated_state) | exn -> Lwt.fail exn) in Format.printf - "%d new manager operations injected, %d errorneous operation queues \ + "%d new manager operations injected, %d erroneous operation manager queues \ discarded@." nb_injected nb_erroneous ; return new_state -let start_injector cctxt ~operations_file_path = +let start_injector cctxt ~op_per_mempool ~operations_file_path = let* state = init ~operations_file_path in Format.printf "Starting injector@." ; - let* applied_stream, _stopper = Monitor_services.applied_blocks cctxt () in - let*! 
current_head_opt = Lwt_stream.get applied_stream in - let ((_chain, _bh, header, _opll) as _current_head) = + let* head_stream, _stopper = Monitor_services.heads cctxt `Main in + let block_stream = + Lwt_stream.map_s + (fun (bh, header) -> + let*! opl = + Protocol_client_context.Alpha_block_services.Operations + .operations_in_pass + cctxt + ~metadata:`Always + ~block:(`Hash (bh, 0)) + Operation_repr.manager_pass + in + let opl = WithExceptions.Result.get_ok ~loc:__LOC__ opl in + Lwt.return (header, opl)) + head_stream + in + let*! current_head_opt = Lwt_stream.get block_stream in + let ((header, _mopl) as _current_head) = WithExceptions.Option.get ~loc:__LOC__ current_head_opt in let current_level = header.shell.level in let rec loop state current_level = - let*! r = Lwt_stream.get applied_stream in + let*! r = Lwt_stream.get block_stream in match r with | None -> failwith "Head stream ended: lost connection with node?" - | Some (_chain, _bh, header, _opll) + | Some (header, _opll) when Compare.Int32.(header.shell.level <= current_level) -> (* reorg *) + Format.printf "New head with non-increasing level: ignoring@." ; loop state current_level - | Some (_chain, _bh, _header, opll) as _new_head -> - let included_manager_hashes = - Stdlib.List.nth opll Operation_repr.manager_pass - |> List.map Tezos_base.Operation.hash - |> Operation_hash.Set.of_list + | Some (_header, mopl) as _new_head -> + Format.printf + "New increasing head received with %d included operations@." 
+ (List.length mopl) ; + let* mempool = + Protocol_client_context.Alpha_block_services.Mempool + .pending_operations + cctxt + ~validated:true + ~refused:false + ~outdated:false + ~branch_refused:false + ~branch_delayed:false + ~validation_passes:[Operation_repr.manager_pass] + () + in + let live_operations = + Operation_hash.Set.( + union + (of_list + (List.map + fst + (Operation_hash.Map.bindings mempool.unprocessed))) + (of_list (List.map fst mempool.validated))) in - let last_injected_op_per_manager = state.last_injected_op_per_manager in + Format.printf + "%d manager operations still live in the mempool@." + (Operation_hash.Set.cardinal live_operations) ; let new_last_injected, prohibited_managers = + let last_injected_op_per_manager = + state.last_injected_op_per_manager + in ManagerMap.fold (fun manager {modified_hash; _} (new_last_injected, acc) -> - if Operation_hash.Set.mem modified_hash included_manager_hashes - then (ManagerMap.remove manager new_last_injected, acc) - else (new_last_injected, ManagerSet.add manager acc)) + if Operation_hash.Set.mem modified_hash live_operations then + (new_last_injected, ManagerSet.add manager acc) + else (ManagerMap.remove manager new_last_injected, acc)) last_injected_op_per_manager (last_injected_op_per_manager, ManagerSet.empty) in let state = {state with last_injected_op_per_manager = new_last_injected} in - let nb_included_operations = - Operation_hash.Set.cardinal included_manager_hashes - in let nb_missing_operations = - if nb_included_operations = 0 then op_per_mempool - else min op_per_mempool nb_included_operations + op_per_mempool + - ManagerMap.cardinal state.last_injected_op_per_manager in Format.printf - "New increasing head received with %d manager operations: injecting \ - %d new operations...@." - nb_included_operations + "Injecting %d new manager operations...@." nb_missing_operations ; let* state = choose_and_inject_operations