From b45201e1770c5116bd01b0d46b4a5757d89e3759 Mon Sep 17 00:00:00 2001 From: vbot Date: Mon, 20 Feb 2023 10:42:17 +0100 Subject: [PATCH 1/4] Alpha/Baker: add missing mockup RPC --- .../test/mockup_simulator/faked_services.ml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml b/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml index e99d009738bf..b2770d9bdd69 100644 --- a/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml +++ b/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml @@ -234,6 +234,17 @@ module Make (Hooks : Mocked_services_hooks) = struct | None -> failwith "faked_services.inject_operation: can't deserialize" | Some operation -> Hooks.inject_operation operation) + let inject_private_operation = + Directory.register + Directory.empty + Injection_services.S.private_operation + (fun () _chain bytes -> + match Data_encoding.Binary.of_bytes_opt Operation.encoding bytes with + | None -> + failwith + "faked_services.inject_private_operation: can't deserialize" + | Some operation -> Hooks.inject_operation operation) + let broadcast_block = Directory.register Directory.empty @@ -323,6 +334,7 @@ module Make (Hooks : Mocked_services_hooks) = struct chain chain_id; inject_block; inject_operation; + inject_private_operation; monitor_operations; list_blocks; live_blocks; -- GitLab From dd1daf78d436ffc10331fd2260523c646f6feb80 Mon Sep 17 00:00:00 2001 From: vbot Date: Wed, 8 Feb 2023 15:14:10 +0100 Subject: [PATCH 2/4] Alpha/Baker: make new head close the mockup-ed operation stream --- .../test/mockup_simulator/mockup_simulator.ml | 59 ++++++++----------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml index 82ae1d67c295..cb9ff42c2250 100644 --- 
a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml +++ b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml @@ -67,8 +67,10 @@ type state = { heads_pipe : (Block_hash.t * Block_header.t) Lwt_pipe.Unbounded.t; (** [heads_pipe] is used to implement the [monitor_heads] RPC. *) - operations_pipe : - (Operation_hash.t * Mockup.M.Protocol.operation) option Lwt_pipe.Unbounded.t; + mutable operations_stream : + (Operation_hash.t * Mockup.M.Protocol.operation) list Lwt_stream.t; + mutable operations_stream_push : + (Operation_hash.t * Mockup.M.Protocol.operation) list option -> unit; (** [operations_pipe] is used to implement the [operations_pipe] RPC. *) mutable streaming_operations : bool; (** A helper flag used to implement the monitor operations RPC. *) @@ -481,23 +483,18 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : let streamed = ref false in state.streaming_operations <- true ; let next () = - let rec pop_until_ok () = - Lwt_pipe.Unbounded.pop state.operations_pipe >>= function + let rec loop () = + Lwt_stream.get state.operations_stream >>= function | None when !streamed -> Lwt.return None | None -> streamed := true ; Lwt.return (Some []) - | Some op -> ( - User_hooks.on_new_operation op >>= function - | None when !streamed -> pop_until_ok () - | None -> - streamed := true ; - Lwt.return (Some []) - | Some (oph, op) -> - streamed := true ; - Lwt.return (Some [((oph, op), None)])) + | Some ops -> ( + List.filter_map_s User_hooks.on_new_operation ops >>= function + | [] -> loop () + | l -> Lwt.return_some (List.map (fun x -> (x, None)) l)) in - pop_until_ok () + loop () in let shutdown () = () in Tezos_rpc.Answer.{next; shutdown} @@ -745,22 +742,15 @@ let rec process_block state block_hash (block_header : Block_header.t) then ( state.chain <- new_chain ; clear_mempool state >>=? fun () -> - (* The head has changed, the messages in the operations pipe are no - good anymore. 
*) - ignore (Lwt_pipe.Unbounded.pop_all_now state.operations_pipe) ; - (if state.streaming_operations then ( - state.streaming_operations <- false ; - Lwt_pipe.Unbounded.push state.operations_pipe None ; - Lwt.return ()) - else Lwt.return ()) - >>= fun () -> - (* Put back in the pipe operations that are still alive. *) - List.iter_s - (fun op -> - Lwt_pipe.Unbounded.push state.operations_pipe (Some op) ; - Lwt.return ()) - state.mempool - >>= fun () -> return_unit) + (* The head changed: notify that the stream ended. *) + state.operations_stream_push None ; + state.streaming_operations <- false ; + (* Instantiate a new stream *) + let operations_stream, operations_stream_push = Lwt_stream.create () in + state.operations_stream <- operations_stream ; + state.operations_stream_push <- operations_stream_push ; + state.operations_stream_push (Some state.mempool) ; + return_unit) else return_unit (** This process listens to broadcast block and operations and incorporates @@ -770,9 +760,7 @@ let rec listener ~(user_hooks : (module Hooks)) ~state ~broadcast_pipe = Lwt_pipe.Unbounded.pop broadcast_pipe >>= function | Broadcast_op (operation_hash, packed_operation) -> state.mempool <- (operation_hash, packed_operation) :: state.mempool ; - Lwt_pipe.Unbounded.push - state.operations_pipe - (Some (operation_hash, packed_operation)) ; + state.operations_stream_push (Some [(operation_hash, packed_operation)]) ; User_hooks.check_mempool_after_processing ~mempool:state.mempool >>=? 
fun () -> listener ~user_hooks ~state ~broadcast_pipe | Broadcast_block (block_hash, block_header, operations) -> @@ -813,7 +801,7 @@ let create_fake_node_state ~i ~live_depth let chain0 = [genesis0] in let validated_blocks_pipe = Lwt_pipe.Unbounded.create () in let heads_pipe = Lwt_pipe.Unbounded.create () in - let operations_pipe = Lwt_pipe.Unbounded.create () in + let operations_stream, operations_stream_push = Lwt_stream.create () in let genesis_block_true_hash = Block_header.hash { @@ -849,7 +837,8 @@ let create_fake_node_state ~i ~live_depth ]); validated_blocks_pipe; heads_pipe; - operations_pipe; + operations_stream; + operations_stream_push; streaming_operations = false; broadcast_pipes; genesis_block_true_hash; -- GitLab From 106b047c106f99bb253c0e9b9df25ea9f96eaf21 Mon Sep 17 00:00:00 2001 From: vbot Date: Mon, 20 Feb 2023 11:15:25 +0100 Subject: [PATCH 3/4] Alpha/Baker: do not propagate already handled op in tests --- .../test/mockup_simulator/mockup_simulator.ml | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml index cb9ff42c2250..01f139d8f7ba 100644 --- a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml +++ b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml @@ -759,9 +759,13 @@ let rec listener ~(user_hooks : (module Hooks)) ~state ~broadcast_pipe = let module User_hooks = (val user_hooks : Hooks) in Lwt_pipe.Unbounded.pop broadcast_pipe >>= function | Broadcast_op (operation_hash, packed_operation) -> - state.mempool <- (operation_hash, packed_operation) :: state.mempool ; - state.operations_stream_push (Some [(operation_hash, packed_operation)]) ; - User_hooks.check_mempool_after_processing ~mempool:state.mempool + (if + List.mem_assoc ~equal:Operation_hash.equal operation_hash state.mempool + then return_unit + else ( + state.mempool <- 
(operation_hash, packed_operation) :: state.mempool ; + state.operations_stream_push (Some [(operation_hash, packed_operation)]) ; + User_hooks.check_mempool_after_processing ~mempool:state.mempool)) >>=? fun () -> listener ~user_hooks ~state ~broadcast_pipe | Broadcast_block (block_hash, block_header, operations) -> get_block_level block_header >>=? fun level -> -- GitLab From 18e0a37bca211124157c4a69578fd0bede2f6895 Mon Sep 17 00:00:00 2001 From: Albin Coquereau Date: Mon, 20 Feb 2023 11:28:50 +0100 Subject: [PATCH 4/4] Mumbai/baker: backport mockup fixes --- .../test/mockup_simulator/faked_services.ml | 12 ++++ .../test/mockup_simulator/mockup_simulator.ml | 67 +++++++++---------- 2 files changed, 42 insertions(+), 37 deletions(-) diff --git a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml index e99d009738bf..b2770d9bdd69 100644 --- a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml +++ b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml @@ -234,6 +234,17 @@ module Make (Hooks : Mocked_services_hooks) = struct | None -> failwith "faked_services.inject_operation: can't deserialize" | Some operation -> Hooks.inject_operation operation) + let inject_private_operation = + Directory.register + Directory.empty + Injection_services.S.private_operation + (fun () _chain bytes -> + match Data_encoding.Binary.of_bytes_opt Operation.encoding bytes with + | None -> + failwith + "faked_services.inject_private_operation: can't deserialize" + | Some operation -> Hooks.inject_operation operation) + let broadcast_block = Directory.register Directory.empty @@ -323,6 +334,7 @@ module Make (Hooks : Mocked_services_hooks) = struct chain chain_id; inject_block; inject_operation; + inject_private_operation; monitor_operations; list_blocks; live_blocks; diff --git 
a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml index 82ae1d67c295..01f139d8f7ba 100644 --- a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml +++ b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml @@ -67,8 +67,10 @@ type state = { heads_pipe : (Block_hash.t * Block_header.t) Lwt_pipe.Unbounded.t; (** [heads_pipe] is used to implement the [monitor_heads] RPC. *) - operations_pipe : - (Operation_hash.t * Mockup.M.Protocol.operation) option Lwt_pipe.Unbounded.t; + mutable operations_stream : + (Operation_hash.t * Mockup.M.Protocol.operation) list Lwt_stream.t; + mutable operations_stream_push : + (Operation_hash.t * Mockup.M.Protocol.operation) list option -> unit; (** [operations_pipe] is used to implement the [operations_pipe] RPC. *) mutable streaming_operations : bool; (** A helper flag used to implement the monitor operations RPC. 
*) @@ -481,23 +483,18 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : let streamed = ref false in state.streaming_operations <- true ; let next () = - let rec pop_until_ok () = - Lwt_pipe.Unbounded.pop state.operations_pipe >>= function + let rec loop () = + Lwt_stream.get state.operations_stream >>= function | None when !streamed -> Lwt.return None | None -> streamed := true ; Lwt.return (Some []) - | Some op -> ( - User_hooks.on_new_operation op >>= function - | None when !streamed -> pop_until_ok () - | None -> - streamed := true ; - Lwt.return (Some []) - | Some (oph, op) -> - streamed := true ; - Lwt.return (Some [((oph, op), None)])) + | Some ops -> ( + List.filter_map_s User_hooks.on_new_operation ops >>= function + | [] -> loop () + | l -> Lwt.return_some (List.map (fun x -> (x, None)) l)) in - pop_until_ok () + loop () in let shutdown () = () in Tezos_rpc.Answer.{next; shutdown} @@ -745,22 +742,15 @@ let rec process_block state block_hash (block_header : Block_header.t) then ( state.chain <- new_chain ; clear_mempool state >>=? fun () -> - (* The head has changed, the messages in the operations pipe are no - good anymore. *) - ignore (Lwt_pipe.Unbounded.pop_all_now state.operations_pipe) ; - (if state.streaming_operations then ( - state.streaming_operations <- false ; - Lwt_pipe.Unbounded.push state.operations_pipe None ; - Lwt.return ()) - else Lwt.return ()) - >>= fun () -> - (* Put back in the pipe operations that are still alive. *) - List.iter_s - (fun op -> - Lwt_pipe.Unbounded.push state.operations_pipe (Some op) ; - Lwt.return ()) - state.mempool - >>= fun () -> return_unit) + (* The head changed: notify that the stream ended. 
*) + state.operations_stream_push None ; + state.streaming_operations <- false ; + (* Instantiate a new stream *) + let operations_stream, operations_stream_push = Lwt_stream.create () in + state.operations_stream <- operations_stream ; + state.operations_stream_push <- operations_stream_push ; + state.operations_stream_push (Some state.mempool) ; + return_unit) else return_unit (** This process listens to broadcast block and operations and incorporates @@ -769,11 +759,13 @@ let rec listener ~(user_hooks : (module Hooks)) ~state ~broadcast_pipe = let module User_hooks = (val user_hooks : Hooks) in Lwt_pipe.Unbounded.pop broadcast_pipe >>= function | Broadcast_op (operation_hash, packed_operation) -> - state.mempool <- (operation_hash, packed_operation) :: state.mempool ; - Lwt_pipe.Unbounded.push - state.operations_pipe - (Some (operation_hash, packed_operation)) ; - User_hooks.check_mempool_after_processing ~mempool:state.mempool + (if + List.mem_assoc ~equal:Operation_hash.equal operation_hash state.mempool + then return_unit + else ( + state.mempool <- (operation_hash, packed_operation) :: state.mempool ; + state.operations_stream_push (Some [(operation_hash, packed_operation)]) ; + User_hooks.check_mempool_after_processing ~mempool:state.mempool)) >>=? fun () -> listener ~user_hooks ~state ~broadcast_pipe | Broadcast_block (block_hash, block_header, operations) -> get_block_level block_header >>=? 
fun level -> @@ -813,7 +805,7 @@ let create_fake_node_state ~i ~live_depth let chain0 = [genesis0] in let validated_blocks_pipe = Lwt_pipe.Unbounded.create () in let heads_pipe = Lwt_pipe.Unbounded.create () in - let operations_pipe = Lwt_pipe.Unbounded.create () in + let operations_stream, operations_stream_push = Lwt_stream.create () in let genesis_block_true_hash = Block_header.hash { @@ -849,7 +841,8 @@ let create_fake_node_state ~i ~live_depth ]); validated_blocks_pipe; heads_pipe; - operations_pipe; + operations_stream; + operations_stream_push; streaming_operations = false; broadcast_pipes; genesis_block_true_hash; -- GitLab