diff --git a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml index e99d009738bfcf541ce909a8707be7f65a39e2c7..b2770d9bdd6903ca4c7ba9588c8eb06ce2312b84 100644 --- a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml +++ b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/faked_services.ml @@ -234,6 +234,17 @@ module Make (Hooks : Mocked_services_hooks) = struct | None -> failwith "faked_services.inject_operation: can't deserialize" | Some operation -> Hooks.inject_operation operation) + let inject_private_operation = + Directory.register + Directory.empty + Injection_services.S.private_operation + (fun () _chain bytes -> + match Data_encoding.Binary.of_bytes_opt Operation.encoding bytes with + | None -> + failwith + "faked_services.inject_private_operation: can't deserialize" + | Some operation -> Hooks.inject_operation operation) + let broadcast_block = Directory.register Directory.empty @@ -323,6 +334,7 @@ module Make (Hooks : Mocked_services_hooks) = struct chain chain_id; inject_block; inject_operation; + inject_private_operation; monitor_operations; list_blocks; live_blocks; diff --git a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml index 82ae1d67c2952fe16bf255d39609258e54ab40e8..01f139d8f7bab284994811b365b8268b8b5887f2 100644 --- a/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml +++ b/src/proto_016_PtMumbai/lib_delegate/test/mockup_simulator/mockup_simulator.ml @@ -67,8 +67,10 @@ type state = { heads_pipe : (Block_hash.t * Block_header.t) Lwt_pipe.Unbounded.t; (** [heads_pipe] is used to implement the [monitor_heads] RPC. 
*) - operations_pipe : - (Operation_hash.t * Mockup.M.Protocol.operation) option Lwt_pipe.Unbounded.t; + mutable operations_stream : + (Operation_hash.t * Mockup.M.Protocol.operation) list Lwt_stream.t; + mutable operations_stream_push : + (Operation_hash.t * Mockup.M.Protocol.operation) list option -> unit; (** [operations_pipe] is used to implement the [operations_pipe] RPC. *) mutable streaming_operations : bool; (** A helper flag used to implement the monitor operations RPC. *) @@ -481,23 +483,18 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : let streamed = ref false in state.streaming_operations <- true ; let next () = - let rec pop_until_ok () = - Lwt_pipe.Unbounded.pop state.operations_pipe >>= function + let rec loop () = + Lwt_stream.get state.operations_stream >>= function | None when !streamed -> Lwt.return None | None -> streamed := true ; Lwt.return (Some []) - | Some op -> ( - User_hooks.on_new_operation op >>= function - | None when !streamed -> pop_until_ok () - | None -> - streamed := true ; - Lwt.return (Some []) - | Some (oph, op) -> - streamed := true ; - Lwt.return (Some [((oph, op), None)])) + | Some ops -> ( + List.filter_map_s User_hooks.on_new_operation ops >>= function + | [] -> loop () + | l -> Lwt.return_some (List.map (fun x -> (x, None)) l)) in - pop_until_ok () + loop () in let shutdown () = () in Tezos_rpc.Answer.{next; shutdown} @@ -745,22 +742,15 @@ let rec process_block state block_hash (block_header : Block_header.t) then ( state.chain <- new_chain ; clear_mempool state >>=? fun () -> - (* The head has changed, the messages in the operations pipe are no - good anymore. *) - ignore (Lwt_pipe.Unbounded.pop_all_now state.operations_pipe) ; - (if state.streaming_operations then ( - state.streaming_operations <- false ; - Lwt_pipe.Unbounded.push state.operations_pipe None ; - Lwt.return ()) - else Lwt.return ()) - >>= fun () -> - (* Put back in the pipe operations that are still alive. 
*) - List.iter_s - (fun op -> - Lwt_pipe.Unbounded.push state.operations_pipe (Some op) ; - Lwt.return ()) - state.mempool - >>= fun () -> return_unit) + (* The head changed: notify that the stream ended. *) + state.operations_stream_push None ; + state.streaming_operations <- false ; + (* Instantiate a new stream *) + let operations_stream, operations_stream_push = Lwt_stream.create () in + state.operations_stream <- operations_stream ; + state.operations_stream_push <- operations_stream_push ; + state.operations_stream_push (Some state.mempool) ; + return_unit) else return_unit (** This process listens to broadcast block and operations and incorporates @@ -769,11 +759,13 @@ let rec listener ~(user_hooks : (module Hooks)) ~state ~broadcast_pipe = let module User_hooks = (val user_hooks : Hooks) in Lwt_pipe.Unbounded.pop broadcast_pipe >>= function | Broadcast_op (operation_hash, packed_operation) -> - state.mempool <- (operation_hash, packed_operation) :: state.mempool ; - Lwt_pipe.Unbounded.push - state.operations_pipe - (Some (operation_hash, packed_operation)) ; - User_hooks.check_mempool_after_processing ~mempool:state.mempool + (if + List.mem_assoc ~equal:Operation_hash.equal operation_hash state.mempool + then return_unit + else ( + state.mempool <- (operation_hash, packed_operation) :: state.mempool ; + state.operations_stream_push (Some [(operation_hash, packed_operation)]) ; + User_hooks.check_mempool_after_processing ~mempool:state.mempool)) >>=? fun () -> listener ~user_hooks ~state ~broadcast_pipe | Broadcast_block (block_hash, block_header, operations) -> get_block_level block_header >>=? 
fun level -> @@ -813,7 +805,7 @@ let create_fake_node_state ~i ~live_depth let chain0 = [genesis0] in let validated_blocks_pipe = Lwt_pipe.Unbounded.create () in let heads_pipe = Lwt_pipe.Unbounded.create () in - let operations_pipe = Lwt_pipe.Unbounded.create () in + let operations_stream, operations_stream_push = Lwt_stream.create () in let genesis_block_true_hash = Block_header.hash { @@ -849,7 +841,8 @@ let create_fake_node_state ~i ~live_depth ]); validated_blocks_pipe; heads_pipe; - operations_pipe; + operations_stream; + operations_stream_push; streaming_operations = false; broadcast_pipes; genesis_block_true_hash; diff --git a/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml b/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml index e99d009738bfcf541ce909a8707be7f65a39e2c7..b2770d9bdd6903ca4c7ba9588c8eb06ce2312b84 100644 --- a/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml +++ b/src/proto_alpha/lib_delegate/test/mockup_simulator/faked_services.ml @@ -234,6 +234,17 @@ module Make (Hooks : Mocked_services_hooks) = struct | None -> failwith "faked_services.inject_operation: can't deserialize" | Some operation -> Hooks.inject_operation operation) + let inject_private_operation = + Directory.register + Directory.empty + Injection_services.S.private_operation + (fun () _chain bytes -> + match Data_encoding.Binary.of_bytes_opt Operation.encoding bytes with + | None -> + failwith + "faked_services.inject_private_operation: can't deserialize" + | Some operation -> Hooks.inject_operation operation) + let broadcast_block = Directory.register Directory.empty @@ -323,6 +334,7 @@ module Make (Hooks : Mocked_services_hooks) = struct chain chain_id; inject_block; inject_operation; + inject_private_operation; monitor_operations; list_blocks; live_blocks; diff --git a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml index 
82ae1d67c2952fe16bf255d39609258e54ab40e8..01f139d8f7bab284994811b365b8268b8b5887f2 100644 --- a/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml +++ b/src/proto_alpha/lib_delegate/test/mockup_simulator/mockup_simulator.ml @@ -67,8 +67,10 @@ type state = { heads_pipe : (Block_hash.t * Block_header.t) Lwt_pipe.Unbounded.t; (** [heads_pipe] is used to implement the [monitor_heads] RPC. *) - operations_pipe : - (Operation_hash.t * Mockup.M.Protocol.operation) option Lwt_pipe.Unbounded.t; + mutable operations_stream : + (Operation_hash.t * Mockup.M.Protocol.operation) list Lwt_stream.t; + mutable operations_stream_push : + (Operation_hash.t * Mockup.M.Protocol.operation) list option -> unit; (** [operations_pipe] is used to implement the [operations_pipe] RPC. *) mutable streaming_operations : bool; (** A helper flag used to implement the monitor operations RPC. *) @@ -481,23 +483,18 @@ let make_mocked_services_hooks (state : state) (user_hooks : (module Hooks)) : let streamed = ref false in state.streaming_operations <- true ; let next () = - let rec pop_until_ok () = - Lwt_pipe.Unbounded.pop state.operations_pipe >>= function + let rec loop () = + Lwt_stream.get state.operations_stream >>= function | None when !streamed -> Lwt.return None | None -> streamed := true ; Lwt.return (Some []) - | Some op -> ( - User_hooks.on_new_operation op >>= function - | None when !streamed -> pop_until_ok () - | None -> - streamed := true ; - Lwt.return (Some []) - | Some (oph, op) -> - streamed := true ; - Lwt.return (Some [((oph, op), None)])) + | Some ops -> ( + List.filter_map_s User_hooks.on_new_operation ops >>= function + | [] -> loop () + | l -> Lwt.return_some (List.map (fun x -> (x, None)) l)) in - pop_until_ok () + loop () in let shutdown () = () in Tezos_rpc.Answer.{next; shutdown} @@ -745,22 +742,15 @@ let rec process_block state block_hash (block_header : Block_header.t) then ( state.chain <- new_chain ; clear_mempool state >>=? 
fun () -> - (* The head has changed, the messages in the operations pipe are no - good anymore. *) - ignore (Lwt_pipe.Unbounded.pop_all_now state.operations_pipe) ; - (if state.streaming_operations then ( - state.streaming_operations <- false ; - Lwt_pipe.Unbounded.push state.operations_pipe None ; - Lwt.return ()) - else Lwt.return ()) - >>= fun () -> - (* Put back in the pipe operations that are still alive. *) - List.iter_s - (fun op -> - Lwt_pipe.Unbounded.push state.operations_pipe (Some op) ; - Lwt.return ()) - state.mempool - >>= fun () -> return_unit) + (* The head changed: notify that the stream ended. *) + state.operations_stream_push None ; + state.streaming_operations <- false ; + (* Instantiate a new stream *) + let operations_stream, operations_stream_push = Lwt_stream.create () in + state.operations_stream <- operations_stream ; + state.operations_stream_push <- operations_stream_push ; + state.operations_stream_push (Some state.mempool) ; + return_unit) else return_unit (** This process listens to broadcast block and operations and incorporates @@ -769,11 +759,13 @@ let rec listener ~(user_hooks : (module Hooks)) ~state ~broadcast_pipe = let module User_hooks = (val user_hooks : Hooks) in Lwt_pipe.Unbounded.pop broadcast_pipe >>= function | Broadcast_op (operation_hash, packed_operation) -> - state.mempool <- (operation_hash, packed_operation) :: state.mempool ; - Lwt_pipe.Unbounded.push - state.operations_pipe - (Some (operation_hash, packed_operation)) ; - User_hooks.check_mempool_after_processing ~mempool:state.mempool + (if + List.mem_assoc ~equal:Operation_hash.equal operation_hash state.mempool + then return_unit + else ( + state.mempool <- (operation_hash, packed_operation) :: state.mempool ; + state.operations_stream_push (Some [(operation_hash, packed_operation)]) ; + User_hooks.check_mempool_after_processing ~mempool:state.mempool)) >>=? 
fun () -> listener ~user_hooks ~state ~broadcast_pipe | Broadcast_block (block_hash, block_header, operations) -> get_block_level block_header >>=? fun level -> @@ -813,7 +805,7 @@ let create_fake_node_state ~i ~live_depth let chain0 = [genesis0] in let validated_blocks_pipe = Lwt_pipe.Unbounded.create () in let heads_pipe = Lwt_pipe.Unbounded.create () in - let operations_pipe = Lwt_pipe.Unbounded.create () in + let operations_stream, operations_stream_push = Lwt_stream.create () in let genesis_block_true_hash = Block_header.hash { @@ -849,7 +841,8 @@ let create_fake_node_state ~i ~live_depth ]); validated_blocks_pipe; heads_pipe; - operations_pipe; + operations_stream; + operations_stream_push; streaming_operations = false; broadcast_pipes; genesis_block_true_hash;