CN102023974B - A method, device and system for processing messages through a message queue - Google Patents
A method, device and system for processing messages through a message queue Download PDFInfo
- Publication number
- CN102023974B CN102023974B CN200910092421.0A CN200910092421A CN102023974B CN 102023974 B CN102023974 B CN 102023974B CN 200910092421 A CN200910092421 A CN 200910092421A CN 102023974 B CN102023974 B CN 102023974B
- Authority
- CN
- China
- Prior art keywords
- message
- database
- queue
- message queue
- processed
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Expired - Fee Related
Links
- 238000000034 method Methods 0.000 title claims abstract description 54
- 238000004519 manufacturing process Methods 0.000 claims abstract description 24
- 230000008569 process Effects 0.000 claims description 28
- 230000004048 modification Effects 0.000 claims description 8
- 238000012986 modification Methods 0.000 claims description 8
- 238000003780 insertion Methods 0.000 claims description 4
- 230000037431 insertion Effects 0.000 claims description 4
- 239000003550 marker Substances 0.000 claims description 2
- 238000010586 diagram Methods 0.000 description 8
- 230000002688 persistence Effects 0.000 description 4
- 238000009826 distribution Methods 0.000 description 3
- 238000005516 engineering process Methods 0.000 description 3
- 230000002085 persistent effect Effects 0.000 description 3
- 230000006870 function Effects 0.000 description 2
- 230000001960 triggered effect Effects 0.000 description 2
- 230000005540 biological transmission Effects 0.000 description 1
- 230000007547 defect Effects 0.000 description 1
- 238000001514 detection method Methods 0.000 description 1
- 238000005457 optimization Methods 0.000 description 1
- 238000011084 recovery Methods 0.000 description 1
- 230000000717 retained effect Effects 0.000 description 1
- 238000003860 storage Methods 0.000 description 1
- 230000001360 synchronised effect Effects 0.000 description 1
Images
Landscapes
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本发明实施例公开了一种通过消息队列处理消息的方法,包括以下步骤:接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;将所述数据库中的消息发送到内存中的第二消息队列,所述第二消息队列用于存储供消息使用设备获取并处理的消息;接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述处理过的消息。本发明实施例兼顾了消息队列的性能和可靠性。本发明实施例同样公开了一种应用上述方法的装置和系统。
The embodiment of the present invention discloses a method for processing messages through a message queue, which includes the following steps: receiving a message from a message production device, sending the message to the first message queue in the memory, and The message is added to the database; the message in the database is sent to the second message queue in the memory, and the second message queue is used to store the message obtained and processed by the message using device; receive the message Using the message processed by the device, sending the message to the first message queue, and deleting or modifying the processed message from the database according to the flag bit of the processed message. The embodiment of the present invention takes both performance and reliability of the message queue into consideration. The embodiment of the present invention also discloses a device and a system for applying the above method.
Description
技术领域 technical field
本发明涉及互联网技术领域,尤其涉及一种通过消息队列处理消息的方法、装置和系统。The invention relates to the technical field of the Internet, in particular to a method, device and system for processing messages through a message queue.
背景技术 Background technique
在数据业务技术中,消息是指在两台计算机间传送的数据单位,可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息队列是在消息的传输过程中保存消息的容器,是一种分布式应用间交换信息的消息处理技术,可驻留在内存或磁盘上,主要用于提供路由并保证消息的传递,为任何应用程序提供消息处理和消息队列功能,无论该应用程序所在的计算机是否在同一个网络上或者是否同时联机。如果发送消息时接收者不可用,消息队列会保留消息,直到该消息被应用程序读走。通过消息队列,应用程序可独立地访问消息,而不需要知道消息之间的位置关系,且消息队列在继续执行后续步骤之前,不需要等待接收程序接收消息。In data business technology, a message refers to a data unit transmitted between two computers, which can be very simple, such as containing only text strings, or more complex, possibly containing embedded objects. A message queue is a container for storing messages during message transmission. It is a message processing technology for exchanging information between distributed applications. It can reside in memory or on disk. It is mainly used to provide routing and ensure message delivery. An application provides message processing and message queuing functionality, regardless of whether the computer on which the application is running is on the same network or online at the same time. If the recipient is unavailable when the message is sent, the message queue will hold the message until it is read by the application. Through the message queue, the application program can independently access the message without knowing the positional relationship between the messages, and the message queue does not need to wait for the receiving program to receive the message before continuing to perform subsequent steps.
消息队列为构造以同步或异步方式实现的分布式应用提供了松耦合方法,消息队列的API(Application Programming Interface,应用程序编程接口)调用被嵌入到新的或现存的应用中,通过消息发送到基于内存或磁盘的消息队列中。消息队列可用在应用中以执行多种功能,比如要求服务、交换信息或异步处理等。The message queue provides a loosely coupled method for constructing distributed applications implemented in a synchronous or asynchronous manner. The API (Application Programming Interface, Application Programming Interface) call of the message queue is embedded in a new or existing application and sent to In-memory or disk-based message queues. Message queues can be used in applications to perform functions such as requesting services, exchanging information, or asynchronous processing.
衡量消息队列是否优秀的标准包括性能和可靠性,应用于大规模的分布式系统的消息队列,消息量巨大,因此,对消息队列的性能要求非常高,要求有高并发和高吞吐量。为满足高效率的要求,现有技术通常使用的是内存消息队列,消息保存在内存中,以保证消息处理速度;为实现可靠性,一些消息队列使用内存消息队列+文件系统的方式实现消息的持久化,或者完全使用数据库来实现消息队列;还有一些消息队列使用内存消息队列+数据库的方式实现消息的持久化,但并没有采用触发式方式在消息队列和数据库之间实现消息同步。The criteria to measure whether the message queue is excellent include performance and reliability. The message queue used in large-scale distributed systems has a huge amount of messages. Therefore, the performance requirements for the message queue are very high, and high concurrency and high throughput are required. In order to meet the requirements of high efficiency, the existing technology usually uses the memory message queue, and the message is stored in the memory to ensure the message processing speed; in order to achieve reliability, some message queues use the memory message queue + file system to realize the message Persistence, or completely use the database to implement the message queue; there are also some message queues that use the memory message queue + database to achieve message persistence, but do not use a trigger method to achieve message synchronization between the message queue and the database.
发明人在实现本发明的过程中,发现现有技术至少存在以下缺陷:In the process of realizing the present invention, the inventor finds that the prior art has at least the following defects:
内存消息队列降低了系统的可靠性,一旦出现掉电、意外死机,或者程序崩溃等情况,无法保证消息的完整性,还可能丢掉大量未来得及处理的消息;而使用内存消息队列+文件系统或内存消息队列+数据库的方式实现消息的持久性时,效率很低,而且实现复杂,不易使用。The memory message queue reduces the reliability of the system. In case of power failure, unexpected crash, or program crash, the integrity of the message cannot be guaranteed, and a large number of messages that cannot be processed in the future may be lost; while using the memory message queue + file system or When the memory message queue + database method is used to achieve message persistence, the efficiency is very low, and the implementation is complicated and difficult to use.
发明内容 Contents of the invention
本发明实施例提供了一种通过消息队列处理消息的方法、装置和系统,用于兼顾消息队列的性能和可靠性。Embodiments of the present invention provide a method, device and system for processing messages through a message queue, which are used to balance the performance and reliability of the message queue.
本发明实施例提供了一种通过消息队列处理消息的方法,包括以下步骤:An embodiment of the present invention provides a method for processing messages through a message queue, including the following steps:
接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;Receive a message from the message production device, send the message to the first message queue in the memory, and add the message to the database according to the flag bit in the message;
将所述数据库中的消息发送到内存中的第二消息队列,所述第二消息队列用于存储供消息使用设备获取并处理的消息;Sending the messages in the database to a second message queue in memory, where the second message queue is used to store messages acquired and processed by the message using device;
接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述处理过的消息。receiving the message processed by the message usage device, sending the message to the first message queue, and deleting or modifying the processed message from the database according to the tag bit of the processed message .
本发明实施例还提供了一种消息队列服务器,包括:The embodiment of the present invention also provides a message queue server, including:
收发模块,用于接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并接收消息使用设备处理过的消息,将所述处理过的消息发送到所述第一消息队列中;The transceiver module is configured to receive a message from a message production device, send the message to the first message queue in the memory, receive a message processed by the message usage device, and send the processed message to the first message queue. in the message queue;
数据库模块,用于存储消息,并将所述消息发送到内存中的第二消息队列;A database module, configured to store messages, and send the messages to the second message queue in memory;
添加模块,与所述数据库模块和所述收发模块连接,用于根据所述收发模块发送到所述第一消息队列中的来自所述消息生产设备的消息的标记位,将所述消息添加到所述数据库模块中;An adding module, connected to the database module and the transceiver module, configured to add the message to the message according to the flag bit of the message from the message production device sent by the transceiver module to the first message queue In the database module;
修改模块,与所述收发模块和所述数据库模块连接,用于根据所述收发模块发送到所述第一消息队列中的所述消息使用设备处理过的消息的标记位,将所述处理过的消息从所述数据库模块中删除或者在所述数据库模块中修改所述处理过的消息。A modifying module, connected to the transceiver module and the database module, configured to convert the processed The message is deleted from the database module or the processed message is modified in the database module.
本发明实施例还提供了一种通过消息队列处理消息的系统,包括消息生产设备、消息队列服务器和消息使用设备,The embodiment of the present invention also provides a system for processing messages through a message queue, including a message production device, a message queue server, and a message usage device,
所述消息生产设备,用于生成消息,并将所述消息发送到所述消息队列服务器;The message production device is configured to generate a message and send the message to the message queue server;
所述消息队列服务器,用于接收来自所述消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;将所述数据库中的消息发送到内存中的第二消息队列,使消息使用设备获取并处理所述消息;接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述消息;The message queue server is configured to receive the message from the message production device, send the message to the first message queue in the memory, and add the message to the database according to the flag bit in the message; Send the message in the database to the second message queue in the memory, so that the message using device acquires and processes the message; receives the message processed by the message using device, and sends the message to the first message in the queue, and delete or modify the message from the database according to the flag bit of the processed message;
所述消息使用设备,用于从所述第二消息队列中获取并处理消息,将处理完毕的消息添加标记位,并发送到所述第一消息队列。The message usage device is configured to acquire and process messages from the second message queue, add a flag to the processed messages, and send them to the first message queue.
与现有技术相比,本发明实施例具有以下优点:利用内存消息队列高并发和高吞吐量的优点,提高了处理消息的效率,并保证了数据的可靠性,完全保留了数据库的事务性优点,除了完成消息的分发和传递,还保留消息的处理结果,随时可供其他用途;由于消息实时的保存在数据库,而数据库可以很方便的进行各种操作,便于跟踪当前消息队列的使用情况,发现故障和进行优化。Compared with the prior art, the embodiment of the present invention has the following advantages: the advantages of high concurrency and high throughput of the memory message queue are used to improve the efficiency of processing messages, ensure the reliability of data, and fully retain the transactional nature of the database Advantages: In addition to completing the distribution and delivery of messages, the processing results of messages are also retained, which can be used for other purposes at any time; since the messages are stored in the database in real time, and the database can easily perform various operations, which is convenient for tracking the current usage of the message queue , fault detection and optimization.
附图说明 Description of drawings
为了更清楚地说明本发明实施例或现有技术中的技术方案,下面将对本发明实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动性的前提下,还可以根据这些附图获得其他的附图。In order to more clearly illustrate the technical solutions in the embodiments of the present invention or the prior art, the following will briefly introduce the drawings that need to be used in the description of the embodiments of the present invention or the prior art. Obviously, the accompanying drawings in the following description These are only some embodiments of the present invention, and those skilled in the art can also obtain other drawings based on these drawings without any creative effort.
图1为本发明实施例中的一种通过消息队列处理消息的方法流程图;FIG. 1 is a flowchart of a method for processing messages through a message queue in an embodiment of the present invention;
图2为本发明实施例应用场景中的一种通过消息队列处理消息的系统结构示意图;FIG. 2 is a schematic structural diagram of a system for processing messages through a message queue in an application scenario of an embodiment of the present invention;
图3为本发明实施例应用场景中的通过消息队列处理消息的流程图;FIG. 3 is a flow chart of processing messages through a message queue in an application scenario of an embodiment of the present invention;
图4为本发明实施例中的一种消息队列服务器的结构示意图;FIG. 4 is a schematic structural diagram of a message queue server in an embodiment of the present invention;
图5为本发明实施例应用场景中的消息队列服务器的结构示意图;FIG. 5 is a schematic structural diagram of a message queue server in an application scenario according to an embodiment of the present invention;
图6为本发明实施例中的一种通过消息队列处理消息的系统结构示意图。FIG. 6 is a schematic structural diagram of a system for processing messages through a message queue in an embodiment of the present invention.
具体实施方式 Detailed ways
本发明实施例提供的技术方案中,通过触发来实现数据库和消息队列之间的同步,利用内存消息队列的高性能和数据库的高可靠性,将内存消息队列和数据可的优点结合起来,实现了一种以数据库作为持久化工具的消息队列,解决了性能和可靠性问题。In the technical solution provided by the embodiment of the present invention, the synchronization between the database and the message queue is realized by triggering, and the high performance of the memory message queue and the high reliability of the database are used to combine the advantages of the memory message queue and the data security to realize A message queue using a database as a persistent tool is developed, which solves the problems of performance and reliability.
下面将结合本发明实施例中的附图,对本发明实施例的技术方案进行清楚、完整地描述,显然,所描述的实施例是本发明一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本发明保护的范围。The following will clearly and completely describe the technical solutions of the embodiments of the present invention with reference to the accompanying drawings in the embodiments of the present invention. Obviously, the described embodiments are some of the embodiments of the present invention, but not all of them. Based on the embodiments of the present invention, all other embodiments obtained by persons of ordinary skill in the art without making creative efforts belong to the protection scope of the present invention.
如图1所示,为本发明实施例中的一种通过消息队列处理消息的方法流程图,包括以下步骤:As shown in Figure 1, it is a flow chart of a method for processing messages through a message queue in an embodiment of the present invention, including the following steps:
步骤101,接收来自消息生产设备的消息,将该消息发送到内存中的第一消息队列,并根据消息中的标记位将该消息添加到数据库中。
步骤102,将数据库中的消息发送到内存中的第二消息队列。
其中,第二消息队列用于存储供消息使用设备获取并处理的消息。Wherein, the second message queue is used to store messages acquired and processed by the message using device.
步骤103,接收消息使用设备处理过的消息,将该消息发送到第一消息队列中,并根据该处理过的消息的标记位从数据库中删除或修改该处理过的消息。
如图2所示,为本发明实施例应用场景中的一种通过消息队列处理消息的系统结构示意图,该系统包括消息生产设备100、第一消息队列200、事务处理服务程序300、数据库和数据库触发器400、触发器外部过程500、第二消息队列600和消息使用设备700。其中,第一消息队列200、事务处理服务程序300、数据库400(包括数据库触发器)、触发器外部过程500和第二消息队列600组成完整的触发式数据库持久化的消息队列服务器800,为消息发送设备100和消息使用设备700提供消息交换服务。As shown in FIG. 2, it is a schematic structural diagram of a system for processing messages through a message queue in the application scenario of the embodiment of the present invention. The system includes a
以下结合上述应用场景,对本发明实施例中的通过消息队列处理消息的方法进行详细、具体的描述。The method for processing messages through the message queue in the embodiment of the present invention will be described in detail below in conjunction with the above application scenarios.
如图3所示,为本发明实施例应用场景中的通过消息队列处理消息的流程图,具体包括以下步骤:As shown in Figure 3, it is a flow chart of processing messages through message queues in the application scenario of the embodiment of the present invention, specifically including the following steps:
步骤301,创建第一消息队列200。
具体地,在消息队列服务器800中创建内存消息队列,作为第一消息队列200,该第一消息队列200用于接收来自各个消息生产设备100的消息,并将该消息传送给事务处理服务程序200。Specifically, a memory message queue is created in the message queue server 800 as the
步骤302,消息生产设备100生成消息,将消息发送到第一消息队列200。
其中,该消息携带一个标记位,用于指明如何处理该消息,包括对该消息进行数据库添加、删除、修改操作。由于该消息刚刚由消息生产设备100生成,并未被消息使用设备700处理过,因此,该消息携带的标记位指示将该消息添加到数据库400中。Wherein, the message carries a flag bit, which is used to indicate how to process the message, including adding, deleting, and modifying the message to the database. Since the message has just been generated by the
步骤303,启动事务处理服务程序300。
步骤304,事务处理服务程序300获取第一消息队列200中的消息,根据该消息携带的插入标记,将该消息添加到数据库400中。
步骤305,在数据库400中建立数据库触发器。
步骤306,建立触发器外部过程500。
具体地,建立触发器外部过程500包括,按照数据库系统的外部过程规范建立外触发器外部过程500,以及按照数据库系统规范为数据库系统加载触发器外部过程500。触发器外部过程500用于在数据库400内的消息增加时被触发开始工作,把新增的消息发送到内存。Specifically, establishing the trigger
步骤307,当数据库中的消息发生增加操作时,数据库触发器被触发,将该消息发送到触发器外部过程500。
步骤308,建立第二消息队列600。In
具体地,在消息队列服务器800上创建内存消息队列,作为第二消息队列600,该第二消息队列600用于接收来自触发器外部过程500的消息,并将该消息传送给消息使用设备700。Specifically, an in-memory message queue is created on the message queue server 800 as the
步骤309,触发器外部过程500将消息放入第二消息队列600。
步骤310,消息使用设备700从第二消息队列600中获取并处理消息,将处理完毕的消息添加标记位,并发送到第一消息队列200。
步骤311,事务处理服务程序300获取第一消息队列200中消息,根据该消息携带的修改标记,在数据库400中删除或修改该消息。
具体地,由于该消息已经被消息使用设备700处理过,因此,该消息携带的修改标记将该消息从数据库400中删除,或者在数据库100中修改该消息。Specifically, since the message has been processed by the
需要说明的是,本发明实施例方法可以根据实际需要对各个步骤顺序进行调整。It should be noted that, in the method of the embodiment of the present invention, the order of each step may be adjusted according to actual needs.
本发明实施例提供的技术方案中,消息生成后发送到的第一消息队列200和触发后发送到的第二消息队列600都是内存消息队列,因此,处理速度极快,保证了消息处理的效率;消息进入第一消息队列200后立即被放入数据库400,消息得到可靠的保存。对消息生产设备100和消息使用设备700而言,消息队列服务器800相当于一个普通的内存消息队列,与普通的内存消息队列的不同之处在于,消息使用设备700对消息操作完毕后,要向第一消息队列200发送一个处理后的消息,来删除或者修改数据库400里记录的消息,即只有消息使用设备700正确接收并且处理完消息,明确发出操作指令之后,该消息才会被删除或修改,因此,数据库中的消息不会因为服务器掉电或意外死机等原因而丢失。In the technical solution provided by the embodiment of the present invention, the
本发明实施例在上述实施方式中提供了通过消息队列处理消息的方法和应用场景,相应地,本发明实施例还提供了应用上述通过消息队列处理消息的方法的装置和系统。Embodiments of the present invention provide a method and application scenarios for processing messages through a message queue in the above implementation manners. Correspondingly, embodiments of the present invention also provide an apparatus and system for applying the above method for processing messages through a message queue.
如图4所示,为本发明实施例中的一种消息队列服务器的结构示意图,包括:As shown in Figure 4, it is a schematic structural diagram of a message queue server in an embodiment of the present invention, including:
收发模块410,用于接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并接收消息使用设备处理过的消息,将所述处理过的消息发送到所述第一消息队列中。The
数据库模块420,用于存储消息,并将所述消息发送到内存中的第二消息队列。The
添加模块430,与数据库模块420和收发模块410连接,用于根据收发模块410发送到所述第一消息队列中的来自所述消息生产设备的消息的标记位,将所述消息添加到数据库模块420中。The adding
修改模块440,与收发模块410和数据库模块420连接,用于根据收发模块410发送到所述第一消息队列中的所述消息使用设备处理过的消息的标记位,将所述处理过的消息从数据库模块420中删除或者在数据库模块420中修改上述处理过的消息。The
如图5所示,为本发明实施例应用场景中的消息队列服务器的结构示意图,具体包括创建模块510、数据库模块520、收发模块530、添加模块540和修改模块550,其中,As shown in Figure 5, it is a schematic structural diagram of the message queue server in the application scenario of the embodiment of the present invention, specifically including a creation module 510, a database module 520, a transceiver module 530, an addition module 540 and a modification module 550, wherein,
创建模块510,与数据库模块520和收发模块530连接,用于创建第一消息队列和所述第二消息队列,分别供收发模块530和数据库模块520使用。The creation module 510 is connected with the database module 520 and the transceiver module 530, and is used to create the first message queue and the second message queue, which are respectively used by the transceiver module 530 and the database module 520.
数据库模块520,用于存储消息,并将所述消息发送到内存中的第二消息队列。The database module 520 is configured to store messages and send the messages to the second message queue in memory.
数据库模块520,具体用于通过数据库触发器将所述消息发送到触发器外部过程,并通过所述触发器外部过程将所述消息发送到所述第二消息队列。The database module 520 is specifically configured to send the message to the trigger external process through the database trigger, and send the message to the second message queue through the trigger external process.
收发模块530,用于接收来自消息生产设备的消息,将所述消息发送到内存中的第一消息队列,并接收所述消息使用设备处理过的消息,将所述处理过的消息发送到所述第一消息队列中。The transceiver module 530 is configured to receive a message from a message production device, send the message to the first message queue in the memory, receive a message processed by the message consuming device, and send the processed message to the in the first message queue.
添加模块540,与数据库模块520和收发模块530连接,用于根据收发模块530发送到所述第一消息队列中的来自所述消息生产设备的消息的标记位,将所述消息添加到数据库模块520中。The adding module 540 is connected with the database module 520 and the transceiver module 530, and is used to add the message to the database module according to the flag bit of the message from the message production device sent by the transceiver module 530 to the first message queue 520 in.
上述添加模块540,具体用于根据所述第一消息队列中的消息携带的插入标记,将所述第一消息队列中消息添加到数据库模块520中。The adding module 540 is specifically configured to add the message in the first message queue to the database module 520 according to the insertion marker carried by the message in the first message queue.
修改模块550,与收发模块530和数据库模块520连接,用于根据收发模块530发送到所述第一消息队列中的所述消息使用设备处理过的消息的标记位,将所述处理过的消息从数据库模块520中删除或者在数据库模块520中修改上述处理过的消息。The modification module 550 is connected with the transceiver module 530 and the database module 520, and is configured to convert the processed message to The above-mentioned processed messages are deleted from the database module 520 or modified in the database module 520 .
本发明实施例充分利用了内存消息队列高并发和高吞吐量的优点,提高了处理消息的效率,并保证了数据的可靠性,完全保留了数据库的事务性优点,除了完成消息的分发和传递,还保留消息的处理结果,随时可供其他用途;由于消息实时的保存在数据库,而数据库可以很方便的进行各种操作,便于跟踪当前消息队列的使用情况,发现故障和进行优化。The embodiment of the present invention makes full use of the advantages of high concurrency and high throughput of the memory message queue, improves the efficiency of message processing, ensures the reliability of data, and fully retains the transactional advantages of the database. In addition to completing the distribution and delivery of messages , also retains the processing results of the message, which can be used for other purposes at any time; because the message is stored in the database in real time, and the database can easily perform various operations, which is convenient for tracking the current usage of the message queue, finding faults and optimizing.
如图6所示,为本发明实施例中的一种通过消息队列处理消息的系统结构示意图,包括消息生产设备610、消息队列服务器620和消息使用设备630,其中,As shown in FIG. 6, it is a schematic structural diagram of a system for processing messages through a message queue in an embodiment of the present invention, including a
消息生产设备610,用于生成消息,并将所述消息发送到所述消息队列服务器620。The
消息队列服务器620,用于接收来自所述消息生产设备610的消息,将所述消息发送到内存中的第一消息队列,并根据所述消息中的标记位将所述消息添加到数据库中;将所述数据库中的消息发送到内存中的第二消息队列,使消息使用设备630获取并处理所述消息;接收所述消息使用设备处理过的消息,将所述消息发送到所述第一消息队列中,并根据所述处理过的消息的标记位从所述数据库中删除或修改所述消息;The
消息使用设备630,用于从所述第二消息队列中获取并处理消息,将处理完毕的消息添加标记位,并发送到所述第一消息队列。The
上述消息队列服务器620,还用于创建内存消息队列,将所述内存消息队列作为所述第一消息队列。The above-mentioned
上述消息队列服务器620,还用于启动事物处理服务程序,所述事物处理服务程序用于根据所述第一消息队列中的消息携带的标记位将所述消息添加到数据库中。The above-mentioned
上述消息队列服务器620,还用于启动事物处理服务程序,所述事物处理服务程序用于根据所述第一消息队列中的消息携带的标记位,对所述数据库中的消息进行删除或修改。The above-mentioned
上述消息队列服务器620,还用于在所述数据库中建立数据库触发器,并建立触发器外部过程,所述数据库触发器用于将所述数据库中的消息发送到所述触发器外部过程,所述触发器外部过程用于将来自所述数据库触发器的消息发送到所述第二消息队列。The above-mentioned
上述消息队列服务器620,还用于创建内存消息队列,将所述内存消息队列作为所述第二消息队列。The above-mentioned
本发明实施例充分利用了内存消息队列高并发和高吞吐量的优点,效率远远超过使用文件系统持久化或者单纯以数据库为基础实现的消息队列,在内部实现中,事务处理服务程序和数据库之间采用连接池,减少数据库并发,而又充分利用数据库性能,采用的是触发式操作,比传统的轮询或者其他方式效率更高,CPU利用率更低;采用数据库作为持久化媒介,充分利用了数据库的可靠性优点,通过利用数据库自身携带的备份恢复功能,更保证了数据的可靠性;完全保留了数据库的事务性优点,除了完成消息的分发和传递,还保留消息的处理结果,随时可供其他用途;由于消息实时的保存在数据库,而数据库可以很方便的进行各种操作,便于跟踪当前消息队列的使用情况,发现故障和进行优化。The embodiments of the present invention make full use of the advantages of high concurrency and high throughput of memory message queues, and the efficiency far exceeds that of using file system persistence or message queues implemented solely on the basis of databases. In internal implementation, transaction processing service programs and databases The connection pool is used among them to reduce database concurrency and make full use of database performance. It adopts trigger operation, which is more efficient than traditional polling or other methods, and the CPU utilization rate is lower; using database as a persistent medium makes full use of Utilizing the reliability advantages of the database, by using the backup and recovery function carried by the database itself, the reliability of the data is guaranteed; the transactional advantages of the database are completely preserved, and in addition to completing the distribution and delivery of messages, it also retains the processing results of messages. It can be used for other purposes at any time; because the messages are stored in the database in real time, and the database can be easily used for various operations, which is convenient for tracking the current usage of the message queue, finding faults and optimizing them.
通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到本发明可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。基于这样的理解,本发明的技术方案本质上或者说对现有技术做出贡献的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质中,包括若干指令用以使得一台终端设备(可以是手机,个人计算机,服务器,或者网络设备等)执行本发明各个实施例所述的方法。Through the description of the above embodiments, those skilled in the art can clearly understand that the present invention can be implemented by means of software plus a necessary general-purpose hardware platform, and of course also by hardware, but in many cases the former is a better implementation Way. Based on this understanding, the essence of the technical solution of the present invention or the part that contributes to the prior art can be embodied in the form of a software product. The computer software product is stored in a storage medium and includes several instructions to make a A terminal device (which may be a mobile phone, a personal computer, a server, or a network device, etc.) executes the methods described in various embodiments of the present invention.
以上所述仅是本发明的优选实施方式,应当指出,对于本技术领域的普通技术人员来说,在不脱离本发明原理的前提下,还可以做出若干改进和润饰,这些改进和润饰也应视本发明的保护范围。The above is only a preferred embodiment of the present invention, it should be pointed out that, for those of ordinary skill in the art, without departing from the principle of the present invention, some improvements and modifications can also be made, and these improvements and modifications can also be made. It should be regarded as the protection scope of the present invention.
本领域技术人员可以理解实施例中的装置中的模块可以按照实施例描述进行分布于实施例的装置中,也可以进行相应变化位于不同于本实施例的一个或多个装置中。上述实施例的模块可以集成于一体,也可以分离部署;可以合并为一个模块,也可以进一步拆分成多个子模块。Those skilled in the art can understand that the modules in the device in the embodiment can be distributed in the device in the embodiment according to the description in the embodiment, or can be located in one or more devices different from the embodiment according to corresponding changes. The modules in the above embodiments can be integrated or deployed separately; they can be combined into one module, or further split into multiple sub-modules.
上述本发明实施例序号仅仅为了描述,不代表实施例的优劣。The serial numbers of the above embodiments of the present invention are for description only, and do not represent the advantages and disadvantages of the embodiments.
以上公开的仅为本发明的几个具体实施例,但是,本发明并非局限于此,任何本领域的技术人员能思之的变化都应落入本发明的保护范围。The above disclosures are only a few specific embodiments of the present invention, however, the present invention is not limited thereto, and any changes conceivable by those skilled in the art shall fall within the protection scope of the present invention.
Claims (10)
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN200910092421.0A CN102023974B (en) | 2009-09-14 | 2009-09-14 | A method, device and system for processing messages through a message queue |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN200910092421.0A CN102023974B (en) | 2009-09-14 | 2009-09-14 | A method, device and system for processing messages through a message queue |
Publications (2)
Publication Number | Publication Date |
---|---|
CN102023974A CN102023974A (en) | 2011-04-20 |
CN102023974B true CN102023974B (en) | 2012-08-22 |
Family
ID=43865282
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN200910092421.0A Expired - Fee Related CN102023974B (en) | 2009-09-14 | 2009-09-14 | A method, device and system for processing messages through a message queue |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN102023974B (en) |
Cited By (1)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN102890631A (en) * | 2012-09-13 | 2013-01-23 | 新浪网技术(中国)有限公司 | Method for transmitting message based on persistent message queue and message transmission device |
Families Citing this family (15)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN103503388B (en) * | 2011-09-01 | 2016-08-03 | 华为技术有限公司 | A kind of distributed queue's message read method and equipment, system |
CN102306197B (en) * | 2011-09-22 | 2013-07-03 | 用友软件股份有限公司 | Device and method for guaranteeing consistency of data-source-crossing operation results |
CN102970353B (en) * | 2012-11-08 | 2015-04-08 | 大唐软件技术股份有限公司 | Method and system for business data processing |
CN103064731A (en) * | 2012-12-26 | 2013-04-24 | 人民搜索网络股份公司 | Device and method for improving message queue system performance |
CN104216659A (en) * | 2013-05-30 | 2014-12-17 | 中兴通讯股份有限公司 | Off-line message storage method and servers |
CN104796456B (en) * | 2015-03-17 | 2019-06-28 | 青岛海尔智能家电科技有限公司 | A kind of message treatment method and device |
CN105159657B (en) * | 2015-06-12 | 2019-03-08 | 北京京东尚科信息技术有限公司 | Handle the method and system of message |
CN105471714A (en) * | 2015-12-09 | 2016-04-06 | 百度在线网络技术(北京)有限公司 | Message processing method and device |
CN107665139B (en) * | 2016-07-29 | 2021-07-27 | 北京新唐思创教育科技有限公司 | Implementation method and device for real-time bidirectional rendering in online teaching |
CN107277022B (en) * | 2017-06-27 | 2020-03-13 | 中国联合网络通信集团有限公司 | Process marking method and device |
CN108092918A (en) * | 2017-12-07 | 2018-05-29 | 长城计算机软件与系统有限公司 | A kind of method for message transmission and system |
CN109408203B (en) * | 2018-11-01 | 2019-10-18 | 无锡华云数据技术服务有限公司 | A kind of implementation method, device, the computing system of queue message consistency |
CN110109799A (en) * | 2019-03-29 | 2019-08-09 | 北京奇安信科技有限公司 | A kind of real time monitoring processing method and processing device of computing resource operation conditions |
CN110287046A (en) * | 2019-07-03 | 2019-09-27 | 浪潮云信息技术有限公司 | Business service message method and system based on queue |
CN111309467B (en) * | 2020-02-24 | 2023-07-14 | 拉扎斯网络科技(上海)有限公司 | Task distribution method and device, electronic device and storage medium |
Citations (3)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN1482766A (en) * | 2002-09-13 | 2004-03-17 | 华为技术有限公司 | A Method of Automatically Generating Network Management Report |
CN1561022A (en) * | 2004-03-04 | 2005-01-05 | 中兴通讯股份有限公司 | Method of internal data base main-spare synchronous |
US6970945B1 (en) * | 1999-11-01 | 2005-11-29 | Seebeyond Technology Corporation | Systems and methods of message queuing |
-
2009
- 2009-09-14 CN CN200910092421.0A patent/CN102023974B/en not_active Expired - Fee Related
Patent Citations (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US6970945B1 (en) * | 1999-11-01 | 2005-11-29 | Seebeyond Technology Corporation | Systems and methods of message queuing |
US20060053425A1 (en) * | 1999-11-01 | 2006-03-09 | Seebeyond Technology Corporation, A California Corporation | System and method of intelligent queuing |
CN1482766A (en) * | 2002-09-13 | 2004-03-17 | 华为技术有限公司 | A Method of Automatically Generating Network Management Report |
CN1561022A (en) * | 2004-03-04 | 2005-01-05 | 中兴通讯股份有限公司 | Method of internal data base main-spare synchronous |
Cited By (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN102890631A (en) * | 2012-09-13 | 2013-01-23 | 新浪网技术(中国)有限公司 | Method for transmitting message based on persistent message queue and message transmission device |
CN102890631B (en) * | 2012-09-13 | 2016-12-07 | 新浪网技术(中国)有限公司 | Method based on persistent message queue transmission message and massage transmission device |
Also Published As
Publication number | Publication date |
---|---|
CN102023974A (en) | 2011-04-20 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN102023974B (en) | A method, device and system for processing messages through a message queue | |
KR102006513B1 (en) | Application consistent snapshots of a shared volume | |
CN103312624B (en) | A kind of Message Queuing Services system and method | |
CN103516580B (en) | A kind of method and system that message sink and forwarding are realized based on message queue | |
US8886787B2 (en) | Notification for a set of sessions using a single call issued from a connection pool | |
CN112671760A (en) | Socket-based client cross-platform network communication method and related equipment thereof | |
CN112558875B (en) | Data verification method, device, electronic device and storage medium | |
CN102521712A (en) | Process instance data processing method and device | |
CN111343102B (en) | Flow rate control method, server, client server and system | |
CN103049317A (en) | Highly-concurrent data fast write system and method based on queues in cloud environment | |
CN102497427A (en) | Method and device for realizing data acquisition services of renewable energy source monitoring system | |
CN113852610B (en) | Message processing method, device, computer equipment and storage medium | |
CN116301568A (en) | A data access method, device and equipment | |
CN104866528B (en) | Multi-platform data acquisition method and system | |
CN113672410A (en) | Data processing method and electronic device | |
US10318385B2 (en) | Service recovery using snapshots and interservice messages | |
CN115510036A (en) | Data migration method, device, equipment and storage medium | |
CN116662035A (en) | Method and device for processing transaction message of message queue | |
CN111708835B (en) | Blockchain data storage method and device | |
CN108241616B (en) | Message pushing method and device | |
CN115914380A (en) | Communication Delay Optimization Method for Cloud Computing Resource Manager Based on Zlib Compression Algorithm | |
CN106557530B (en) | Operation system, data recovery method and device | |
KR102850653B1 (en) | Method and apparatus for linking multiple service in distributed server cluster | |
CN117632445A (en) | Request processing method and device, task execution method and device | |
CN103944875A (en) | Data exchanging method and data exchanging system |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
C06 | Publication | ||
PB01 | Publication | ||
C10 | Entry into substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
C14 | Grant of patent or utility model | ||
GR01 | Patent grant | ||
CF01 | Termination of patent right due to non-payment of annual fee |
Granted publication date: 20120822 Termination date: 20210914 |
|
CF01 | Termination of patent right due to non-payment of annual fee |