Disclosure of Invention
In view of the foregoing, the present application is directed to a method and apparatus for processing data packets, so as to improve the performance of a distributed system. The specific scheme is as follows:
in a first aspect, a method for processing a data packet is provided, including:
receiving a data packet sent by a data transmission platform, and placing the data packet into a file receiving directory in the form of a file, wherein the data transmission platform is used for storing the data packets sent by an associated system, and the data packet comprises an associated system identifier;
acquiring the data packet in the file receiving directory, and placing the data packet into an association queue according to the associated system identifier of the data packet, wherein the association queue corresponds to the associated system;
sequentially taking out the data packets of the association queues in a preset scheduling mode, and sequentially placing the data packets into a waiting queue; and
sequentially taking out the data packets in the waiting queue, placing the data packets into a consumer pool, and processing the data packets by consumers in the consumer pool, wherein the consumer pool comprises more than one consumer.
In another implementation manner of the first aspect of the embodiment of the present application, after the sequentially fetching the data packets in the waiting queue, the method further includes:
copying the data packet to a response queue, wherein the response queue is used for storing the data packets being processed by the consumer pool;
judging whether the data packet has been processed, and if so, removing the data packet from the response queue; and
if the processing is not completed and a running error occurs in the consumer pool, copying the data packet in the response queue into the waiting queue after the consumer pool is restored.
In another implementation manner of the first aspect of the embodiment of the present application, the method for processing a data packet is applied to a distributed system, and further includes:
after the distributed system goes down and is restarted, placing the data packets transmitted by the data transmission platform and received in the file receiving directory during the downtime of the distributed system into the corresponding association queues according to their associated system identifiers.
In another implementation manner of the first aspect of the embodiment of the present application, the obtaining the data packet in the file receiving directory and placing the data packet into the association queue according to the association system identifier of the data packet includes:
calling a listener thread to monitor the file receiving directory, and when a newly added data packet is detected, placing the newly added data packet into the association queue corresponding to the associated system according to the associated system identifier of the newly added data packet.
In a possible design, in another implementation manner of the first aspect of the embodiment of the present application, the scheduling manner includes a polling schedule and a priority schedule;
the sequentially taking out the data packets of the association queues in a preset scheduling mode and sequentially placing the data packets into the waiting queue includes:
calling a scheduling thread, sequentially taking out the data packets in the association queues in a polling manner, and sequentially placing the data packets into the waiting queue;
or,
calling a scheduling thread, sequentially taking out the data packets in the association queues according to a preset priority, and sequentially placing the data packets into the waiting queue.
In another implementation manner of the first aspect of the embodiment of the present application, the sequentially fetching the data packets in the waiting queue and putting the data packets into the consumer pool includes:
and calling a consumer thread, sequentially taking out the data packets in the waiting queue by utilizing the consumer thread, and sending the data packets to consumers in the consumer pool.
In another implementation manner of the first aspect of the embodiment of the present application, the process of calling the listener thread to place a newly added data packet into the association queue corresponding to the associated system when the listener thread detects the newly added data packet includes:
when any listener thread detects a newly added data packet, accessing the distributed lock of the file receiving directory, and if the lock is successfully acquired, executing the process of placing the newly added data packet into the association queue corresponding to the associated system, wherein the distributed lock of the file receiving directory is used for ensuring that only one listener thread runs at the same time.
In another implementation manner of the first aspect of the embodiment of the present application, there are two or more scheduling threads, and the calling of a scheduling thread to sequentially take out the data packets in the association queues in a polling manner and sequentially place them into the waiting queue, or to sequentially take out the data packets in the association queues according to a preset priority and sequentially place them into the waiting queue, includes:
any scheduling thread accesses the distributed lock of the association queues, and if the lock is successfully acquired, executes the process of sequentially taking out the data packets in the association queues in a polling manner and sequentially placing them into the waiting queue, wherein the distributed lock of the association queues is used for ensuring that only one scheduling thread runs at the same time;
or,
any scheduling thread accesses the distributed lock of the association queues, and if the lock is successfully acquired, executes the process of sequentially taking out the data packets in the association queues according to the preset priority and sequentially placing them into the waiting queue, wherein the distributed lock of the association queues is used for ensuring that only one scheduling thread runs at the same time.
In another implementation manner of the first aspect of the embodiment of the present application, the invoking the consumer thread and sequentially fetching the data packets in the waiting queue by using the consumer thread includes:
any consumer thread accesses the distributed lock of the waiting queue, and if the lock is successfully acquired, executes the process of sequentially taking out the data packets in the waiting queue by using the consumer thread, wherein the distributed lock of the waiting queue is used for ensuring that only one consumer thread runs at the same time.
In a second aspect, there is provided a packet processing device, including:
The data packet transmission unit is used for receiving a data packet sent by the data transmission platform and placing the data packet into a file receiving directory in a file form, wherein the data transmission platform is used for storing the data packet sent by the association system, and the data packet contains an association system identifier;
The data packet distribution unit is used for acquiring the data packet in the file receiving directory, and placing the data packet into an association queue according to the association system identifier of the data packet, wherein the association queue corresponds to the association system;
The data packet scheduling unit is used for sequentially taking out the data packets of the associated queues by adopting a preset scheduling mode and sequentially putting the data packets into the waiting queues;
And the data packet processing unit is used for sequentially taking out the data packets in the waiting queue, putting the data packets into a consumer pool, and processing the data packets by utilizing consumers in the consumer pool, wherein the consumer pool comprises more than one consumer.
By means of the above technical scheme, the data packet processing method provided by the present application first receives data packets from the data transmission platform, wherein the data transmission platform is used for storing the data packets sent by the associated systems, the data packets are stored in the file receiving directory in the form of files, and each data packet contains an associated system identifier. The data packets are then placed into the corresponding association queues according to their associated system identifiers, so that effective classification of the data packets is achieved; each association queue corresponds to a specific associated system, and this design makes packet processing more orderly and reduces confusion and errors during processing. Further, a preset scheduling mode is adopted to sequentially take the data packets out of the association queues and place them into the waiting queue. This scheduling mechanism avoids delays and accumulation in packet processing while improving the response speed and processing capacity of the system. Finally, the packets are sequentially fetched from the waiting queue and placed into the consumer pool. The consumer pool comprises one or more consumers that process the data packets in parallel; in this way, the system can fully exploit the advantages of multi-threaded processing, and the processing speed and throughput of the data packets are significantly improved. In summary, the data packet processing method provided by the present application achieves rapid, accurate, and reliable processing of data packets, improves the response speed and processing capacity of the system, and provides an efficient and reliable solution for data interaction between systems.
Detailed Description
Before describing the scheme of the application, the technical terms referred to in this text are explained first:
A distributed lock is a mechanism for coordinating access by multiple nodes to shared resources in a distributed system; it ensures that only one node can operate on a resource at any time and prevents data inconsistency. It needs to be mutually exclusive, highly available, high-performance, re-entrant, and secure. Common implementations are based on databases, cache systems (e.g., Redis), ZooKeeper, etcd, and the like; the implementation is chosen according to the specific scenario and system architecture.
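For illustration only, the lock semantics described above can be sketched in Python. The in-memory dictionary below stands in for a shared store such as Redis, where the equivalent would be a single atomic command (e.g. `SET key value NX EX ttl`); the function names and TTL value are assumptions, not part of the present application:

```python
import time
import uuid

# Stand-in for a shared store such as Redis; in a real distributed lock
# the get/set below would be one atomic command on the shared store.
_store = {}

def acquire_lock(name, ttl=10.0):
    """Try to take the named lock; return an owner token, or None if held."""
    now = time.monotonic()
    entry = _store.get(name)
    if entry is None or entry[1] < now:      # lock is free or has expired
        token = uuid.uuid4().hex
        _store[name] = (token, now + ttl)    # record owner and expiry
        return token
    return None

def release_lock(name, token):
    """Release the lock only if the caller still owns it (token matches)."""
    entry = _store.get(name)
    if entry is not None and entry[0] == token:
        del _store[name]
        return True
    return False
```

The expiry time guards against a crashed holder blocking the resource forever, and the token check prevents one node from releasing a lock that another node has since acquired.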
A message queue (Message Queue) provides platform-independent data interaction with a highly reliable message-passing mechanism, and integrates distributed systems on the basis of data communication. Through message passing and message queuing, the system can provide application decoupling, elastic expansion, redundant storage, traffic peak shaving, asynchronous communication, data synchronization, and other functions in a distributed environment.
The following description of the embodiments of the present application will be made clearly and completely with reference to the accompanying drawings, in which it is apparent that the embodiments described are only some embodiments of the present application, but not all embodiments. All other embodiments, which can be made by those skilled in the art based on the embodiments of the application without making any inventive effort, are intended to be within the scope of the application.
The application provides a data packet processing method, which comprises the steps of firstly receiving a data packet file which is stored by a data transmission platform and contains an associated system identifier, and ensuring accurate identification and classification of the data packet. And then, putting the data packets into corresponding associated queues according to the identifiers, and realizing effective classification. Then, the data packet is moved from the associated queue to the waiting queue by a preset scheduling mode, and finally is sent to a consumer pool containing a plurality of consumers. The method improves the processing speed, accuracy and reliability of the data packet and enhances the response and processing capacity of the system.
The scheme of the application can be realized based on the terminal with data processing capability, and the terminal can be a mobile phone, a computer, a server and the like.
Next, referring to fig. 1, fig. 1 is a flow chart of a packet processing method provided by the present application; the method is implemented by a packet processing system deployed on a terminal and specifically includes the following steps:
Step S100, receiving a data packet sent by a data transmission platform, and placing the data packet into a file receiving directory in a file form.
In particular, when an associated system needs to exchange business data with the current system, it does so with the packet processing system via the data transmission platform. The data transmission platform sends data packets to the data packet processing system, and the data packets are encapsulated into file form during transmission so as to ensure the integrity and safety of the data. After the data packet processing system receives the data packets, the data packets are uniformly stored in a preset file receiving directory.
The file receiving directory is a specific storage area for temporarily storing all the data packet files sent from the associated system through the data transmission platform. In the file receiving directory, each data packet contains an associated system identifier, and the identifier information is used for distinguishing the data packets sent by different associated systems, so that the data packets can be correctly identified and processed. And each data packet in the file receiving directory conforms to a specific format specification, wherein the data packet comprises identification information of the associated system and unique identification information of the data packet. This information is critical to the subsequent packet processing flow and is used to guide how the packet is further processed and distributed.
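As a purely illustrative sketch of such a format specification (the filename convention `SYSID_PKTID.pkt` below is a hypothetical assumption, since the application does not fix a concrete format), a packet record carrying both identifiers might look as follows:

```python
from dataclasses import dataclass

@dataclass
class Packet:
    system_id: str   # associated system identifier
    packet_id: str   # unique identifier of the data packet
    payload: bytes   # the business data carried by the packet

def parse_packet_filename(filename):
    """Split a hypothetical 'SYSID_PKTID.pkt' name into its identifiers."""
    stem = filename.rsplit(".", 1)[0]        # drop the extension
    system_id, packet_id = stem.split("_", 1)
    return system_id, packet_id
```

Whatever the concrete convention, the essential point is that both the associated system identifier and the unique packet identifier are recoverable from each file in the receiving directory.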
Step S110, obtaining the data packet in the file receiving directory, and placing the data packet into an association queue according to the associated system identifier of the data packet.
Specifically, the data packets in the file receiving directory are obtained, and each data packet has an associated system identifier indicating from which particular associated system the data packet originated. By parsing these identifiers, the system is able to identify the source of each packet and perform subsequent classification operations accordingly. The "association queue" here is effectively a message queue, a high-level data structure used to manage and deliver messages in a distributed system.
Each association queue is dedicated to receiving packets from a particular association system, ensuring that packets are routed and processed correctly. The one-to-one correspondence not only improves the efficiency of data processing, but also enhances the manageability and expandability of the system. In this way, the data packets of each associated system are efficiently organized and managed, providing a solid basis for subsequent data processing and analysis.
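A minimal sketch of this one-queue-per-system routing follows, using in-process FIFO queues as stand-ins for the message queues (the names and the dictionary-of-queues layout are assumptions for illustration):

```python
import queue
from collections import defaultdict

# One FIFO queue per associated system, created on first use.
association_queues = defaultdict(queue.Queue)

def dispatch(packet):
    """Route a packet to the queue of the system named in its identifier."""
    association_queues[packet["system_id"]].put(packet)

# Packets from two associated systems are kept strictly separated,
# while arrival order is preserved within each system's queue.
dispatch({"system_id": "A", "packet_id": "1"})
dispatch({"system_id": "B", "packet_id": "2"})
dispatch({"system_id": "A", "packet_id": "3"})
```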
And step S120, sequentially taking out the data packets of the associated queues by adopting a preset scheduling mode, and sequentially putting the data packets into a waiting queue.
Specifically, the system sequentially takes out data packets from each associated queue according to a preset scheduling mode, and puts the data packets into a waiting queue, wherein the waiting queue contains the data packets from different associated queues, and performs unified management to wait for scheduled operation, so that delay and accumulation of data packet processing are avoided, and meanwhile, the response speed and processing capacity of the system are improved.
And step S130, sequentially taking out the data packets in the waiting queue, putting the data packets into a consumer pool, and processing the data packets by consumers in the consumer pool.
Specifically, the system sequentially fetches the data packets from the waiting queue. The waiting queue acts as a buffer area that collects packets from various associated queues that have undergone preliminary sorting and ordering in preparation for the next processing stage.
After the packets are removed, the system places them into the consumer pool. The consumer pool is made up of multiple consumers that work concurrently and are collectively responsible for the final processing of the packets. Each such consumer is a "consumer" in the message-queue sense: it pulls packets from the waiting queue and independently applies the business logic. The design of the consumer pool allows the system to process multiple data packets in parallel, which improves data processing efficiency and speeds up the overall data flow. This concurrent processing mechanism is an important means of increasing system throughput and response speed. In this way, the system can ensure that data packets are processed in a timely and effective manner, meeting the requirements of real-time data processing.
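The consumer pool can be illustrated with a fixed pool of worker threads; this sketch uses Python's `concurrent.futures` and a placeholder business function, both assumptions rather than the application's concrete implementation:

```python
import queue
from concurrent.futures import ThreadPoolExecutor

# The waiting queue, pre-loaded with a few packets for the example.
wait_queue = queue.Queue()
for i in range(5):
    wait_queue.put({"packet_id": i})

def consume(packet):
    # Placeholder for the real business-logic processing of one packet.
    return packet["packet_id"] * 2

# The consumer pool: a fixed set of workers processing packets in parallel.
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = []
    while not wait_queue.empty():
        futures.append(pool.submit(consume, wait_queue.get()))
    results = sorted(f.result() for f in futures)
```

A fixed worker count bounds resource usage while still letting several packets be processed concurrently.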
The data packet processing method provided by the present application first receives data packets from the data transmission platform, wherein the data transmission platform is used for storing the data packets sent by the associated systems, the data packets are stored in the file receiving directory in the form of files, and each data packet contains an associated system identifier. The data packets are then placed into the corresponding association queues according to their associated system identifiers, so that effective classification of the data packets is achieved; each association queue corresponds to a specific associated system, and this design makes packet processing more orderly and reduces confusion and errors during processing. Further, a preset scheduling mode is adopted to sequentially take the data packets out of the association queues and place them into the waiting queue. This scheduling mechanism avoids delays and accumulation in packet processing while improving the response speed and processing capacity of the system. Finally, the packets are sequentially fetched from the waiting queue and placed into the consumer pool. The consumer pool comprises one or more consumers that process the data packets in parallel; in this way, the system can fully exploit the advantages of multi-threaded processing, and the processing speed and throughput of the data packets are significantly improved. In summary, the data packet processing method provided by the present application achieves rapid, accurate, and reliable processing of data packets, improves the response speed and processing capacity of the system, and provides an efficient and reliable solution for data interaction between systems.
Further, in some embodiments of the present application, the packet processing system may further include a response queue, referring to fig. 2, fig. 2 is a flow chart of another packet processing method provided in the embodiment of the present application, and the specific flow is as follows:
Step S200, receiving a data packet sent by the data transmission platform, and placing the data packet into the file receiving directory in the form of a file.
Step S210, obtaining the data packet in the file receiving directory, and placing the data packet into an association queue according to the associated system identifier of the data packet.
Step S220, sequentially taking out the data packets of the association queues in a preset scheduling mode, and sequentially placing them into the waiting queue.
Specifically, steps S200-S220 correspond one-to-one to steps S100-S120 in the foregoing embodiment; for details, refer to the foregoing description, which is not repeated here.
And step S230, sequentially taking out the data packets in the waiting queue, copying the data packets to the response queue, putting the data packets into a consumer pool, and processing the data packets by consumers in the consumer pool.
Specifically, when the system fetches packets from the waiting queue, the packets are copied and stored in the response queue. The response queue serves as a temporary storage area whose purpose is to record which packets are currently being processed by the consumers in the consumer pool. In this way, the system is able to keep accurate track of the packet processing status.
After the packets are removed, the system places them into the consumer pool, where multiple consumers working concurrently are collectively responsible for the final processing of the packets, as described in step S130 above; the details are not repeated here.
Step S240, judging whether the data packet is processed, if so, executing step S250, and if not, executing step S260.
In particular, for packets that have been copied into the response queue, the system needs to determine their processing status: it examines each packet in the response queue to determine whether it has been fully processed by the consumers in the consumer pool.
Step S250, removing the data packet from the response queue.
Specifically, if the system determines that a packet has been processed, the packet is removed from the response queue. This step maintains the accuracy and up-to-date state of the response queue, ensuring that the queue contains only packets that are being processed or waiting to be processed.
And step S260, when the consumer pool has running errors, copying the data packet in the response queue into the waiting queue after the consumer pool is restored.
Specifically, if the system finds during the check that a certain data packet has not been processed, and an operation error occurred in the consumer pool during its processing, the system takes corresponding recovery measures. Once the consumer pool resumes normal operation, the system re-copies the unprocessed data packets from the response queue into the waiting queue so that they can re-enter the processing flow and wait to be retrieved and processed again by the consumers in the consumer pool. This procedure ensures that data packets are not lost even in the event of errors and can be correctly re-queued for processing, thus guaranteeing the continuity and integrity of data processing.
In this embodiment, the system sequentially takes out the data packets from the waiting queue and puts them into the consumer pool, and copies the data packets taken out from the waiting queue to the response queue. By examining each packet in the reply queue, the system can determine whether they have been fully processed by the consumers in the consumer pool. If the data packets are processed, the data packets are removed from the response queue, which not only maintains the accuracy and the latest state of the response queue, but also releases the memory resources, optimizing the performance of the system. However, if the consumer pool is in error during processing, the system provides an effective recovery measure. The system will re-copy the unprocessed data packets in the response queue to the waiting queue, once the consumer pool resumes normal operation, these data packets can re-enter the processing flow, waiting to be retrieved and processed again by the consumers in the consumer pool. This process ensures not only that the data packets are not lost due to system failure, but also that the continuity and integrity of the data processing is ensured, even in the face of operational errors, the stability and reliability of the data processing is ensured.
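The response-queue bookkeeping of this embodiment can be sketched as follows; the mapping-by-packet-id layout and the function names are illustrative assumptions:

```python
# The response queue tracks in-flight packets so they can be replayed
# if the consumer pool fails mid-processing (a simplified sketch).
response_queue = {}   # packet_id -> copy of the packet being processed
waiting_queue = []

def take_for_processing(packet):
    """Copy the packet into the response queue before handing it to the pool."""
    response_queue[packet["packet_id"]] = dict(packet)
    return packet

def mark_done(packet_id):
    """Remove a fully processed packet from the response queue."""
    response_queue.pop(packet_id, None)

def recover_after_error():
    """After the consumer pool is restored, re-queue every in-flight packet."""
    waiting_queue.extend(response_queue.values())
    response_queue.clear()
```

Only packets that were taken but never marked done are replayed, so completed work is not duplicated.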
In some embodiments of the present application, in the packet processing method according to the present application, particularly for the operation characteristics of the distributed system, a processing mechanism for handling the system after downtime and restarting may be further adopted, and the processing mechanism is described below:
Alternatively, when a downtime event occurs and the system is subsequently restarted, the system performs a particular recovery procedure. In this process, the system first identifies the data packets that were sent to the file receiving directory through the data transmission platform during the downtime. These packets could not be handled in time while the system was down, so they accumulate in the file receiving directory.
After the system is restarted, the file receiving directory is checked and all data packets received during downtime are collected. The system will then sort each packet according to its associated system identification and place it in its corresponding associated queue, respectively. This step ensures that each packet is properly assigned to its corresponding processing queue, ready for subsequent data processing.
In this way, the present embodiment can ensure that no data packets are lost even in the event of downtime, and all data packets received during downtime can be properly managed and processed. This mechanism increases the robustness of the system, ensures continuity and integrity of data processing, and maintains an orderly flow of data even in the face of unexpected system failures.
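The restart-recovery pass described above might be sketched as follows; the leading-system-id filename convention is a hypothetical assumption carried over for illustration:

```python
import os
import tempfile
from collections import defaultdict

def recover_directory(recv_dir):
    """After a restart, sort every accumulated packet file into its
    association queue, keyed by the system identifier assumed to lead
    the filename (e.g. 'SYSA_001.pkt' -- a hypothetical convention)."""
    queues = defaultdict(list)
    for name in sorted(os.listdir(recv_dir)):
        system_id = name.split("_", 1)[0]
        queues[system_id].append(os.path.join(recv_dir, name))
    return queues

# Simulate files that piled up while the system was down.
with tempfile.TemporaryDirectory() as d:
    for name in ("SYSA_001.pkt", "SYSB_001.pkt", "SYSA_002.pkt"):
        open(os.path.join(d, name), "w").close()
    recovered = recover_directory(d)
```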
In some embodiments of the present application, a method for placing data packets into an associated queue using a listener thread is provided and is described below.
Alternatively, a listener thread may be used to continuously monitor the status of the file receiving directory. This file receiving directory is the designated location where the system receives data packets from the data transmission platform. The main task of the listener thread is to detect if a new packet arrives in the directory.
When the listener thread monitors that a new packet is added to the file receiving directory, it will initiate a process flow. This flow first involves identifying the association system identity of the newly added packets, from which the listener thread can determine to which association system each newly added packet should belong. After determining the association system, the listener thread places the newly added data packet into an association queue corresponding to the association system. Each associated system has its own associated queue for buffering packets from the system. Such a design allows the system to sort and organize the data packets in preparation for subsequent data processing and consumption flows. Meanwhile, in order to optimize the storage resources, the listener thread can delete the corresponding data packet file under the receiving directory after the data packet is successfully transferred, so as to save the disk space and optimize the use of the storage resources.
The listener thread in the embodiment ensures the continuity and accuracy of the receiving, identifying, classifying and preliminary processing of the data packet, and provides a solid foundation for the subsequent data packet processing. The process is automatic, ensures timeliness and accuracy of data packet processing, and improves efficiency and reliability of the whole data processing flow.
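One pass of such a listener might look as follows; in this sketch the file is moved out of the receiving directory (standing in for the deletion described above), and the directory layout and names are assumptions:

```python
import os
import shutil
import tempfile
from collections import defaultdict

association_queues = defaultdict(list)

def scan_once(recv_dir, done_dir):
    """One listener pass: enqueue each new packet file under the queue of
    its leading system identifier, then move the file out of the receiving
    directory to free the storage resource."""
    moved = 0
    for name in sorted(os.listdir(recv_dir)):
        system_id = name.split("_", 1)[0]
        association_queues[system_id].append(name)
        shutil.move(os.path.join(recv_dir, name), os.path.join(done_dir, name))
        moved += 1
    return moved

# A real listener thread would call scan_once in a loop (or use an
# OS-level file-watching facility); one pass suffices for illustration.
recv = tempfile.mkdtemp()
done = tempfile.mkdtemp()
open(os.path.join(recv, "SYSA_001.pkt"), "w").close()
n = scan_once(recv, done)
```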
In some embodiments of the application, the application provides a flexible scheduling mechanism to ensure that data packets can be processed efficiently and fairly. The scheduling means may include a polling schedule and a priority schedule. The two scheduling modes can also be realized by a scheduling thread, wherein the scheduling thread is a key component responsible for managing and distributing data packets in a system, and the following process is introduced:
Optionally, in the polling scheduling mode, the scheduling thread takes out data packets from each associated queue in turn according to a preset order, and puts them into the waiting queue. The polling mechanism ensures that each associated system can obtain equal processing opportunities, and effectively avoids the condition that certain queues wait for processing for a long time, thereby enhancing the fairness and response speed of the system. The polling scheduling is simple and effective, and can be used in a scene of ensuring fairness of data packet processing.
Priority scheduling allows the system to process data packets according to preset priority rules. Specifically, the system may employ various policies including weights, packet sizes, processing times, etc. to determine the priority of the packets based on the actual conditions. In this mode, the dispatch thread takes the data packet from the associated queue and places it in the wait queue based on the urgency of the data packet or other business logic related priority indicator. This mechanism enables high priority packets to be processed faster, and is suitable for scenarios where timeliness of data processing is critical.
The flexible scheduling mechanism in the embodiment enables the system to adapt to different service demands and processing pressures, and improves the efficiency of data processing and the stability of the system. By the design, the data packet processing in the distributed system can be guaranteed to be fair and efficient, and different requirements on data processing in different service scenes are met.
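The two scheduling modes can be sketched side by side; the deque-based queues and the priority callback are illustrative assumptions, and the tie-breaking counter in the priority variant preserves arrival order among equal priorities:

```python
import heapq
from collections import deque
from itertools import count

def round_robin(queues):
    """Polling schedule: visit the association queues in a fixed cycle,
    taking one packet from each non-empty queue per round."""
    out = []
    while any(queues.values()):
        for q in queues.values():
            if q:
                out.append(q.popleft())
    return out

def by_priority(packets, priority_of):
    """Priority schedule: drain packets highest-priority first,
    breaking ties by arrival order via the counter."""
    heap = []
    order = count()
    for p in packets:
        heapq.heappush(heap, (-priority_of(p), next(order), p))
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]
```

`round_robin` gives every associated system an equal turn, while `by_priority` lets urgent packets jump ahead; either can feed the waiting queue.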
In some embodiments of the present application, a consumer thread may be utilized to place data packets into a consumer pool, the process described below:
Alternatively, after the packets are successfully transferred from the associated queue to the wait queue, the packets may be further processed by the consumer thread. Consumer threads are a critical component within the system that is responsible for sequentially fetching packets from the wait queue and sending them to the consumers in the consumer pool for final processing.
The design of consumer threads allows the system to process multiple data packets in parallel, so that the data processing efficiency can be improved, and the whole data streaming speed can be increased. In practice, the system monitors the wait queue and, once a new packet is in the queue, triggers the consumer thread to fetch and process the packet. The consumer thread may assign the data packet to the most appropriate consumer for processing, depending on its type, size, or processing requirements. This allocation mechanism ensures data processing efficiency and balance, preventing the situation where some consumers in the consumer pool are overloaded while others are idle. There may be multiple consumers in the consumer pool that can independently service these packets, and they also work together to ensure that the packets are handled in a timely and efficient manner.
According to the embodiment, through the cooperative work of the consumer thread and the consumer pool, the rapid processing of a large number of data packets can be realized, and meanwhile, the accuracy and the reliability of data processing are ensured. This mechanism not only improves the efficiency of data processing, but also enhances the stability of the system.
Referring to fig. 3, fig. 3 is a schematic structural diagram of a packet processing system according to the present embodiment, and fig. 3 is described below based on the foregoing embodiments:
The system comprises an association system, a data transmission platform, a file receiving directory, association queues, a waiting queue, a response queue, a consumer pool, and other key components, where the file receiving directory, the association queues, the waiting queue, the response queue, and the consumer pool may form the data packet processing system.
Optionally, the association system is an external system that performs data interaction with the data packet processing system, and the association system generates a data packet and sends the data packet to the data packet processing system through the data transmission platform. The data transmission platform serves as an intermediary and is responsible for receiving data packets sent from different association systems A, B, C and the like and storing the data packets in file form in a file receiving directory. These packets contain an associated system identification, which is key information to identify and classify the packets.
In the file receiving directory, the data packets are then captured by the listener thread. The listener thread continuously monitors the directory; once a new data packet is detected, the data packet is distributed to the corresponding associated queue according to its associated system identifier. Each associated queue serves a specific association system, ensuring that the data packets are properly classified and initially processed.
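The classification step above may be sketched as follows; this is an illustrative Python fragment in which the packet layout (`system_id`, `payload`) and the queue names are assumptions, not part of the disclosed scheme.

```python
import queue

# Hypothetical packet format: each packet carries the identifier of the
# association system that produced it.
packets = [
    {"system_id": "A", "payload": "a1"},
    {"system_id": "B", "payload": "b1"},
    {"system_id": "A", "payload": "a2"},
]

# One associated queue per association system.
associated_queues = {"A": queue.Queue(), "B": queue.Queue(), "C": queue.Queue()}

def dispatch(packet):
    # The listener routes each new packet to the queue that matches
    # its associated system identifier.
    associated_queues[packet["system_id"]].put(packet)

for pkt in packets:
    dispatch(pkt)

print(associated_queues["A"].qsize())  # → 2
```

Routing by identifier keeps each association system's traffic isolated, so later scheduling decisions can be made per system.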
Then, the dispatch thread takes out the data packet from each associated queue according to a preset dispatch strategy and puts the data packet into a waiting queue. The scheduling policy may be polling scheduling or priority scheduling, so as to adapt to different service requirements. The polling schedule ensures that each associated queue is handled fairly, while the priority schedule allows the system to handle packets according to their urgency or other traffic logic related priority metrics.
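The polling variant of the preset dispatch strategy may be sketched as follows (an illustrative Python fragment; `poll_schedule` and the queue names are hypothetical). Each pass visits every associated queue once and moves at most one packet per visit, which is what keeps the queues fairly served.

```python
import queue
from itertools import cycle

def poll_schedule(associated_queues, wait_queue):
    # Round-robin: visit each associated queue in turn, moving at most
    # one packet per visit, until a full pass finds every queue empty.
    names = list(associated_queues)
    idle = 0
    for name in cycle(names):
        try:
            wait_queue.put(associated_queues[name].get_nowait())
            idle = 0
        except queue.Empty:
            idle += 1
            if idle >= len(names):  # a full pass found nothing: done
                break

qs = {"A": queue.Queue(), "B": queue.Queue()}
for p in ("a1", "a2"):
    qs["A"].put(p)
qs["B"].put("b1")

wq = queue.Queue()
poll_schedule(qs, wq)
order = [wq.get() for _ in range(wq.qsize())]
print(order)  # → ['a1', 'b1', 'a2']
```

Note how queue B's single packet is interleaved between queue A's two packets rather than waiting for A to drain: that interleaving is the fairness property polling provides.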
Once the packet is placed in the wait queue, the consumer thread is invoked. The consumer thread is responsible for fetching packets from the wait queue and sending them to the consumers in the consumer pool for processing. The consumer pool is composed of a plurality of consumers, and the consumers work in parallel, so that the efficiency and the speed of data processing are improved.
In addition, in order to improve the fault tolerance of the system, the application further comprises a response queue for storing the data packets being processed by the consumer pool. The system periodically checks the packets in the reply queue to determine if they have been processed. If the packet processing is complete, they will be removed from the reply queue. If the consumer pool has operation errors in the processing process, the system can put unprocessed data packets in the response queue back to the waiting queue after recovery, so that the continuity and the integrity of data processing are ensured.
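The response-queue bookkeeping may be sketched as follows. This is an illustrative Python fragment, not the claimed implementation: a plain dict stands in for the response queue, and the helper names (`take_for_processing`, `mark_done`, `recover_after_failure`) are hypothetical.

```python
import queue

wait_queue = queue.Queue()
response_queue = {}   # packet id -> packet, for packets in flight

def take_for_processing():
    pkt = wait_queue.get_nowait()
    response_queue[pkt["id"]] = pkt   # copy into the response queue first
    return pkt

def mark_done(pkt):
    response_queue.pop(pkt["id"])     # processed: remove from response queue

def recover_after_failure():
    # Consumer pool failed: requeue every in-flight packet after recovery.
    for pkt in list(response_queue.values()):
        wait_queue.put(response_queue.pop(pkt["id"]))

wait_queue.put({"id": 1, "payload": "x"})
wait_queue.put({"id": 2, "payload": "y"})

p1 = take_for_processing()
p2 = take_for_processing()
mark_done(p1)                 # packet 1 completed normally
recover_after_failure()       # pool crashed while packet 2 was in flight
print(wait_queue.qsize())     # → 1 (only packet 2 is rescheduled)
```

Copying into the response queue before processing is what makes the crash recoverable: a packet can only be lost from the waiting queue after a durable copy already exists.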
The schematic diagram of the data packet processing structure shown in this embodiment achieves a notable technical effect through the cooperation of the above components. The system first receives the data packets from the different association systems through the file receiving directory, and classifies and temporarily stores them using the associated queues, ensuring orderly data processing. Second, by combining polling scheduling and priority scheduling, the scheduling thread transfers data packets from the associated queues to the waiting queue, improving scheduling flexibility and efficiency. The design of the consumer pool allows multiple consumers to process data packets in parallel, significantly improving the concurrent processing capacity of the system. In addition, the introduction of the response queue provides a fault tolerance mechanism, ensuring that data packets are not lost when the consumer pool fails and can be rescheduled. Overall, the system structure realizes efficient, stable, and reliable processing of data packets through the cooperative work of its components, and is particularly suitable for data processing requirements in high-concurrency and distributed environments.
In some embodiments of the present application, a method for coordinating mutually exclusive access to resources using a distributed lock is further provided; see fig. 4, which is a schematic diagram of a distributed lock application structure. The procedure is described below:
Optionally, since the system application may be deployed in a distributed cluster manner, there may be multiple Pod applications running simultaneously to provide services, and the file receiving directory, the associated queue, and the waiting queue in the overall packet processing scheme are mutually exclusive accessed resources, so that a distributed lock is required to coordinate the work between different threads. Distributed locks may be applied to file receiving directories, associated queues, wait queues, and answer queues.
Fig. 4 contains the key components of the file receiving directory, the associated queues, the waiting queue, the response queue, and so on. The system utilizes distributed locks (lock 1, lock 2, lock 3) to ensure that the processing of data packets is mutually exclusive and safe in a high-concurrency environment. The listener thread monitors the file receiving directory, ensures uniqueness through distributed lock 1 after detecting a new data packet, and then distributes the data packet to the corresponding associated queue. The dispatch thread manages the associated queues through distributed lock 2 and transfers data packets to the waiting queue according to the preset strategy. The consumer thread fetches data packets from the waiting queue through distributed lock 3 for processing. The whole flow is carried out by independent operation units such as Pod 1, Pod 2, and Pod 3, each of which contains a listener thread, a dispatch thread, and a consumer thread, jointly maintaining the efficiency and stability of data processing. The distributed locks for the file receiving directory, the associated queues, the waiting queue, and the response queue, respectively, are described in detail below.
In the process of moving data packets from the file receiving directory to the associated queues, the listener thread is responsible for monitoring the file receiving directory so as to capture newly arrived data packets in time. When any listener thread detects a new data packet, it first attempts to access and acquire the distributed lock of the file receiving directory. This step is critical to ensuring mutual exclusivity in the packet processing process, as it prevents multiple listener threads from operating on the file receiving directory at the same time, thereby avoiding data collisions and potential system errors.
Once the listener thread successfully acquires the distributed lock, it performs the classification and distribution of the data packet. Specifically, the listener thread places the data packet into the associated queue corresponding to the association system according to the associated system identifier in the newly added data packet. This process not only ensures that the data packets are correctly classified, but also lays the foundation for the subsequent data processing flow. The distributed lock of the file receiving directory plays a vital role in the overall process: it ensures that only one listener thread can perform the packet placement operation at any given time, thereby maintaining system stability and data consistency.
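The acquire-then-classify pattern above may be sketched as follows. This is only an illustration: a real deployment would back the lock with a shared store such as Redis, etcd, or ZooKeeper, whereas here an in-process `threading.Lock` stands in, and all names are hypothetical.

```python
import threading

class DistributedLockStub:
    # In a real deployment this would be backed by Redis, etcd, or
    # ZooKeeper; a threading.Lock stands in for illustration.
    def __init__(self):
        self._lock = threading.Lock()

    def try_acquire(self):
        # Non-blocking attempt, mirroring "access and acquire" above.
        return self._lock.acquire(blocking=False)

    def release(self):
        self._lock.release()

directory_lock = DistributedLockStub()
processed = []

def listener(packet):
    # Only the listener that wins the lock may classify the packet;
    # the others back off and retry later.
    if not directory_lock.try_acquire():
        return False
    try:
        processed.append(packet)   # place into the matching associated queue
        return True
    finally:
        directory_lock.release()

print(listener("pkt-1"))  # → True
```

The `try/finally` release is the important part of the pattern: even if classification raises, the lock is freed so other listeners are not blocked forever.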
Distributed lock of the associated queue: the dispatch thread is responsible for fetching data packets from the associated queues and placing them in the waiting queue so that the consumer threads can further process them. To ensure thread safety and data consistency in this process, the system introduces a distributed lock mechanism for the associated queues.
When any dispatch thread is ready to perform its task, it first attempts to acquire the distributed lock of the associated queues. The purpose of this lock is to ensure that only one dispatch thread can operate on the associated queues at the same time. If the dispatch thread successfully acquires the lock, it processes the data packets according to the scheduling policy preset by the system. In the polling scheduling mode, the dispatch thread accesses each associated queue in turn, fairly fetching data packets and placing them in the waiting queue. The polling mechanism ensures that all associated queues are treated uniformly and avoids the situation where certain queues wait a long time for processing. Alternatively, if the system is configured with the priority scheduling mode, the dispatch thread fetches data packets according to their preset priorities. This means that urgent or important data packets can be processed first, thereby meeting the traffic demands that place strict requirements on the timeliness of data processing. The distributed lock of the associated queues plays a key role in this process: it not only ensures mutual exclusivity of the scheduling process, but also ensures the order of data packet processing and the overall stability of the system.
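The priority variant may be sketched as follows (illustrative Python; `priority_schedule` and the numeric priorities are assumptions). A min-heap pops the lowest priority number first, so urgent packets reach the waiting queue ahead of routine ones.

```python
import heapq

def priority_schedule(packets, wait_queue):
    # Lower number = higher priority; the sequence index keeps the
    # ordering stable for packets that share a priority.
    heap = [(prio, i, pkt) for i, (prio, pkt) in enumerate(packets)]
    heapq.heapify(heap)
    while heap:
        _, _, pkt = heapq.heappop(heap)
        wait_queue.append(pkt)

wq = []
priority_schedule([(2, "routine"), (0, "urgent"), (1, "important")], wq)
print(wq)  # → ['urgent', 'important', 'routine']
```

The tie-breaking index matters in practice: without it, two packets of equal priority would be compared by payload, and their arrival order could be lost.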
Distributed lock of the waiting queue: the consumer thread is responsible for taking data packets out of the waiting queue and processing them. To ensure thread safety in this process and to avoid repeated processing or loss of data packets, the system employs a distributed lock mechanism for the waiting queue.
When any consumer thread is ready to fetch a packet from the wait queue, it will first attempt to acquire the wait queue's distributed lock. The purpose of this lock is to ensure that only one consumer thread can operate on the wait queue at the same time. This mutual exclusion mechanism is necessary because in a distributed system, multiple consumer threads may attempt to access the wait queue at the same time, which may lead to confusion in packet processing if appropriate synchronization measures are not taken.
If the consumer thread successfully acquires the distributed lock, it will perform the process of fetching the data packet. The consumer thread will sequentially fetch the packets from the wait queue, which is ordered, ensuring that the packets are processed in the order they were placed in the wait queue. The distributed lock of the waiting queue not only ensures mutual exclusivity of the consumer thread when the data packet is taken out, but also ensures consistency of data packet processing and stability of the system.
Distributed lock of the response queue: when a consumer thread takes data packets out of the waiting queue and starts processing them, the system copies these data packets into the response queue. The distributed lock ensures that only one consumer thread can write to the response queue at any given time, preventing multiple threads from modifying the response queue simultaneously and thereby avoiding repeated processing or loss of data packets.
The distributed lock provided by the embodiment plays a vital role, and ensures the stability and reliability of the system in a high concurrency environment. By acquiring the distributed lock before key operation, the system can realize exclusive access to the shared resource, and data conflict and inconsistency are prevented. The distributed locks are combined with a scheduling strategy to support load balancing and fault recovery, so that the fault tolerance and high availability of the system are improved.
The following describes a packet processing device provided in an embodiment of the present application, and the packet processing device described below and the packet processing method described above may be referred to correspondingly.
Referring to fig. 5, fig. 5 is a schematic structural diagram of a packet processing device according to an embodiment of the present application.
As shown in fig. 5, the apparatus may include:
a data packet transmission unit 11, configured to receive a data packet sent by the data transmission platform and put the data packet into a file receiving directory in the form of a file, where the data transmission platform is configured to store the data packets sent by the association systems, and each data packet includes an associated system identifier;
a data packet distribution unit 12, configured to obtain a data packet in the file receiving directory and put the data packet into an associated queue according to the associated system identifier of the data packet, where each associated queue corresponds to an association system;
a data packet scheduling unit 13, configured to sequentially take out the data packets of the associated queues in a preset scheduling manner and sequentially put them into a waiting queue;
and a data packet processing unit 14, configured to sequentially take out the data packets in the waiting queue, put them into a consumer pool, and process them by using the consumers in the consumer pool, where the consumer pool includes more than one consumer.
Optionally, the data packet processing device of the embodiment of the present application further includes:
a data packet copying unit, configured to copy the data packet to a response queue, where the response queue is used to store the data packets being processed by the consumer pool;
a judging unit, configured to judge whether the data packet has been processed, and if so, remove the data packet from the response queue;
and a data packet recovery unit, configured to, when the data packet has not been processed and a running error occurs in the consumer pool, copy the data packet in the response queue to the waiting queue after the consumer pool recovers.
Optionally, the data packet processing device of the embodiment of the present application further includes:
a retransmission unit, configured to, after the distributed system is restarted from a downtime, re-process the data packets sent by the data transmission platform and received in the file receiving directory during the downtime, and place them into the corresponding associated queues according to the associated system identifiers.
In a possible implementation, the process of the packet distribution unit 12 obtaining a packet in the file receiving directory and placing the packet into the association queue according to the association system identifier of the packet includes:
calling a listener thread to monitor the file receiving directory, and when a newly added data packet is detected, putting the newly added data packet into the associated queue corresponding to the association system according to the associated system identifier of the newly added data packet.
In a possible implementation, the process of the data packet scheduling unit 13 sequentially taking out the data packets of the associated queues in a preset scheduling manner and sequentially putting them into the waiting queue includes:
calling a scheduling thread to sequentially take out the data packets in the associated queues in a polling manner and sequentially put them into the waiting queue, or calling the scheduling thread to sequentially take out the data packets in the associated queues according to a preset priority and sequentially put them into the waiting queue.
In a possible implementation, the process of the packet processing unit 14 for sequentially fetching the packets in the waiting queue and putting the packets into the consumer pool includes:
and calling a consumer thread, sequentially taking out the data packets in the waiting queue by utilizing the consumer thread, and sending the data packets to consumers in the consumer pool.
In a possible implementation, there are two or more listener threads, and the process of calling a listener thread to put a newly added data packet into the associated queue corresponding to the association system when the newly added data packet is detected includes:
when any listener thread detects a newly added data packet, accessing the distributed lock of the file receiving directory, and if the lock is successfully acquired, executing the process of putting the newly added data packet into the associated queue corresponding to the association system, where the distributed lock of the file receiving directory is used to ensure that only one listener thread runs at the same time.
In a possible implementation, there are two or more scheduling threads, and the process of the data packet scheduling unit 13 calling a scheduling thread to sequentially take out the data packets in the associated queues in a polling manner and sequentially put them into the waiting queue, or calling the scheduling thread to sequentially take out the data packets in the associated queues according to a preset priority and sequentially put them into the waiting queue, includes:
Any dispatch thread accesses the distributed lock of the associated queue, if the lock is successfully obtained, the process of taking out the data packets in the associated queue in turn in a polling mode and putting the data packets into the waiting queue in turn is executed, wherein the distributed lock of the associated queue is used for ensuring that only one dispatch thread runs at the same time;
or,
Any dispatch thread accesses the distributed lock of the associated queue, if the lock is successfully obtained, the process of sequentially taking out the data packets in the associated queue according to the preset priority and sequentially putting the data packets into the waiting queue is executed, and the distributed lock of the associated queue is used for ensuring that only one dispatch thread runs at the same time.
In a possible implementation, there are two or more consumer threads, and the process of the data packet processing unit 14 calling a consumer thread and using the consumer thread to sequentially take out the data packets in the waiting queue includes:
any consumer thread accesses the distributed lock of the waiting queue, and if the lock is successfully obtained, the process of sequentially taking out the data packets in the waiting queue by using the consumer thread is executed, wherein the distributed lock of the waiting queue is used for ensuring that only one consumer thread runs at the same time.
The embodiment of the application also provides an electronic device. Referring to fig. 6, a schematic diagram of an electronic device suitable for implementing the embodiments of the present application is shown. The electronic device in the embodiments of the application may include, but is not limited to, terminals such as a mobile phone, a tablet computer, and a wearable device. The electronic device shown in fig. 6 is only an example and should not be construed as limiting the functionality and scope of use of the embodiments of the application.
As shown in fig. 6, the electronic device may include a processing means (e.g., a central processing unit, a graphics processor, etc.) 601, which may perform various appropriate actions and processes according to a program stored in a Read-Only Memory (ROM) 602 or a program loaded from a storage means 608 into a Random Access Memory (RAM) 603, to implement the packet processing method of the foregoing embodiments of the present application. When the electronic device is powered on, the RAM 603 also stores the various programs and data necessary for the operation of the electronic device. The processing means 601, the ROM 602, and the RAM 603 are connected to each other through a bus 604. An input/output (I/O) interface 605 is also connected to the bus 604.
In general, the following devices may be connected to the I/O interface 605: input devices 606 including, for example, a touch screen, a touch pad, a keyboard, a mouse, a camera, a microphone, an accelerometer, a gyroscope, and the like; output devices 607 including, for example, a Liquid Crystal Display (LCD), a speaker, a vibrator, and the like; storage devices 608 including, for example, a memory card, a hard disk, and the like; and communication devices 609. The communication devices 609 may allow the electronic device to communicate with other devices wirelessly or by wire to exchange data. While fig. 6 shows an electronic device having various means, it is to be understood that not all of the illustrated means are required to be implemented or provided. More or fewer devices may be implemented or provided instead.
Embodiments of the present application also provide a computer readable storage medium carrying one or more computer programs, which when executed by an electronic device, enable the electronic device to implement the data packet processing method of the foregoing embodiments of the present application.
It should be further noted that the above-described apparatus embodiments are merely illustrative; the units described as separate components may or may not be physically separate, and the components shown as units may or may not be physical units, that is, they may be located in one place or distributed over a plurality of network units. Some or all of the modules may be selected according to actual needs to achieve the purpose of the solution of this embodiment. In addition, in the drawings of the apparatus embodiments provided by the application, the connection relationship between modules indicates that they have communication connections, which may be specifically implemented as one or more communication buses or signal lines.
From the above description of the embodiments, it will be apparent to those skilled in the art that the present application may be implemented by software plus the necessary general-purpose hardware, or, of course, by dedicated hardware including application-specific integrated circuits, dedicated CPUs, dedicated memories, dedicated components, and the like. In general, functions performed by a computer program can easily be implemented by corresponding hardware, and the specific hardware structures used to implement the same function can vary, such as analog circuits, digital circuits, or dedicated circuits. However, for the present application, a software implementation is the preferred embodiment in most cases. Based on such understanding, the technical solution of the present application, or the part contributing to the prior art, may be embodied in the form of a software product stored in a readable storage medium, such as a floppy disk, a USB flash drive, a removable hard disk, a ROM, a RAM, a magnetic disk, or an optical disk of a computer, comprising several instructions for causing a computer device (which may be a personal computer, a training device, a network device, etc.) to perform the methods according to the embodiments of the present application.
In the above embodiments, it may be implemented in whole or in part by software, hardware, firmware, or any combination thereof. When implemented in software, may be implemented in whole or in part in the form of a computer program product.
The computer program product includes one or more computer instructions. When the computer instructions are loaded and executed on a computer, the flows or functions according to the embodiments of the present application are produced in whole or in part. The computer may be a general-purpose computer, a special-purpose computer, a computer network, or another programmable apparatus. The computer instructions may be stored in a computer-readable storage medium or transmitted from one computer-readable storage medium to another; for example, the computer instructions may be transmitted from one website, computer, training device, or data center to another website, computer, training device, or data center via a wired (e.g., coaxial cable, optical fiber, Digital Subscriber Line (DSL)) or wireless (e.g., infrared, radio, microwave) connection. The computer-readable storage medium may be any available medium that can be accessed by a computer, or a data storage device such as a training device or a data center that integrates one or more available media. The available medium may be a magnetic medium (e.g., floppy disk, hard disk, magnetic tape), an optical medium (e.g., DVD), or a semiconductor medium (e.g., Solid State Disk (SSD)), and the like.
In this specification, the embodiments are described in a progressive manner, each embodiment focuses on its differences from the other embodiments, the embodiments may be combined as needed, and for the same or similar parts the embodiments may refer to one another.