diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index ba9ecd8a1632a0803c6df31ca5bda620475ee59e..10b77e24d4bc6762b783dad68628375f117eafa2 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -819,25 +819,30 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : current_batch := Accumulating new_contents) | _ -> () + (** Handles applicative (business-level) messages received from the P2P network, + i.e. messages that are not control ones like GRAFT, IWANT, SUBSCRIBE... *) + let handle_app_message state message = + let batching_configuration = state.message_handling in + let new_state, output = + GS.handle_receive_message + ~batching_configuration + message + state.gossip_state + in + (match batching_configuration with + | Sequentially -> () + | In_batches {time_interval} -> + batch_accumulator output time_interval state.events_stream) ; + update_gossip_state state (new_state, output) + |> handle_receive_message message + (** Handling messages received from the P2P network. *) let apply_p2p_message ~self ({gossip_state; _} as state) from_peer = function | Message_with_header {message; topic; message_id} -> (let receive_message = {GS.sender = from_peer; topic; message_id; message} in - let batching_configuration = state.message_handling in - let new_state, output = - GS.handle_receive_message - ~batching_configuration - receive_message - gossip_state - in - (match batching_configuration with - | Sequentially -> () - | In_batches {time_interval} -> - batch_accumulator output time_interval state.events_stream) ; - update_gossip_state state (new_state, output) - |> handle_receive_message receive_message) + handle_app_message state receive_message) [@profiler.span_f {verbosity = Notice} ["apply_event"; "P2P_input"; "In_message"; "Message_with_header"]] @@ -946,12 +951,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : match GS.Message_id.valid message.message_id with | `Valid | `Invalid -> let state = {state with unknown_validity_messages} in - GS.handle_receive_message - ~batching_configuration:state.message_handling - message - state.gossip_state - |> update_gossip_state state - |> handle_receive_message message + handle_app_message state message |> (* Other messages are processed recursively *) check_unknown_messages_id | `Unknown -> state