[go: up one dir, main page]

CN102023974B - A method, device and system for processing messages through a message queue - Google Patents

A method, device and system for processing messages through a message queue Download PDF

Info

Publication number
CN102023974B
CN102023974B CN200910092421.0A CN200910092421A CN102023974B CN 102023974 B CN102023974 B CN 102023974B CN 200910092421 A CN200910092421 A CN 200910092421A CN 102023974 B CN102023974 B CN 102023974B
Authority
CN
China
Prior art keywords
message
database
queue
message queue
processed
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Expired - Fee Related
Application number
CN200910092421.0A
Other languages
Chinese (zh)
Other versions
CN102023974A (en
Inventor
王磊
范晓晖
刘越
于蓉蓉
郑冬冬
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
China Mobile Communications Group Co Ltd
Original Assignee
China Mobile Communications Group Co Ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by China Mobile Communications Group Co Ltd filed Critical China Mobile Communications Group Co Ltd
Priority to CN200910092421.0A priority Critical patent/CN102023974B/en
Publication of CN102023974A publication Critical patent/CN102023974A/en
Application granted granted Critical
Publication of CN102023974B publication Critical patent/CN102023974B/en
Expired - Fee Related legal-status Critical Current
Anticipated expiration legal-status Critical

Links

Images

Landscapes

  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract

本发明实施例公开了一种通过消息队列处理消息的方法,包括以下步骤:接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;将所述数据库中的消息发送到内存中的第二消息队列,所述第二消息队列用于存储供消息使用设备获取并处理的消息;接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述处理过的消息。本发明实施例兼顾了消息队列的性能和可靠性。本发明实施例同样公开了一种应用上述方法的装置和系统。

Figure 200910092421

The embodiment of the present invention discloses a method for processing messages through a message queue, which includes the following steps: receiving a message from a message production device, sending the message to the first message queue in the memory, and The message is added to the database; the message in the database is sent to the second message queue in the memory, and the second message queue is used to store the message obtained and processed by the message using device; receive the message Using the message processed by the device, sending the message to the first message queue, and deleting or modifying the processed message from the database according to the flag bit of the processed message. The embodiment of the present invention takes both performance and reliability of the message queue into consideration. The embodiment of the present invention also discloses a device and a system for applying the above method.

Figure 200910092421

Description

一种通过消息队列处理消息的方法、装置和系统A method, device and system for processing messages through a message queue

技术领域 technical field

本发明涉及互联网技术领域,尤其涉及一种通过消息队列处理消息的方法、装置和系统。The invention relates to the technical field of the Internet, in particular to a method, device and system for processing messages through a message queue.

背景技术 Background technique

在数据业务技术中,消息是指在两台计算机间传送的数据单位,可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息队列是在消息的传输过程中保存消息的容器,是一种分布式应用间交换信息的消息处理技术,可驻留在内存或磁盘上,主要用于提供路由并保证消息的传递,为任何应用程序提供消息处理和消息队列功能,无论该应用程序所在的计算机是否在同一个网络上或者是否同时联机。如果发送消息时接收者不可用,消息队列会保留消息,直到该消息被应用程序读走。通过消息队列,应用程序可独立地访问消息,而不需要知道消息之间的位置关系,且消息队列在继续执行后续步骤之前,不需要等待接收程序接收消息。In data business technology, a message refers to a data unit transmitted between two computers, which can be very simple, such as containing only text strings, or more complex, possibly containing embedded objects. A message queue is a container for storing messages during message transmission. It is a message processing technology for exchanging information between distributed applications. It can reside in memory or on disk. It is mainly used to provide routing and ensure message delivery. An application provides message processing and message queuing functionality, regardless of whether the computer on which the application is running is on the same network or online at the same time. If the recipient is unavailable when the message is sent, the message queue will hold the message until it is read by the application. Through the message queue, the application program can independently access the message without knowing the positional relationship between the messages, and the message queue does not need to wait for the receiving program to receive the message before continuing to perform subsequent steps.

消息队列为构造以同步或异步方式实现的分布式应用提供了松耦合方法,消息队列的API(Application Programming Interface,应用程序编程接口)调用被嵌入到新的或现存的应用中,通过消息发送到基于内存或磁盘的消息队列中。消息队列可用在应用中以执行多种功能,比如要求服务、交换信息或异步处理等。The message queue provides a loosely coupled method for constructing distributed applications implemented in a synchronous or asynchronous manner. The API (Application Programming Interface, Application Programming Interface) call of the message queue is embedded in a new or existing application and sent to In-memory or disk-based message queues. Message queues can be used in applications to perform functions such as requesting services, exchanging information, or asynchronous processing.

衡量消息队列是否优秀的标准包括性能和可靠性,应用于大规模的分布式系统的消息队列,消息量巨大,因此,对消息队列的性能要求非常高,要求有高并发和高吞吐量。为满足高效率的要求,现有技术通常使用的是内存消息队列,消息保存在内存中,以保证消息处理速度;为实现可靠性,一些消息队列使用内存消息队列+文件系统的方式实现消息的持久化,或者完全使用数据库来实现消息队列;还有一些消息队列使用内存消息队列+数据库的方式实现消息的持久化,但并没有采用触发式方式在消息队列和数据库之间实现消息同步。The criteria to measure whether the message queue is excellent include performance and reliability. The message queue used in large-scale distributed systems has a huge amount of messages. Therefore, the performance requirements for the message queue are very high, and high concurrency and high throughput are required. In order to meet the requirements of high efficiency, the existing technology usually uses the memory message queue, and the message is stored in the memory to ensure the message processing speed; in order to achieve reliability, some message queues use the memory message queue + file system to realize the message Persistence, or completely use the database to implement the message queue; there are also some message queues that use the memory message queue + database to achieve message persistence, but do not use a trigger method to achieve message synchronization between the message queue and the database.

发明人在实现本发明的过程中,发现现有技术至少存在以下缺陷:In the process of realizing the present invention, the inventor finds that the prior art has at least the following defects:

内存消息队列降低了系统的可靠性,一旦出现掉电、意外死机,或者程序崩溃等情况,无法保证消息的完整性,还可能丢掉大量未来得及处理的消息;而使用内存消息队列+文件系统或内存消息队列+数据库的方式实现消息的持久性时,效率很低,而且实现复杂,不易使用。The memory message queue reduces the reliability of the system. In case of power failure, unexpected crash, or program crash, the integrity of the message cannot be guaranteed, and a large number of messages that cannot be processed in the future may be lost; while using the memory message queue + file system or When the memory message queue + database method is used to achieve message persistence, the efficiency is very low, and the implementation is complicated and difficult to use.

发明内容 Contents of the invention

本发明实施例提供了一种通过消息队列处理消息的方法、装置和系统,用于兼顾消息队列的性能和可靠性。Embodiments of the present invention provide a method, device and system for processing messages through a message queue, which are used to balance the performance and reliability of the message queue.

本发明实施例提供了一种通过消息队列处理消息的方法,包括以下步骤:An embodiment of the present invention provides a method for processing messages through a message queue, including the following steps:

接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;Receive a message from the message production device, send the message to the first message queue in the memory, and add the message to the database according to the flag bit in the message;

将所述数据库中的消息发送到内存中的第二消息队列,所述第二消息队列用于存储供消息使用设备获取并处理的消息;Sending the messages in the database to a second message queue in memory, where the second message queue is used to store messages acquired and processed by the message using device;

接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述处理过的消息。receiving the message processed by the message usage device, sending the message to the first message queue, and deleting or modifying the processed message from the database according to the tag bit of the processed message .

本发明实施例还提供了一种消息队列服务器,包括:The embodiment of the present invention also provides a message queue server, including:

收发模块,用于接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并接收消息使用设备处理过的消息,将所述处理过的消息发送到所述第一消息队列中;The transceiver module is configured to receive a message from a message production device, send the message to the first message queue in the memory, receive a message processed by the message usage device, and send the processed message to the first message queue. in the message queue;

数据库模块,用于存储消息,并将所述消息发送到内存中的第二消息队列;A database module, configured to store messages, and send the messages to the second message queue in memory;

添加模块,与所述数据库模块和所述收发模块连接,用于根据所述收发模块发送到所述第一消息队列中的来自所述消息生产设备的消息的标记位,将所述消息添加到所述数据库模块中;An adding module, connected to the database module and the transceiver module, configured to add the message to the message according to the flag bit of the message from the message production device sent by the transceiver module to the first message queue In the database module;

修改模块,与所述收发模块和所述数据库模块连接,用于根据所述收发模块发送到所述第一消息队列中的所述消息使用设备处理过的消息的标记位,将所述处理过的消息从所述数据库模块中删除或者在所述数据库模块中修改所述处理过的消息。A modifying module, connected to the transceiver module and the database module, configured to convert the processed The message is deleted from the database module or the processed message is modified in the database module.

本发明实施例还提供了一种通过消息队列处理消息的系统,包括消息生产设备、消息队列服务器和消息使用设备,The embodiment of the present invention also provides a system for processing messages through a message queue, including a message production device, a message queue server, and a message usage device,

所述消息生产设备,用于生成消息,并将所述消息发送到所述消息队列服务器;The message production device is configured to generate a message and send the message to the message queue server;

所述消息队列服务器,用于接收来自所述消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;将所述数据库中的消息发送到内存中的第二消息队列,使消息使用设备获取并处理所述消息;接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述消息;The message queue server is configured to receive the message from the message production device, send the message to the first message queue in the memory, and add the message to the database according to the flag bit in the message; Send the message in the database to the second message queue in the memory, so that the message using device acquires and processes the message; receives the message processed by the message using device, and sends the message to the first message in the queue, and delete or modify the message from the database according to the flag bit of the processed message;

所述消息使用设备,用于从所述第二消息队列中获取并处理消息,将处理完毕的消息添加标记位,并发送到所述第一消息队列。The message usage device is configured to acquire and process messages from the second message queue, add a flag to the processed messages, and send them to the first message queue.

与现有技术相比,本发明实施例具有以下优点:利用内存消息队列高并发和高吞吐量的优点,提高了处理消息的效率,并保证了数据的可靠性,完全保留了数据库的事务性优点,除了完成消息的分发和传递,还保留消息的处理结果,随时可供其他用途;由于消息实时的保存在数据库,而数据库可以很方便的进行各种操作,便于跟踪当前消息队列的使用情况,发现故障和进行优化。Compared with the prior art, the embodiment of the present invention has the following advantages: the advantages of high concurrency and high throughput of the memory message queue are used to improve the efficiency of processing messages, ensure the reliability of data, and fully retain the transactional nature of the database Advantages: In addition to completing the distribution and delivery of messages, the processing results of messages are also retained, which can be used for other purposes at any time; since the messages are stored in the database in real time, and the database can easily perform various operations, which is convenient for tracking the current usage of the message queue , fault detection and optimization.

附图说明 Description of drawings

为了更清楚地说明本发明实施例或现有技术中的技术方案,下面将对本发明实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动性的前提下,还可以根据这些附图获得其他的附图。In order to more clearly illustrate the technical solutions in the embodiments of the present invention or the prior art, the following will briefly introduce the drawings that need to be used in the description of the embodiments of the present invention or the prior art. Obviously, the accompanying drawings in the following description These are only some embodiments of the present invention, and those skilled in the art can also obtain other drawings based on these drawings without any creative effort.

图1为本发明实施例中的一种通过消息队列处理消息的方法流程图;FIG. 1 is a flowchart of a method for processing messages through a message queue in an embodiment of the present invention;

图2为本发明实施例应用场景中的一种通过消息队列处理消息的系统结构示意图;FIG. 2 is a schematic structural diagram of a system for processing messages through a message queue in an application scenario of an embodiment of the present invention;

图3为本发明实施例应用场景中的通过消息队列处理消息的流程图;FIG. 3 is a flow chart of processing messages through a message queue in an application scenario of an embodiment of the present invention;

图4为本发明实施例中的一种消息队列服务器的结构示意图;FIG. 4 is a schematic structural diagram of a message queue server in an embodiment of the present invention;

图5为本发明实施例应用场景中的消息队列服务器的结构示意图;FIG. 5 is a schematic structural diagram of a message queue server in an application scenario according to an embodiment of the present invention;

图6为本发明实施例中的一种通过消息队列处理消息的系统结构示意图。FIG. 6 is a schematic structural diagram of a system for processing messages through a message queue in an embodiment of the present invention.

具体实施方式 Detailed ways

本发明实施例提供的技术方案中,通过触发来实现数据库和消息队列之间的同步,利用内存消息队列的高性能和数据库的高可靠性,将内存消息队列和数据可的优点结合起来,实现了一种以数据库作为持久化工具的消息队列,解决了性能和可靠性问题。In the technical solution provided by the embodiment of the present invention, the synchronization between the database and the message queue is realized by triggering, and the high performance of the memory message queue and the high reliability of the database are used to combine the advantages of the memory message queue and the data security to realize A message queue using a database as a persistent tool is developed, which solves the problems of performance and reliability.

下面将结合本发明实施例中的附图,对本发明实施例的技术方案进行清楚、完整地描述,显然,所描述的实施例是本发明一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本发明保护的范围。The following will clearly and completely describe the technical solutions of the embodiments of the present invention with reference to the accompanying drawings in the embodiments of the present invention. Obviously, the described embodiments are some of the embodiments of the present invention, but not all of them. Based on the embodiments of the present invention, all other embodiments obtained by persons of ordinary skill in the art without making creative efforts belong to the protection scope of the present invention.

如图1所示,为本发明实施例中的一种通过消息队列处理消息的方法流程图,包括以下步骤:As shown in Figure 1, it is a flow chart of a method for processing messages through a message queue in an embodiment of the present invention, including the following steps:

步骤101,接收来自消息生产设备的消息,将该消息发送到内存中的第一消息队列,并根据消息中的标记位将该消息添加到数据库中。Step 101, receiving a message from a message production device, sending the message to a first message queue in memory, and adding the message to a database according to a flag bit in the message.

步骤102,将数据库中的消息发送到内存中的第二消息队列。Step 102, sending the messages in the database to the second message queue in the memory.

其中,第二消息队列用于存储供消息使用设备获取并处理的消息。Wherein, the second message queue is used to store messages acquired and processed by the message using device.

步骤103,接收消息使用设备处理过的消息,将该消息发送到第一消息队列中,并根据该处理过的消息的标记位从数据库中删除或修改该处理过的消息。Step 103, receiving the message processed by the message usage device, sending the message to the first message queue, and deleting or modifying the processed message from the database according to the tag bit of the processed message.

如图2所示,为本发明实施例应用场景中的一种通过消息队列处理消息的系统结构示意图,该系统包括消息生产设备100、第一消息队列200、事务处理服务程序300、数据库和数据库触发器400、触发器外部过程500、第二消息队列600和消息使用设备700。其中,第一消息队列200、事务处理服务程序300、数据库400(包括数据库触发器)、触发器外部过程500和第二消息队列600组成完整的触发式数据库持久化的消息队列服务器800,为消息发送设备100和消息使用设备700提供消息交换服务。As shown in FIG. 2, it is a schematic structural diagram of a system for processing messages through a message queue in the application scenario of the embodiment of the present invention. The system includes a message production device 100, a first message queue 200, a transaction processing service program 300, a database, and a database. A trigger 400 , a trigger external process 500 , a second message queue 600 and a message consuming device 700 . Wherein, the first message queue 200, the transaction processing service program 300, the database 400 (including the database trigger), the trigger external process 500 and the second message queue 600 form a complete trigger database persistent message queue server 800, which is a message The sending device 100 and the message using device 700 provide a message exchange service.

以下结合上述应用场景,对本发明实施例中的通过消息队列处理消息的方法进行详细、具体的描述。The method for processing messages through the message queue in the embodiment of the present invention will be described in detail below in conjunction with the above application scenarios.

如图3所示,为本发明实施例应用场景中的通过消息队列处理消息的流程图,具体包括以下步骤:As shown in Figure 3, it is a flow chart of processing messages through message queues in the application scenario of the embodiment of the present invention, specifically including the following steps:

步骤301,创建第一消息队列200。Step 301, create a first message queue 200.

具体地,在消息队列服务器800中创建内存消息队列,作为第一消息队列200,该第一消息队列200用于接收来自各个消息生产设备100的消息,并将该消息传送给事务处理服务程序200。Specifically, a memory message queue is created in the message queue server 800 as the first message queue 200, and the first message queue 200 is used to receive messages from each message production device 100 and transmit the messages to the transaction processing service program 200 .

步骤302,消息生产设备100生成消息,将消息发送到第一消息队列200。Step 302 , the message production device 100 generates a message, and sends the message to the first message queue 200 .

其中,该消息携带一个标记位,用于指明如何处理该消息,包括对该消息进行数据库添加、删除、修改操作。由于该消息刚刚由消息生产设备100生成,并未被消息使用设备700处理过,因此,该消息携带的标记位指示将该消息添加到数据库400中。Wherein, the message carries a flag bit, which is used to indicate how to process the message, including adding, deleting, and modifying the message to the database. Since the message has just been generated by the message producing device 100 and has not been processed by the message consuming device 700 , the flag bit carried in the message indicates that the message should be added to the database 400 .

步骤303,启动事务处理服务程序300。Step 303, start the transaction processing service program 300.

步骤304,事务处理服务程序300获取第一消息队列200中的消息,根据该消息携带的插入标记,将该消息添加到数据库400中。Step 304 , the transaction processing service program 300 obtains the message in the first message queue 200 , and adds the message to the database 400 according to the insertion tag carried in the message.

步骤305,在数据库400中建立数据库触发器。Step 305 , creating a database trigger in the database 400 .

步骤306,建立触发器外部过程500。Step 306, create a trigger external process 500.

具体地,建立触发器外部过程500包括,按照数据库系统的外部过程规范建立外触发器外部过程500,以及按照数据库系统规范为数据库系统加载触发器外部过程500。触发器外部过程500用于在数据库400内的消息增加时被触发开始工作,把新增的消息发送到内存。Specifically, establishing the trigger external process 500 includes establishing the external trigger external process 500 according to the external process specification of the database system, and loading the trigger external process 500 for the database system according to the database system specification. The trigger external process 500 is used to be triggered to start working when the message in the database 400 increases, and to send the newly added message to the memory.

步骤307,当数据库中的消息发生增加操作时,数据库触发器被触发,将该消息发送到触发器外部过程500。Step 307 , when an add operation occurs to a message in the database, the database trigger is triggered, and the message is sent to the trigger external process 500 .

步骤308,建立第二消息队列600。In step 308, the second message queue 600 is established.

具体地,在消息队列服务器800上创建内存消息队列,作为第二消息队列600,该第二消息队列600用于接收来自触发器外部过程500的消息,并将该消息传送给消息使用设备700。Specifically, an in-memory message queue is created on the message queue server 800 as the second message queue 600 , and the second message queue 600 is used to receive messages from the trigger external process 500 and transmit the messages to the message consuming device 700 .

步骤309,触发器外部过程500将消息放入第二消息队列600。Step 309 , the trigger external process 500 puts the message into the second message queue 600 .

步骤310,消息使用设备700从第二消息队列600中获取并处理消息,将处理完毕的消息添加标记位,并发送到第一消息队列200。Step 310 , the message usage device 700 obtains and processes messages from the second message queue 600 , adds a flag to the processed messages, and sends them to the first message queue 200 .

步骤311,事务处理服务程序300获取第一消息队列200中消息,根据该消息携带的修改标记,在数据库400中删除或修改该消息。Step 311 , the transaction processing service program 300 obtains the message in the first message queue 200 , and deletes or modifies the message in the database 400 according to the modification flag carried in the message.

具体地,由于该消息已经被消息使用设备700处理过,因此,该消息携带的修改标记将该消息从数据库400中删除,或者在数据库100中修改该消息。Specifically, since the message has been processed by the message consuming device 700 , the modification mark carried by the message deletes the message from the database 400 or modifies the message in the database 100 .

需要说明的是,本发明实施例方法可以根据实际需要对各个步骤顺序进行调整。It should be noted that, in the method of the embodiment of the present invention, the order of each step may be adjusted according to actual needs.

本发明实施例提供的技术方案中,消息生成后发送到的第一消息队列200和触发后发送到的第二消息队列600都是内存消息队列,因此,处理速度极快,保证了消息处理的效率;消息进入第一消息队列200后立即被放入数据库400,消息得到可靠的保存。对消息生产设备100和消息使用设备700而言,消息队列服务器800相当于一个普通的内存消息队列,与普通的内存消息队列的不同之处在于,消息使用设备700对消息操作完毕后,要向第一消息队列200发送一个处理后的消息,来删除或者修改数据库400里记录的消息,即只有消息使用设备700正确接收并且处理完消息,明确发出操作指令之后,该消息才会被删除或修改,因此,数据库中的消息不会因为服务器掉电或意外死机等原因而丢失。In the technical solution provided by the embodiment of the present invention, the first message queue 200 sent to after the message is generated and the second message queue 600 sent to after the trigger are both memory message queues, therefore, the processing speed is extremely fast, ensuring the smoothness of message processing Efficiency: After the message enters the first message queue 200, it is put into the database 400 immediately, and the message is reliably saved. For the message production device 100 and the message consumption device 700, the message queue server 800 is equivalent to a common memory message queue, and the difference from the common memory message queue is that after the message consumption device 700 completes the message operation, it needs to send The first message queue 200 sends a processed message to delete or modify the message recorded in the database 400, that is, the message will be deleted or modified only after the message using device 700 correctly receives and processes the message, and clearly issues an operation instruction. , therefore, the messages in the database will not be lost due to reasons such as server power failure or unexpected crash.

本发明实施例在上述实施方式中提供了通过消息队列处理消息的方法和应用场景,相应地,本发明实施例还提供了应用上述通过消息队列处理消息的方法的装置和系统。Embodiments of the present invention provide a method and application scenarios for processing messages through a message queue in the above implementation manners. Correspondingly, embodiments of the present invention also provide an apparatus and system for applying the above method for processing messages through a message queue.

如图4所示,为本发明实施例中的一种消息队列服务器的结构示意图,包括:As shown in Figure 4, it is a schematic structural diagram of a message queue server in an embodiment of the present invention, including:

收发模块410,用于接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并接收消息使用设备处理过的消息,将所述处理过的消息发送到所述第一消息队列中。The transceiver module 410 is configured to receive a message from a message producing device, send the message to the first message queue in memory, receive a message processed by the message using device, and send the processed message to the first message queue. in a message queue.

数据库模块420,用于存储消息,并将所述消息发送到内存中的第二消息队列。The database module 420 is configured to store messages and send the messages to the second message queue in memory.

添加模块430,与数据库模块420和收发模块410连接,用于根据收发模块410发送到所述第一消息队列中的来自所述消息生产设备的消息的标记位,将所述消息添加到数据库模块420中。The adding module 430 is connected with the database module 420 and the transceiver module 410, and is used to add the message to the database module according to the flag bit of the message from the message production device sent by the transceiver module 410 to the first message queue 420 in.

修改模块440,与收发模块410和数据库模块420连接,用于根据收发模块410发送到所述第一消息队列中的所述消息使用设备处理过的消息的标记位,将所述处理过的消息从数据库模块420中删除或者在数据库模块420中修改上述处理过的消息。The modification module 440 is connected with the transceiver module 410 and the database module 420, and is configured to convert the processed message to The above-mentioned processed messages are deleted from the database module 420 or modified in the database module 420 .

如图5所示,为本发明实施例应用场景中的消息队列服务器的结构示意图,具体包括创建模块510、数据库模块520、收发模块530、添加模块540和修改模块550,其中,As shown in Figure 5, it is a schematic structural diagram of the message queue server in the application scenario of the embodiment of the present invention, specifically including a creation module 510, a database module 520, a transceiver module 530, an addition module 540 and a modification module 550, wherein,

创建模块510,与数据库模块520和收发模块530连接,用于创建第一消息队列和所述第二消息队列,分别供收发模块530和数据库模块520使用。The creation module 510 is connected with the database module 520 and the transceiver module 530, and is used to create the first message queue and the second message queue, which are respectively used by the transceiver module 530 and the database module 520.

数据库模块520,用于存储消息,并将所述消息发送到内存中的第二消息队列。The database module 520 is configured to store messages and send the messages to the second message queue in memory.

数据库模块520,具体用于通过数据库触发器将所述消息发送到触发器外部过程,并通过所述触发器外部过程将所述消息发送到所述第二消息队列。The database module 520 is specifically configured to send the message to the trigger external process through the database trigger, and send the message to the second message queue through the trigger external process.

收发模块530,用于接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并接收所述消息使用设备处理过的消息,将所述处理过的消息发送到所述第一消息队列中。The transceiver module 530 is configured to receive a message from a message production device, send the message to the first message queue in the memory, receive a message processed by the message consuming device, and send the processed message to the in the first message queue.

添加模块540,与数据库模块520和收发模块530连接,用于根据收发模块530发送到所述第一消息队列中的来自所述消息生产设备的消息的标记位,将所述消息添加到数据库模块520中。The adding module 540 is connected with the database module 520 and the transceiver module 530, and is used to add the message to the database module according to the flag bit of the message from the message production device sent by the transceiver module 530 to the first message queue 520 in.

上述添加模块540,具体用于根据所述第一消息队列中的消息携带的插入标记,将所述第一消息队列中消息添加到数据库模块520中。The adding module 540 is specifically configured to add the message in the first message queue to the database module 520 according to the insertion marker carried by the message in the first message queue.

修改模块550,与收发模块530和数据库模块520连接,用于根据收发模块530发送到所述第一消息队列中的所述消息使用设备处理过的消息的标记位,将所述处理过的消息从数据库模块520中删除或者在数据库模块520中修改上述处理过的消息。The modification module 550 is connected with the transceiver module 530 and the database module 520, and is configured to convert the processed message to The above-mentioned processed messages are deleted from the database module 520 or modified in the database module 520 .

本发明实施例充分利用了内存消息队列高并发和高吞吐量的优点,提高了处理消息的效率,并保证了数据的可靠性,完全保留了数据库的事务性优点,除了完成消息的分发和传递,还保留消息的处理结果,随时可供其他用途;由于消息实时的保存在数据库,而数据库可以很方便的进行各种操作,便于跟踪当前消息队列的使用情况,发现故障和进行优化。The embodiment of the present invention makes full use of the advantages of high concurrency and high throughput of the memory message queue, improves the efficiency of message processing, ensures the reliability of data, and fully retains the transactional advantages of the database. In addition to completing the distribution and delivery of messages , also retains the processing results of the message, which can be used for other purposes at any time; because the message is stored in the database in real time, and the database can easily perform various operations, which is convenient for tracking the current usage of the message queue, finding faults and optimizing.

如图6所示,为本发明实施例中的一种通过消息队列处理消息的系统结构示意图,包括消息生产设备610、消息队列服务器620和消息使用设备630,其中,As shown in FIG. 6, it is a schematic structural diagram of a system for processing messages through a message queue in an embodiment of the present invention, including a message production device 610, a message queue server 620, and a message usage device 630, wherein,

消息生产设备610,用于生成消息,并将所述消息发送到所述消息队列服务器620。The message production device 610 is configured to generate a message and send the message to the message queue server 620 .

消息队列服务器620,用于接收来自所述消息生产设备610的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;将所述数据库中的消息发送到内存中的第二消息队列,使消息使用设备630获取并处理所述消息;接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述消息;The message queue server 620 is configured to receive the message from the message production device 610, send the message to the first message queue in the memory, and add the message to the database according to the flag bit in the message; Send the message in the database to the second message queue in the memory, make the message using device 630 acquire and process the message; receive the message processed by the message using device, and send the message to the first message queue, and delete or modify the message from the database according to the flag bit of the processed message;

消息使用设备630,用于从所述第二消息队列中获取并处理消息,将处理完毕的消息添加标记位,并发送到所述第一消息队列。The message using device 630 is configured to obtain and process messages from the second message queue, add a flag to the processed messages, and send them to the first message queue.

上述消息队列服务器620,还用于创建内存消息队列,将所述内存消息队列作为所述第一消息队列。The above-mentioned message queue server 620 is further configured to create an in-memory message queue, and use the in-memory message queue as the first message queue.

上述消息队列服务器620,还用于启动事物处理服务程序,所述事物处理服务程序用于根据所述第一消息队列中的消息携带的标记位将所述消息添加到数据库中。The above-mentioned message queue server 620 is further configured to start a transaction processing service program, and the transaction processing service program is used to add the message to the database according to the tag bit carried by the message in the first message queue.

上述消息队列服务器620,还用于启动事物处理服务程序,所述事物处理服务程序用于根据所述第一消息队列中的消息携带的标记位,对所述数据库中的消息进行删除或修改。The above-mentioned message queue server 620 is further configured to start a transaction processing service program, and the transaction processing service program is used to delete or modify messages in the database according to the flag bit carried by the messages in the first message queue.

上述消息队列服务器620,还用于在所述数据库中建立数据库触发器,并建立触发器外部过程,所述数据库触发器用于将所述数据库中的消息发送到所述触发器外部过程,所述触发器外部过程用于将来自所述数据库触发器的消息发送到所述第二消息队列。The above-mentioned message queue server 620 is further configured to establish a database trigger in the database, and establish an external process of the trigger, and the database trigger is used to send messages in the database to the external process of the trigger. A trigger external procedure is used to send messages from the database trigger to the second message queue.

上述消息队列服务器620,还用于创建内存消息队列,将所述内存消息队列作为所述第二消息队列。The above-mentioned message queue server 620 is further configured to create an in-memory message queue, and use the in-memory message queue as the second message queue.

本发明实施例充分利用了内存消息队列高并发和高吞吐量的优点,效率远远超过使用文件系统持久化或者单纯以数据库为基础实现的消息队列,在内部实现中,事务处理服务程序和数据库之间采用连接池,减少数据库并发,而又充分利用数据库性能,采用的是触发式操作,比传统的轮询或者其他方式效率更高,CPU利用率更低;采用数据库作为持久化媒介,充分利用了数据库的可靠性优点,通过利用数据库自身携带的备份恢复功能,更保证了数据的可靠性;完全保留了数据库的事务性优点,除了完成消息的分发和传递,还保留消息的处理结果,随时可供其他用途;由于消息实时的保存在数据库,而数据库可以很方便的进行各种操作,便于跟踪当前消息队列的使用情况,发现故障和进行优化。The embodiments of the present invention make full use of the advantages of high concurrency and high throughput of memory message queues, and the efficiency far exceeds that of using file system persistence or message queues implemented solely on the basis of databases. In internal implementation, transaction processing service programs and databases The connection pool is used among them to reduce database concurrency and make full use of database performance. It adopts trigger operation, which is more efficient than traditional polling or other methods, and the CPU utilization rate is lower; using database as a persistent medium makes full use of Utilizing the reliability advantages of the database, by using the backup and recovery function carried by the database itself, the reliability of the data is guaranteed; the transactional advantages of the database are completely preserved, and in addition to completing the distribution and delivery of messages, it also retains the processing results of messages. It can be used for other purposes at any time; because the messages are stored in the database in real time, and the database can be easily used for various operations, which is convenient for tracking the current usage of the message queue, finding faults and optimizing them.

通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到本发明可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。基于这样的理解,本发明的技术方案本质上或者说对现有技术做出贡献的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质中,包括若干指令用以使得一台终端设备(可以是手机,个人计算机,服务器,或者网络设备等)执行本发明各个实施例所述的方法。Through the description of the above embodiments, those skilled in the art can clearly understand that the present invention can be implemented by means of software plus a necessary general-purpose hardware platform, and of course also by hardware, but in many cases the former is a better implementation Way. Based on this understanding, the essence of the technical solution of the present invention or the part that contributes to the prior art can be embodied in the form of a software product. The computer software product is stored in a storage medium and includes several instructions to make a A terminal device (which may be a mobile phone, a personal computer, a server, or a network device, etc.) executes the methods described in various embodiments of the present invention.

以上所述仅是本发明的优选实施方式,应当指出,对于本技术领域的普通技术人员来说,在不脱离本发明原理的前提下,还可以做出若干改进和润饰,这些改进和润饰也应视本发明的保护范围。The above is only a preferred embodiment of the present invention, it should be pointed out that, for those of ordinary skill in the art, without departing from the principle of the present invention, some improvements and modifications can also be made, and these improvements and modifications can also be made. It should be regarded as the protection scope of the present invention.

本领域技术人员可以理解实施例中的装置中的模块可以按照实施例描述进行分布于实施例的装置中,也可以进行相应变化位于不同于本实施例的一个或多个装置中。上述实施例的模块可以集成于一体,也可以分离部署;可以合并为一个模块,也可以进一步拆分成多个子模块。Those skilled in the art can understand that the modules in the device in the embodiment can be distributed in the device in the embodiment according to the description in the embodiment, or can be located in one or more devices different from the embodiment according to corresponding changes. The modules in the above embodiments can be integrated or deployed separately; they can be combined into one module, or further split into multiple sub-modules.

上述本发明实施例序号仅仅为了描述,不代表实施例的优劣。The serial numbers of the above embodiments of the present invention are for description only, and do not represent the advantages and disadvantages of the embodiments.

以上公开的仅为本发明的几个具体实施例,但是,本发明并非局限于此,任何本领域的技术人员能思之的变化都应落入本发明的保护范围。The above disclosures are only a few specific embodiments of the present invention, however, the present invention is not limited thereto, and any changes conceivable by those skilled in the art shall fall within the protection scope of the present invention.

Claims (10)

1.一种通过消息队列处理消息的方法,其特征在于,包括:1. A method for processing messages by a message queue, characterized in that, comprising: 接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;Receive a message from the message production device, send the message to the first message queue in the memory, and add the message to the database according to the flag bit in the message; 将所述数据库中的消息发送到内存中的第二消息队列,所述第二消息队列用于存储供消息使用设备获取并处理的消息;Sending the messages in the database to a second message queue in memory, where the second message queue is used to store messages acquired and processed by the message using device; 接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述处理过的消息。receiving the message processed by the message usage device, sending the message to the first message queue, and deleting or modifying the processed message from the database according to the tag bit of the processed message . 2.如权利要求1所述的方法,其特征在于,所述根据消息中的标记位将消息添加到数据库中,具体为:2. The method according to claim 1, wherein the message is added to the database according to the tag position in the message, specifically: 根据所述第一消息队列中的消息携带的插入标记,将所述消息添加到所述数据库中。Add the message to the database according to the insertion mark carried by the message in the first message queue. 3.如权利要求1所述的方法,其特征在于,所述根据处理过的消息的标记位从数据库中删除或修改处理过的消息,具体为:3. The method according to claim 1, characterized in that, deleting or revising the processed message from the database according to the tag position of the processed message, specifically: 根据所述处理过的消息携带的修改标记,将所述处理过的消息从所述数据库中删除或者在所述数据库中修改所述处理过的消息。According to the modification flag carried by the processed message, the processed message is deleted from the database or the processed message is modified in the database. 4.如权利要求1所述的方法,其特征在于,所述将数据库中的消息发送到内存中的第二消息队列,具体为:4. The method according to claim 1, wherein the sending of the message in the database to the second message queue in the internal memory is specifically: 通过所述数据库中的数据库触发器将所述数据库中的消息发送到触发器外部过程,并通过所述触发器外部过程将所述消息发送到所述第二消息队列。A message in the database is sent to a trigger external procedure by a database trigger in the database, and the message is sent to the second message queue by the trigger external procedure. 5.如权利要求1所述的方法,其特征在于,所述接收消息使用设备处理过的消息之前,还包括:5. The method according to claim 1, characterized in that before receiving the message processed by the device, further comprising: 所述消息使用设备从所述第二消息队列中获取并处理消息,将处理完毕的消息添加标记位,并发送到所述第一消息队列。The message usage device acquires and processes messages from the second message queue, adds a flag to the processed messages, and sends them to the first message queue. 6.一种消息队列服务器,其特征在于,包括:6. A message queue server, characterized in that, comprising: 收发模块,用于接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并接收消息使用设备处理过的消息,将所述处理过的消息发送到所述第一消息队列中;The transceiver module is configured to receive a message from a message production device, send the message to the first message queue in the memory, receive a message processed by the message usage device, and send the processed message to the first message queue. in the message queue; 数据库模块,用于存储消息,并将所述消息发送到内存中的第二消息队列;A database module, configured to store messages, and send the messages to the second message queue in memory; 添加模块,与所述数据库模块和所述收发模块连接,用于根据所述收发模块发送到所述第一消息队列中的来自所述消息生产设备的消息的标记位,将所述消息添加到所述数据库模块中;An adding module, connected to the database module and the transceiver module, configured to add the message to the message according to the flag bit of the message from the message production device sent by the transceiver module to the first message queue In the database module; 修改模块,与所述收发模块和所述数据库模块连接,用于根据所述收发模块发送到所述第一消息队列中的所述消息使用设备处理过的消息的标记位,将所述处理过的消息从所述数据库模块中删除或者在所述数据库模块中修改所述处理过的消息。A modifying module, connected to the transceiver module and the database module, configured to convert the processed The message is deleted from the database module or the processed message is modified in the database module. 7.如权利要求6所述消息队列服务器,其特征在于,7. message queue server as claimed in claim 6, is characterized in that, 所述添加模块,具体用于根据所述第一消息队列中的消息携带的插入标记,将所述第一消息队列中消息添加到所述数据库模块中。The adding module is specifically configured to add the message in the first message queue to the database module according to the insertion marker carried by the message in the first message queue. 8.如权利要求6所述消息队列服务器,其特征在于,还包括:8. The message queue server according to claim 6, further comprising: 创建模块,与所述收发模块连接,用于创建所述第一消息队列和所述第二消息队列,分别供所述收发模块和所述数据库模块使用。A creation module, connected to the transceiver module, is used to create the first message queue and the second message queue, which are respectively used by the transceiver module and the database module. 9.如权利要求6所述消息队列服务器,其特征在于,9. message queue server as claimed in claim 6, is characterized in that, 所述数据库模块,具体用于通过数据库触发器将所述消息发送到触发器外部过程,并通过所述触发器外部过程将所述消息发送到所述第二消息队列。The database module is specifically configured to send the message to the trigger external process through the database trigger, and send the message to the second message queue through the trigger external process. 10.一种通过消息队列处理消息的系统,包括消息生产设备、消息队列服务器和消息使用设备,其特征在于,10. A system for processing messages through a message queue, comprising a message production device, a message queue server and a message usage device, characterized in that, 所述消息生产设备,用于生成消息,并将所述消息发送到所述消息队列服务器;The message production device is configured to generate a message and send the message to the message queue server; 所述消息队列服务器,用于接收来自所述消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;将所述数据库中的消息发送到内存中的第二消息队列,使消息使用设备获取并处理所述消息;接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述消息;The message queue server is configured to receive the message from the message production device, send the message to the first message queue in the memory, and add the message to the database according to the flag bit in the message; Send the message in the database to the second message queue in the memory, so that the message using device acquires and processes the message; receives the message processed by the message using device, and sends the message to the first message in the queue, and delete or modify the message from the database according to the flag bit of the processed message; 所述消息使用设备,用于从所述第二消息队列中获取并处理消息,将处理完毕的消息添加标记位,并发送到所述第一消息队列。The message usage device is configured to acquire and process messages from the second message queue, add a flag to the processed messages, and send them to the first message queue.
CN200910092421.0A 2009-09-14 2009-09-14 A method, device and system for processing messages through a message queue Expired - Fee Related CN102023974B (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN200910092421.0A CN102023974B (en) 2009-09-14 2009-09-14 A method, device and system for processing messages through a message queue

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN200910092421.0A CN102023974B (en) 2009-09-14 2009-09-14 A method, device and system for processing messages through a message queue

Publications (2)

Publication Number Publication Date
CN102023974A CN102023974A (en) 2011-04-20
CN102023974B true CN102023974B (en) 2012-08-22

Family

ID=43865282

Family Applications (1)

Application Number Title Priority Date Filing Date
CN200910092421.0A Expired - Fee Related CN102023974B (en) 2009-09-14 2009-09-14 A method, device and system for processing messages through a message queue

Country Status (1)

Country Link
CN (1) CN102023974B (en)

Cited By (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN102890631A (en) * 2012-09-13 2013-01-23 新浪网技术(中国)有限公司 Method for transmitting message based on persistent message queue and message transmission device

Families Citing this family (15)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN103503388B (en) * 2011-09-01 2016-08-03 华为技术有限公司 A kind of distributed queue's message read method and equipment, system
CN102306197B (en) * 2011-09-22 2013-07-03 用友软件股份有限公司 Device and method for guaranteeing consistency of data-source-crossing operation results
CN102970353B (en) * 2012-11-08 2015-04-08 大唐软件技术股份有限公司 Method and system for business data processing
CN103064731A (en) * 2012-12-26 2013-04-24 人民搜索网络股份公司 Device and method for improving message queue system performance
CN104216659A (en) * 2013-05-30 2014-12-17 中兴通讯股份有限公司 Off-line message storage method and servers
CN104796456B (en) * 2015-03-17 2019-06-28 青岛海尔智能家电科技有限公司 A kind of message treatment method and device
CN105159657B (en) * 2015-06-12 2019-03-08 北京京东尚科信息技术有限公司 Handle the method and system of message
CN105471714A (en) * 2015-12-09 2016-04-06 百度在线网络技术(北京)有限公司 Message processing method and device
CN107665139B (en) * 2016-07-29 2021-07-27 北京新唐思创教育科技有限公司 Implementation method and device for real-time bidirectional rendering in online teaching
CN107277022B (en) * 2017-06-27 2020-03-13 中国联合网络通信集团有限公司 Process marking method and device
CN108092918A (en) * 2017-12-07 2018-05-29 长城计算机软件与系统有限公司 A kind of method for message transmission and system
CN109408203B (en) * 2018-11-01 2019-10-18 无锡华云数据技术服务有限公司 A kind of implementation method, device, the computing system of queue message consistency
CN110109799A (en) * 2019-03-29 2019-08-09 北京奇安信科技有限公司 A kind of real time monitoring processing method and processing device of computing resource operation conditions
CN110287046A (en) * 2019-07-03 2019-09-27 浪潮云信息技术有限公司 Business service message method and system based on queue
CN111309467B (en) * 2020-02-24 2023-07-14 拉扎斯网络科技(上海)有限公司 Task distribution method and device, electronic device and storage medium

Citations (3)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN1482766A (en) * 2002-09-13 2004-03-17 华为技术有限公司 A Method of Automatically Generating Network Management Report
CN1561022A (en) * 2004-03-04 2005-01-05 中兴通讯股份有限公司 Method of internal data base main-spare synchronous
US6970945B1 (en) * 1999-11-01 2005-11-29 Seebeyond Technology Corporation Systems and methods of message queuing

Patent Citations (4)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US6970945B1 (en) * 1999-11-01 2005-11-29 Seebeyond Technology Corporation Systems and methods of message queuing
US20060053425A1 (en) * 1999-11-01 2006-03-09 Seebeyond Technology Corporation, A California Corporation System and method of intelligent queuing
CN1482766A (en) * 2002-09-13 2004-03-17 华为技术有限公司 A Method of Automatically Generating Network Management Report
CN1561022A (en) * 2004-03-04 2005-01-05 中兴通讯股份有限公司 Method of internal data base main-spare synchronous

Cited By (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN102890631A (en) * 2012-09-13 2013-01-23 新浪网技术(中国)有限公司 Method for transmitting message based on persistent message queue and message transmission device
CN102890631B (en) * 2012-09-13 2016-12-07 新浪网技术(中国)有限公司 Method based on persistent message queue transmission message and massage transmission device

Also Published As

Publication number Publication date
CN102023974A (en) 2011-04-20

Similar Documents

Publication Publication Date Title
CN102023974B (en) A method, device and system for processing messages through a message queue
KR102006513B1 (en) Application consistent snapshots of a shared volume
CN103312624B (en) A kind of Message Queuing Services system and method
CN103516580B (en) A kind of method and system that message sink and forwarding are realized based on message queue
US8886787B2 (en) Notification for a set of sessions using a single call issued from a connection pool
CN112671760A (en) Socket-based client cross-platform network communication method and related equipment thereof
CN112558875B (en) Data verification method, device, electronic device and storage medium
CN102521712A (en) Process instance data processing method and device
CN111343102B (en) Flow rate control method, server, client server and system
CN103049317A (en) Highly-concurrent data fast write system and method based on queues in cloud environment
CN102497427A (en) Method and device for realizing data acquisition services of renewable energy source monitoring system
CN113852610B (en) Message processing method, device, computer equipment and storage medium
CN116301568A (en) A data access method, device and equipment
CN104866528B (en) Multi-platform data acquisition method and system
CN113672410A (en) Data processing method and electronic device
US10318385B2 (en) Service recovery using snapshots and interservice messages
CN115510036A (en) Data migration method, device, equipment and storage medium
CN116662035A (en) Method and device for processing transaction message of message queue
CN111708835B (en) Blockchain data storage method and device
CN108241616B (en) Message pushing method and device
CN115914380A (en) Communication Delay Optimization Method for Cloud Computing Resource Manager Based on Zlib Compression Algorithm
CN106557530B (en) Operation system, data recovery method and device
KR102850653B1 (en) Method and apparatus for linking multiple service in distributed server cluster
CN117632445A (en) Request processing method and device, task execution method and device
CN103944875A (en) Data exchanging method and data exchanging system

Legal Events

Date Code Title Description
C06 Publication
PB01 Publication
C10 Entry into substantive examination
SE01 Entry into force of request for substantive examination
C14 Grant of patent or utility model
GR01 Patent grant
CF01 Termination of patent right due to non-payment of annual fee

Granted publication date: 20120822

Termination date: 20210914

CF01 Termination of patent right due to non-payment of annual fee