[go: up one dir, main page]

CN114461413A - Message track information processing method, system, terminal and storage medium of message queue - Google Patents

Message track information processing method, system, terminal and storage medium of message queue Download PDF

Info

Publication number
CN114461413A
CN114461413A CN202111520978.7A CN202111520978A CN114461413A CN 114461413 A CN114461413 A CN 114461413A CN 202111520978 A CN202111520978 A CN 202111520978A CN 114461413 A CN114461413 A CN 114461413A
Authority
CN
China
Prior art keywords
message
track information
message queue
module
storage module
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Pending
Application number
CN202111520978.7A
Other languages
Chinese (zh)
Inventor
李壮壮
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Shanghai Hujin Information Technology Co ltd
Original Assignee
Shanghai Hujin Information Technology Co ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Shanghai Hujin Information Technology Co ltd filed Critical Shanghai Hujin Information Technology Co ltd
Priority to CN202111520978.7A priority Critical patent/CN114461413A/en
Publication of CN114461413A publication Critical patent/CN114461413A/en
Pending legal-status Critical Current

Links

Images

Classifications

    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/54Interprogram communication
    • G06F9/546Message passing systems or structures, e.g. queues
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F12/00Accessing, addressing or allocating within memory systems or architectures
    • G06F12/02Addressing or allocation; Relocation
    • G06F12/08Addressing or allocation; Relocation in hierarchically structured memory systems, e.g. virtual memory systems
    • G06F12/0802Addressing of a memory level in which the access to the desired data or data block requires associative addressing means, e.g. caches
    • G06F12/0866Addressing of a memory level in which the access to the desired data or data block requires associative addressing means, e.g. caches for peripheral storage systems, e.g. disk cache

Landscapes

  • Engineering & Computer Science (AREA)
  • Theoretical Computer Science (AREA)
  • Software Systems (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Debugging And Monitoring (AREA)

Abstract

The invention provides a message track information processing method, a system, a terminal and a storage medium of a message queue, wherein the method comprises the following steps: packaging the message tracks of the message queue into track information, and coding and serializing the track information; sending the track information after the coding and the serialization to a message queue for caching; deserializing and decoding the track information cached in the message queue, and analyzing the track information into objects which can be identified by a storage module; and calling an application program interface of the storage module and storing the object for query.

Description

消息队列的消息轨迹信息处理方法、系统、终端和存储介质Method, system, terminal and storage medium for processing message track information of message queue

技术领域technical field

本申请涉及一种消息队列的消息轨迹信息处理方法、系统、终端和存储介质。The present application relates to a message track information processing method, system, terminal and storage medium of a message queue.

背景技术Background technique

消息队列是分布式系统中重要的组件,目前,使用较多的消息队列有RabbitMQ、Kafka、ZeroMQ、ActiveMQ等。现有基于消息队列的生产者和消费者的应用中,上游消息生产者发送消息并发送到消息队列,下游消息消费者从消费队列中消费消息进行数据处理。The message queue is an important component in the distributed system. At present, RabbitMQ, Kafka, ZeroMQ, ActiveMQ, etc. are widely used. In the existing application of producers and consumers based on message queues, upstream message producers send messages and send them to message queues, and downstream message consumers consume messages from consumption queues for data processing.

但是现实应用中容易出现异常场景,例如上游生产者反馈发送了消息,但是下游消费者没有消费到,或者下游消费者对相同的消息消费了多次等等。这会导致消息队列传递和调用的不准确、不稳定。当这种异常场景出现时,上下游的消息记录就显得尤为必须和重要。However, in real applications, abnormal scenarios are prone to occur. For example, the upstream producer sends a message, but the downstream consumer does not consume it, or the downstream consumer consumes the same message multiple times. This will lead to inaccurate and unstable message queue delivery and invocation. When such an abnormal scenario occurs, the upstream and downstream message records are particularly necessary and important.

基于此,消息队列提供了轨迹插件(Tracing plugin),在消息队列集群上通过命令就可以开启轨迹插件,将轨迹文件记录(tracer log)文件储存在消息队列集群磁盘上,后台通过页面配置后,可查询Exchange和Queue的消息轨迹。但是轨迹插件提供的功能过于简单,轨迹内容无法定制化,无法搜索,且Tracer log文件保存在集群磁盘上,对集群的稳定性会带来风险。如果集群的消息较多并且监控的轨迹较多,那么Tracer log文件会占用大量的磁盘,而且需要定期清理,给维护工作带来很大的挑战。Tracer log文件是纯文本,不方便进行历史消息的查询,其内容格式固定,缺乏关键信息,比如Message ID,RequestID等用户关注的信息,无法通过关键信息进行精确查找。Based on this, the message queue provides a tracking plugin (Tracing plugin), which can be opened on the message queue cluster through commands, and the tracer log file is stored on the disk of the message queue cluster. After the background is configured through the page, You can query the message traces of Exchange and Queue. However, the functions provided by the trace plug-in are too simple, the content of the trace cannot be customized or searched, and the tracer log file is saved on the cluster disk, which will bring risks to the stability of the cluster. If there are many messages in the cluster and many tracks are monitored, the Tracer log file will occupy a large amount of disks, and it needs to be cleaned regularly, which brings great challenges to the maintenance work. Tracer log files are plain text, which is inconvenient to query historical messages. The content format is fixed and lacks key information, such as Message ID, RequestID and other information that users are concerned about, and cannot be accurately searched through key information.

发明内容SUMMARY OF THE INVENTION

下文呈现各种示例性技术方案的概述。在以下概述中可以进行一些简化和省略,其意在突出并介绍各种示例性技术方案的一些方面,但不限制本发明的范围。将在后续部分呈现足以允许本领域的普通技术人员产生并使用本发明概念的示例性技术方案的详细描述。An overview of various exemplary technical solutions is presented below. Some simplifications and omissions may be made in the following summary, which is intended to highlight and introduce some aspects of various exemplary solutions, but not to limit the scope of the invention. A detailed description of exemplary solutions sufficient to allow one of ordinary skill in the art to make and use the inventive concepts will be presented in subsequent sections.

本发明的技术方案提供一种消息队列的消息轨迹信息处理方法、系统、终端和存储介质,可以提供大容量且稳定的轨迹信息的存储,并且实现轨迹信息内容的精确便捷地查找。The technical scheme of the present invention provides a message track information processing method, system, terminal and storage medium of a message queue, which can provide large-capacity and stable track information storage, and realize accurate and convenient search of track information content.

技术方案提供了一种消息队列的消息轨迹信息处理方法,包括:将消息队列的消息轨迹封装成轨迹信息并进行编码和序列化;将所述编码和序列化后的轨迹信息发送至消息队列进行缓存;将所述消息队列中缓存的轨迹信息进行反序列化和解码,并解析成存储模块能识别的对象;调用存储模块的应用程序接口并存储所述对象,以供查询。The technical solution provides a message track information processing method of a message queue, comprising: encapsulating the message track of the message queue into track information and encoding and serializing it; sending the encoded and serialized track information to the message queue for processing Cache; deserialize and decode the track information cached in the message queue, and parse it into an object that can be identified by the storage module; call the application program interface of the storage module and store the object for query.

在一个技术方案中,所述方法还包括通过以下至少之一的对象参数查询所述对象:资源类型、资源名称、消息ID、消息路由键、消息Tag、Request ID、Polaris trace ID、消息时间范围。In one technical solution, the method further includes querying the object through at least one of the following object parameters: resource type, resource name, message ID, message routing key, message Tag, Request ID, Polaris trace ID, message time range .

在一个技术方案中,所述消息队列的消息轨迹包括以下至少一种:消息发送时间、生产者IP、发送耗时、消费者IP、消息id、消息key、接收到消息的时间、开始消费时间、消费完成时间、消费结果状态、消费耗时。In a technical solution, the message track of the message queue includes at least one of the following: message sending time, producer IP, sending time, consumer IP, message id, message key, time when the message was received, and start consumption time , consumption completion time, consumption result status, consumption time.

在一个技术方案中,所述存储模块包括以下至少一种:Elasticsearch、HIVE、Mysql、DB。In a technical solution, the storage module includes at least one of the following: Elasticsearch, HIVE, Mysql, and DB.

在一个技术方案中,所述消息队列包括以下至少一种:ActiveMQ、RabbitMQ、ZeroMQ、Kafka。In a technical solution, the message queue includes at least one of the following: ActiveMQ, RabbitMQ, ZeroMQ, and Kafka.

在一个技术方案中,所述将消息队列的消息轨迹封装成轨迹信息包括将消息轨迹封装成以下至少之一:Java对象、C语言对象、C++对象、Python对象。In a technical solution, the encapsulating the message trace of the message queue into trace information includes encapsulating the message trace into at least one of the following: Java objects, C language objects, C++ objects, and Python objects.

在一个技术方案中,所述编码是通过开发工具包进行编码。通过编码压缩数据,从而减少轨迹信息对业务端带宽的占用。In a technical solution, the coding is performed through a development kit. The data is compressed by encoding, thereby reducing the occupation of the bandwidth of the service end by the trajectory information.

技术方案还提供了一种消息队列的消息轨迹信息处理系统,包括:消息处理模块,被配置为将消息队列的消息封装成轨迹信息并进行编码和序列化,并将所述编码和序列化后的轨迹信息发送至消息队列;消息队列模块,被配置为接收并缓存所述编码和序列化后的轨迹信息;轨迹服务模块,被配置为将所述消息队列中缓存的轨迹信息进行解码、反序列化、解析成存储模块能识别的对象、调用存储模块的应用程序接口,并将所述对象发送到存储模块;存储模块,存储所述对象以供查询。The technical solution also provides a message track information processing system for a message queue, comprising: a message processing module configured to encapsulate the message of the message queue into track information, encode and serialize it, and encode and serialize the encoded and serialized messages. The track information is sent to the message queue; the message queue module is configured to receive and cache the encoded and serialized track information; the track service module is configured to decode and reverse the track information cached in the message queue. Serialize and parse into an object that can be recognized by the storage module, call the application program interface of the storage module, and send the object to the storage module; the storage module stores the object for query.

在一个技术方案中,所述的消息队列的消息轨迹信息处理系统,还包括:查询模块,被配置为调用存储模块的应用程序接口查询所述对象。In a technical solution, the message track information processing system of the message queue further comprises: a query module, configured to call the application program interface of the storage module to query the object.

在一个技术方案中,所述消息处理模块是软件工具开发包。In a technical solution, the message processing module is a software tool development kit.

在一个技术方案中,所述轨迹服务模块是可执行档案文件。In one technical solution, the trajectory service module is an executable archive file.

技术方案还提供了一种终端,包括:处理器、存储器以及存储在存储器上运行的计算机程序,所述处理器执行计算机程序时实现上述任一技术方案方法的步骤。The technical solution also provides a terminal, comprising: a processor, a memory, and a computer program stored and run on the memory, and the processor implements the steps of any of the foregoing technical solutions when the processor executes the computer program.

技术方案还提供了一种计算机可读存储介质,所述计算机程序被处理器执行时实现上述任一技术方案方法的步骤。The technical solution also provides a computer-readable storage medium, and when the computer program is executed by a processor, the steps of any of the above-mentioned technical solutions are implemented.

本发明具有以下有益效果:为消息轨迹信息提供更稳定可靠的存储,且不会对消息队列集群造成影响,并且能提供更方便的查询和定位。The invention has the following beneficial effects: providing more stable and reliable storage for message track information, without affecting the message queue cluster, and providing more convenient query and positioning.

技术方案的轨迹信息并不直接发到消息队列中存储,也不直接发到存储模块中存储。而是先发送给性能更好单独的消息队列集群上进行缓存,再解码后发送给存储模块,这样,消息与存储模块解耦了,也与其他消息队列集群解耦了。这样的优点在于,即使更换存储模块的种类,对消息处理模块是没有影响的,不需要进行升级、更改设置等操作,方便日后扩展。通过单独的存储模块进行存储,比存储在消息队列中更为稳定可靠,数据不易丢失、存储容量也更大,并且不占用消息队列集群的资源,从而提高了消息队列集群的性能。技术方案还提供了更为强大的查找功能,可以通过关键信息更便捷快速地查找轨迹信息。The track information of the technical solution is not directly sent to the message queue for storage, nor is it directly sent to the storage module for storage. Instead, it is sent to a separate message queue cluster with better performance for caching, and then decoded and sent to the storage module. In this way, the message is decoupled from the storage module and from other message queue clusters. The advantage of this is that even if the type of the storage module is changed, the message processing module is not affected, and operations such as upgrading and changing settings are not required, which is convenient for future expansion. Storage through a separate storage module is more stable and reliable than storing in a message queue, data is not easily lost, the storage capacity is larger, and it does not occupy the resources of the message queue cluster, thereby improving the performance of the message queue cluster. The technical solution also provides a more powerful search function, which can search for trajectory information more conveniently and quickly through key information.

附图说明Description of drawings

为了更好地理解各种示例性实施例,可以参考附图,在附图中:For a better understanding of the various exemplary embodiments, reference may be made to the accompanying drawings, in which:

图1示出了实施例提供的一种消息队列的消息轨迹信息处理方法的流程示意图;1 shows a schematic flowchart of a method for processing message track information of a message queue provided by an embodiment;

图2示出了实施例提供的一种消息队列的消息轨迹信息处理系统的结构示意图;并且FIG. 2 shows a schematic structural diagram of a message track information processing system of a message queue provided by an embodiment; and

图3A和3B示出了实施例提供的一种消息队列的消息轨迹信息处理系统的查询界面示意图;3A and 3B show a schematic diagram of a query interface of a message track information processing system of a message queue provided by an embodiment;

图4示出了实施例提供的一种消息队列的消息轨迹信息处理系统的查询结果示意图。FIG. 4 shows a schematic diagram of a query result of a message track information processing system of a message queue provided by an embodiment.

为了便于理解,相同的附图标记已用于指代具有基本上相同或类似结构和/或基本上相同或类似功能的元件。To facilitate understanding, the same reference numerals have been used to refer to elements having substantially the same or similar structure and/or substantially the same or similar function.

具体实施方式Detailed ways

描述和图式示出了本发明的原理。因此,将了解,本领域的技术人员将能够设计各种布置,尽管本文中未明确地描述或示出所述布置,但所述布置体现本发明的原理且包括在本发明的范围内。此外,本文中所引述的所有例子主要旨在明确地用于教学目的,以帮助读者理解本发明的原理和由发明人提供的用以深化本领域的概念,并且所有例子应视为并不限于此类特定引述的例子和条件。另外,如本文中所使用,除非另有指示(例如,“或另外”或“或在替代方案中”),否则术语“或”是指非排他性的或(即,和/或)。并且,本文中所描述的各种实施例不一定相互排斥,因为一些实施例可以与一个或多个其它实施例组合以形成新的实施例。The description and drawings illustrate the principles of the invention. Accordingly, it will be appreciated that those skilled in the art will be able to devise various arrangements that, although not expressly described or illustrated herein, embody the principles of and are included within the scope of the invention. Furthermore, all examples cited herein are primarily intended to be expressly used for teaching purposes to assist the reader in understanding the principles of the invention and the concepts provided by the inventors to deepen the art, and all examples should be construed as not limiting Examples and conditions of such specific citations. Also, as used herein, the term "or" refers to a non-exclusive or (ie, and/or) unless otherwise indicated (eg, "or otherwise" or "or in the alternative"). Also, the various embodiments described herein are not necessarily mutually exclusive, as some embodiments can be combined with one or more other embodiments to form new embodiments.

图1示出了实施例提供的一种消息队列的消息轨迹信息处理方法,包括以下步骤:FIG. 1 shows a method for processing message trace information of a message queue provided by an embodiment, including the following steps:

步骤S101,将消息队列的消息轨迹封装成轨迹信息并进行编码和序列化。Step S101, encapsulate the message track of the message queue into track information, and encode and serialize it.

在本实施例中,可以由软件工具开发包(SDK)或其他适合的工具将消息轨迹拼装成Java对象或其他适合的对象,例如C语言、C++、Python等。软件工具开发包(SDK)可以包括生产者软件工具包(Producer SDK)和消费者软件工具包(Consumer SDK)。本实施例所指的消息包括上游生产者发送的消息和下游消费者消费的消息。消息轨迹包括以下至少一种:消息发送时间、生产者IP、发送结果状态、发送耗时、接收消息时间、消费者IP、消息id、消息key、开始消费时间、消费完成时间、消费结果状态、消费耗时等。本实施例可以在发送端和消费端做处理,将消息发送前、消息发送后、消息消费前、消息消费后进行全程的记录,可以清楚地知道消息发送和消费的具体情况。将封装后的轨迹信息Java对象进行编码,可以采用Java开发工具包(Java Development Kit(JDK))进行编码或其他适合的方式进行编码,例如采用其他开发工具包(Development Kit)。编码的过程可以将轨迹信息压缩,从而减少轨迹信息对业务端带宽的占用。编码完成后,将编码后的轨迹信息序列化。In this embodiment, a software tool development kit (SDK) or other suitable tools can assemble the message traces into Java objects or other suitable objects, such as C language, C++, Python, and the like. The software tool development kit (SDK) may include a producer software kit (Producer SDK) and a consumer software kit (Consumer SDK). The messages referred to in this embodiment include messages sent by upstream producers and messages consumed by downstream consumers. The message trace includes at least one of the following: message sending time, producer IP, sending result status, sending time, receiving message time, consumer IP, message id, message key, start consumption time, consumption completion time, consumption result status, Consumption time etc. In this embodiment, processing can be performed at the sending end and the consuming end, and the whole process can be recorded before the message is sent, after the message is sent, before the message is consumed, and after the message is consumed, so that the specific situation of the message sending and consumption can be clearly known. The encapsulated track information Java object can be encoded by using a Java Development Kit (Java Development Kit (JDK)) or other suitable methods, such as using other development kits (Development Kit). The encoding process can compress the track information, thereby reducing the occupation of the bandwidth of the service end by the track information. After the encoding is completed, the encoded trajectory information is serialized.

步骤S102,将所述编码和序列化后的轨迹信息发送至消息队列进行缓存。Step S102, sending the encoded and serialized trajectory information to a message queue for buffering.

在本实施例中,消息队列包括以下至少一种:ActiveMQ、RabbitMQ、ZeroMQ、Kafka。优选的,轨迹信息可以发送单独的MQ集群上,可以让轨迹信息与其他MQ集群解耦。轨迹信息可以发送到消息队列单独服务器上交换机里存储。该交换机可以名为production_default_subscription.SYS.MERCURY.TRACE.DATA.rabbitmq。In this embodiment, the message queue includes at least one of the following: ActiveMQ, RabbitMQ, ZeroMQ, and Kafka. Preferably, the trajectory information can be sent to a separate MQ cluster, so that the trajectory information can be decoupled from other MQ clusters. Trajectory information can be sent to the message queue for storage on a separate server on the switch. The switch can be named production_default_subscription.SYS.MERCURY.TRACE.DATA.rabbitmq.

步骤S103,将所述消息队列中缓存的轨迹信息进行反序列化和解码,并解析成存储模块能识别的对象。Step S103, deserialize and decode the track information cached in the message queue, and parse it into an object that can be identified by the storage module.

在本实施例中,可通过可执行Java档案文件(Java Archive File)或其他适合的工具,如可执行C语言档案文件等,将所述消息队列中缓存的轨迹信息进行反序列化和解码,并解析成存储模块能识别的对象,然后调用存储模块应用程序编程接口(API)并将该对象发送到存储模块中。In this embodiment, the track information cached in the message queue can be deserialized and decoded by using an executable Java Archive File or other suitable tools, such as an executable C language archive file, etc., And parse it into an object that the storage module can recognize, then call the storage module application programming interface (API) and send the object to the storage module.

步骤S104,调用存储模块的应用程序接口API存储所述对象。Step S104, calling the application program interface API of the storage module to store the object.

在本实施例中,可通过可执行Java档案文件调用存储模块的应用程序接口将所述对象存储在存储模块中,以供查询。存储模块可以选择Elasticsearch、HIVE、Mysql、DB等。优选的,可以使用Elasticsearch(ES)。ES的数据会持久化并且备份,所以不需要担心轨迹信息的丢失,也不需要担心会影响MQ集群的资源。通过单独的存储模块进行存储,比存储在消息队列中更为稳定可靠、不易丢失,存储容量也更大,并且不占用消息队列集群的资源,从而提高了消息队列集群的性能。In this embodiment, the object can be stored in the storage module by invoking the application program interface of the storage module through the executable Java archive file for query. The storage module can choose Elasticsearch, HIVE, Mysql, DB, etc. Preferably, Elasticsearch (ES) can be used. ES data will be persistent and backed up, so there is no need to worry about the loss of trajectory information or affecting the resources of the MQ cluster. Storage through a separate storage module is more stable and reliable than storage in a message queue, less likely to be lost, and has a larger storage capacity, and does not occupy the resources of the message queue cluster, thereby improving the performance of the message queue cluster.

优选的,还可以包括步骤S105,调用存储模块的应用程序接口API查询所述对象。具体地,可以通过后台界面(Portal)调用存储模块的API查询所述对象。以ES作为存储模块来举例,ES具有强大的查询检索功能,可以使得我们可以根据消息Id或者请求Id等参数秒级将历史轨迹数据查询出来,快速的定位问题。另外如果查询不到有效信息还能够根据资源类型、资源名称、消息ID、消息路由键、Request ID、Polaris trace ID、时间范围等进行查询,缩小查询范围。Preferably, step S105 may also be included, calling the application program interface API of the storage module to query the object. Specifically, the object can be queried by calling the API of the storage module through the background interface (Portal). Taking ES as an example of a storage module, ES has a powerful query and retrieval function, which allows us to query historical trajectory data in seconds based on parameters such as message ID or request ID, and quickly locate problems. In addition, if you cannot find valid information, you can also query based on resource type, resource name, message ID, message routing key, Request ID, Polaris trace ID, time range, etc. to narrow the query scope.

在本实施例中,轨迹信息并不直接发到MQ中存储,也不直接发到存储模块中存储。而是先发送给性能更好单独的MQ集群上进行缓存,再解码后发送给存储模块,这样,消息与存储模块解耦了,也与其他MQ集群解耦了。这样的优点在于,即使把ES换成其他存储引擎,对SDK是无感知的,不需要用户进行升级等操作。In this embodiment, the trajectory information is not directly sent to the MQ for storage, nor directly sent to the storage module for storage. Instead, it is sent to a separate MQ cluster with better performance for caching, and then decoded and sent to the storage module. In this way, the message is decoupled from the storage module and from other MQ clusters. The advantage of this is that even if ES is replaced with another storage engine, it is unaware of the SDK and does not require users to perform upgrades and other operations.

如图2示出了实施例提供的一种消息队列的消息轨迹信息处理系统,包括:消息处理模块201、消息队列模块202、轨迹服务模块203、存储模块204和查询模块205。FIG. 2 shows a message track information processing system of a message queue provided by an embodiment, including: a message processing module 201 , a message queue module 202 , a track service module 203 , a storage module 204 and a query module 205 .

消息处理模块201将消息轨迹封装成轨迹信息。优选的,消息处理模块201可以包括软件工具开发包(SDK)或其他适合的工具。软件工具开发包(SDK)可以包括生产者软件工具包(Producer SDK)和消费者软件工具包(Consumer SDK)。消息处理模块201可以将消息轨迹拼装成Java对象或其他适合的对象,例如C语言、C++、Python等。本实施例所指的消息包括上游生产者发送的消息和下游消费者消费的消息。消息轨迹包括以下至少一种:消息发送时间、生产者IP、发送结果状态、发送耗时、接收消息时间、消费者IP、消息id、消息key、开始消费时间、消费完成时间、消费结果状态、消费耗时等。本实施例可以在发送端和消费端做处理,将消息发送前、消息发送后、消息消费前、消息消费后进行全程的记录,可以清楚地知道消息发送和消费的具体情况。消息处理模块201将封装后的轨迹信息Java对象进行编码,编码方式可以采用JDK或其他适合的方式。编码的过程可以将轨迹信息压缩,从而减少轨迹信息对业务端带宽的占用。编码完成后,消息处理模块201将编码后的轨迹信息序列化,并发送至消息队列模块202。The message processing module 201 encapsulates the message trace into trace information. Preferably, the message processing module 201 may include a software tool development kit (SDK) or other suitable tools. The software tool development kit (SDK) may include a producer software kit (Producer SDK) and a consumer software kit (Consumer SDK). The message processing module 201 can assemble the message traces into Java objects or other suitable objects, such as C language, C++, Python and so on. The messages referred to in this embodiment include messages sent by upstream producers and messages consumed by downstream consumers. The message trace includes at least one of the following: message sending time, producer IP, sending result status, sending time, receiving message time, consumer IP, message id, message key, start consumption time, consumption completion time, consumption result status, Consumption time etc. In this embodiment, processing can be performed at the sending end and the consuming end, and the whole process can be recorded before the message is sent, after the message is sent, before the message is consumed, and after the message is consumed, so that the specific situation of the message sending and consumption can be clearly known. The message processing module 201 encodes the encapsulated track information Java object, and the encoding method may adopt JDK or other suitable methods. The encoding process can compress the track information, thereby reducing the occupation of the bandwidth of the service end by the track information. After the encoding is completed, the message processing module 201 serializes the encoded trajectory information and sends it to the message queue module 202 .

消息队列模块202,被配置为接收并缓存所述编码和序列化后的轨迹信息。所述消息队列包括以下至少一种:ActiveMQ、RabbitMQ、ZeroMQ、Kafka。优选的,轨迹信息可以发送单独的MQ集群上,可以让轨迹信息与其他MQ集群解耦。轨迹信息可以发送到消息队列单独服务器上交换机里存储。该交换机可以名为production_default_subscription.SYS.MERCURY.TRACE.DATA.rabbitmq。The message queue module 202 is configured to receive and buffer the encoded and serialized track information. The message queue includes at least one of the following: ActiveMQ, RabbitMQ, ZeroMQ, and Kafka. Preferably, the trajectory information can be sent to a separate MQ cluster, so that the trajectory information can be decoupled from other MQ clusters. Trajectory information can be sent to the message queue for storage on a separate server on the switch. The switch can be named production_default_subscription.SYS.MERCURY.TRACE.DATA.rabbitmq.

轨迹服务模块203,被配置为将所述消息队列中缓存的轨迹信息进行反序列化和解码,解析成存储模块能识别的对象,调用存储模块应用程序编程接口(API)并将该对象存储到存储模块中。优选的,轨迹服务模块203可以是可执行Java档案文件(Java ArchiveFile),即JAR文件。The track service module 203 is configured to deserialize and decode the track information cached in the message queue, parse it into an object that can be recognized by the storage module, call the storage module application programming interface (API) and store the object in the storage module. in the storage module. Preferably, the track service module 203 may be an executable Java archive file (Java ArchiveFile), that is, a JAR file.

存储模块204,被配置为存储接收到的所述对象。存储模块可以选择Elasticsearch、HIVE、Mysql、DB等。优选的,可以使用Elasticsearch(ES)。ES的数据会持久化并且备份,所以不需要担心轨迹信息的丢失,也不需要担心会影响MQ集群的资源。The storage module 204 is configured to store the received object. The storage module can choose Elasticsearch, HIVE, Mysql, DB, etc. Preferably, Elasticsearch (ES) can be used. ES data will be persistent and backed up, so there is no need to worry about the loss of trajectory information or affecting the resources of the MQ cluster.

查询模块205,查询模块可以是后台界面(Portal)或其他合适的工具。查询模块205被配置为调用存储模块的API查询所述对象。以ES作为存储模块来举例,ES具有强大的查询检索功能。Query module 205. The query module may be a background interface (Portal) or other suitable tools. The query module 205 is configured to call the API of the storage module to query the object. Taking ES as an example of a storage module, ES has powerful query and retrieval functions.

参见图3A和3B示出的实施例提供的一种消息队列的消息轨迹信息处理系统的查询界面示意图,可以通过消息ID查询(图3A)和通用查询(图3B)两种方式进行查询,消息ID查询包括:资源类型、资源名称和消息ID等。通用查询包括资源类型、资源名称、消息路由键、消息Tag、Request ID、Polaris trace ID、消息时间范围等进行查询。可以根据消息ID中的参数秒级将历史轨迹数据查询出来,快速的定位。另外如果查询不到有效信息还能够根据通用查询种的参数进行查询,缩小查询范围。查询结果包括:消息ID、消息路由键、消息Tags、Request ID、Polaris trace ID、Client ID、发送状态、发送时间、消费状态、操作、查询条数等。Referring to the schematic diagram of the query interface of a message track information processing system of a message queue provided by the embodiment shown in FIGS. 3A and 3B , the query can be performed in two ways: message ID query ( FIG. 3A ) and general query ( FIG. 3B ). The ID query includes: resource type, resource name, and message ID. Common queries include resource type, resource name, message routing key, message Tag, Request ID, Polaris trace ID, message time range, etc. According to the parameters in the message ID, the historical trajectory data can be queried in seconds for quick positioning. In addition, if you cannot find valid information, you can also query according to the parameters of the general query to narrow the query scope. The query results include: message ID, message routing key, message Tags, Request ID, Polaris trace ID, Client ID, sending status, sending time, consumption status, operation, number of queries, etc.

参加图4示出了实施例提供的一种消息队列的消息轨迹信息处理系统的查询结果示意图。可以根据用户发起的查询请求,获取查询结果,进行组装后,展示给用户生产者、服务端、消费者的轨迹详情。生产者轨迹信息可以包括消息ID、发送者IP、发送时间、发送耗时、发送状态、Request ID、Polaris trace ID等。服务端轨迹信息可以包括集群信息、主题、路由键、存储时间等。消费者轨迹信息包括订阅、第几次消费、消费者IP、投递时间、消费耗时、消费状态、Request ID、Polaris trace ID、Client ID等。Referring to FIG. 4 , a schematic diagram of a query result of a message track information processing system of a message queue provided by an embodiment is shown. According to the query request initiated by the user, the query result can be obtained, and after assembly, the track details of the producer, server and consumer can be displayed to the user. The producer trace information can include message ID, sender IP, sending time, sending time, sending status, Request ID, Polaris trace ID, etc. The server track information can include cluster information, topics, routing keys, storage time, etc. Consumer trace information includes subscription, number of times of consumption, consumer IP, delivery time, consumption time, consumption status, Request ID, Polaris trace ID, Client ID, etc.

实施例还提供了一种终端,包括:处理器、存储器以及存储在存储器上运行的计算机程序,所述处理器执行计算机程序时实现上述任一实施例方法的步骤,例如步骤S101至S104,或者所述处理器执行所述计算机程序时实现上述各实施例中各模块/单元的功能,例如图1所示单元201至205的功能。所述计算机程序可以被分割成一个或多个模块/单元,所述一个或者多个模块/单元被存储在所述存储器中,并由所述处理器执行。所述一个或多个模块/单元可以是能够完成特定功能的一系列计算机程序指令段,该指令段用于描述所述计算机程序在所述终端中的执行过程。The embodiment further provides a terminal, including: a processor, a memory, and a computer program stored on the memory to run, and the processor implements the steps of the method in any of the foregoing embodiments when the processor executes the computer program, for example, steps S101 to S104, or When the processor executes the computer program, the functions of the modules/units in the above embodiments, such as the functions of the units 201 to 205 shown in FIG. 1 , are implemented. The computer program may be divided into one or more modules/units, which are stored in the memory and executed by the processor. The one or more modules/units may be a series of computer program instruction segments capable of performing specific functions, and the instruction segments are used to describe the execution process of the computer program in the terminal.

所述终端可以是智能手机等移动终端,或者是桌上型计算机、笔记本、掌上电脑及云端服务器等计算设备。所述终端可包括,但不仅限于,处理器、存储器,可以包括更多或更少的部件,或者组合某些部件,例如所述终端还可以包括输入输出设备、网络接入设备、总线等。所称处理器可以是中央处理单元(Central Processing Unit,CPU),还可以是其他通用处理器、数字消息队列的管理器(Digital Signal Processor,DSP)、专用集成电路(Application Specific Integrated Circuit,ASIC)、现成可编程门阵列(FieldProgrammable Gate Array,FPGA)或者其他可编程逻辑器件、分立门或者晶体管逻辑器件、分立硬件组件等。通用处理器可以是微处理器或者该处理器也可以是任何常规的处理器等。所述存储器可以是所述终端的内部存储单元,例如终端的硬盘或内存。所述存储器也可以是所述终端的外部存储设备,例如所述终端上配备的插接式硬盘,智能存储卡(Smart Media Card,SMC),安全数字(Secure Digital,SD)卡,闪存卡(Flash Card)等。进一步地,所述存储器还可以既包括所述终端的内部存储单元也包括外部存储设备。The terminal may be a mobile terminal such as a smart phone, or a computing device such as a desktop computer, a notebook, a palmtop computer, and a cloud server. The terminal may include, but is not limited to, a processor and a memory, may include more or less components, or combine certain components, for example, the terminal may also include an input and output device, a network access device, a bus, and the like. The processor may be a central processing unit (Central Processing Unit, CPU), other general-purpose processors, a digital message queue manager (Digital Signal Processor, DSP), an application specific integrated circuit (Application Specific Integrated Circuit, ASIC) , Off-the-shelf programmable gate array (FieldProgrammable Gate Array, FPGA) or other programmable logic devices, discrete gate or transistor logic devices, discrete hardware components, etc. A general purpose processor may be a microprocessor or the processor may be any conventional processor or the like. The memory may be an internal storage unit of the terminal, such as a hard disk or a memory of the terminal. The memory may also be an external storage device of the terminal, such as a plug-in hard disk, a smart memory card (Smart Media Card, SMC), a Secure Digital (Secure Digital, SD) card, a flash memory card ( Flash Card), etc. Further, the memory may also include both an internal storage unit of the terminal and an external storage device.

实施例还提供了一种计算机可读存储介质,所述计算机程序被处理器执行时实现上述任一实施例方法的步骤。The embodiment also provides a computer-readable storage medium, when the computer program is executed by a processor, the steps of the method of any of the above-mentioned embodiments are implemented.

在本申请各个实施例中的各功能单元可以集成在一个处理单元中,也可以是各个单元单独物理存在,也可以两个或两个以上单元集成在一个单元中。上述集成的单元既可以采用硬件的形式实现,也可以采用软件功能单元的形式实现。所述集成的模块/单元如果以软件功能单元的形式实现并作为独立的产品销售或使用时,可以存储在一个计算机可读取存储介质中。基于这样的理解,本申请实现上述实施例方法中的全部或部分流程,也可以通过计算机程序来指令相关的硬件来完成,所述的计算机程序可存储于一计算机可读存储介质中,该计算机程序在被处理器执行时,可实现上述各个方法实施例的步骤。其中,所述计算机程序包括计算机程序代码,所述计算机程序代码可以为源代码形式、对象代码形式、可执行文件或某些中间形式等。所述计算机可读介质可以包括:能够携带所述计算机程序代码的任何实体或装置、记录介质、U盘、移动硬盘、磁碟、光盘、计算机存储器、只读存储器(Read-Only Memory,ROM)、随机存取存储器(Random Access Memory,RAM)、电载波信号、电信信号以及软件分发介质等。需要说明的是,所述计算机可读介质包含的内容可以根据司法管辖区内立法和专利实践的要求进行适当的增减,例如在某些司法管辖区,根据立法和专利实践,计算机可读介质不包括电载波信号和电信信号。Each functional unit in each embodiment of the present application may be integrated into one processing unit, or each unit may exist physically alone, or two or more units may be integrated into one unit. The above-mentioned integrated units may be implemented in the form of hardware, or may be implemented in the form of software functional units. The integrated modules/units, if implemented in the form of software functional units and sold or used as independent products, may be stored in a computer-readable storage medium. Based on this understanding, the present application can implement all or part of the processes in the methods of the above embodiments, and can also be completed by instructing the relevant hardware through a computer program. The computer program can be stored in a computer-readable storage medium, and the computer When the program is executed by the processor, the steps of the foregoing method embodiments can be implemented. Wherein, the computer program includes computer program code, and the computer program code may be in the form of source code, object code, executable file or some intermediate form, and the like. The computer-readable medium may include: any entity or device capable of carrying the computer program code, recording medium, U disk, removable hard disk, magnetic disk, optical disk, computer memory, Read-Only Memory (ROM) , Random Access Memory (Random Access Memory, RAM), electric carrier signal, telecommunication signal and software distribution medium, etc. It should be noted that the content contained in the computer-readable media may be appropriately increased or decreased according to the requirements of legislation and patent practice in the jurisdiction, for example, in some jurisdictions, according to legislation and patent practice, the computer-readable media Electric carrier signals and telecommunication signals are not included.

所属领域的技术人员可以清楚地了解到,为了描述的方便和简洁,仅以上述各功能单元、模块的划分进行举例说明,实际应用中,可以根据需要而将上述功能分配由不同的功能单元、模块完成,即将所述装置的内部结构划分成不同的功能单元或模块,以完成以上描述的全部或者部分功能。实施例中的各功能单元、模块可以集成在一个处理单元中,也可以是各个单元单独物理存在,也可以两个或两个以上单元集成在一个单元中,上述集成的单元既可以采用硬件的形式实现,也可以采用软件功能单元的形式实现。另外,各功能单元、模块的具体名称也只是为了便于相互区分,并不用于限制本申请的保护范围。上述系统中单元、模块的具体工作过程,可以参考前述方法实施例中的对应过程,在此不再赘述。Those skilled in the art can clearly understand that, for the convenience and simplicity of description, only the division of the above-mentioned functional units and modules is used as an example. Module completion, that is, dividing the internal structure of the device into different functional units or modules to complete all or part of the functions described above. Each functional unit and module in the embodiment may be integrated in one processing unit, or each unit may exist physically alone, or two or more units may be integrated in one unit, and the above-mentioned integrated units may adopt hardware. It can also be realized in the form of software functional units. In addition, the specific names of the functional units and modules are only for the convenience of distinguishing from each other, and are not used to limit the protection scope of the present application. For the specific working processes of the units and modules in the above-mentioned system, reference may be made to the corresponding processes in the foregoing method embodiments, which will not be repeated here.

在上述实施例中,对各个实施例的描述都各有侧重,某个实施例中没有详述或记载的部分,可以参见其它实施例的相关描述。本领域普通技术人员可以意识到,结合本文中所公开的实施例描述的各示例的单元及算法步骤,能够以电子硬件、或者计算机软件和电子硬件的结合来实现。这些功能究竟以硬件还是软件方式来执行,取决于技术方案的特定应用和设计约束条件。专业技术人员可以对每个特定的应用来使用不同方法来实现所描述的功能,但是这种实现不应认为超出本申请的范围。In the foregoing embodiments, the description of each embodiment has its own emphasis. For parts that are not described or described in detail in a certain embodiment, reference may be made to the relevant descriptions of other embodiments. Those of ordinary skill in the art can realize that the units and algorithm steps of each example described in conjunction with the embodiments disclosed herein can be implemented in electronic hardware, or a combination of computer software and electronic hardware. Whether these functions are performed in hardware or software depends on the specific application and design constraints of the technical solution. Skilled artisans may implement the described functionality using different methods for each particular application, but such implementations should not be considered beyond the scope of this application.

在本申请所提供的实施例中,应该理解到,所揭露的系统、终端和方法,可以通过其它的方式实现。例如,以上所描述的系统、终端实施例仅仅是示意性的,例如,所述模块或单元的划分,仅仅为一种逻辑功能划分,实际实现时可以有另外的划分方式,例如多个单元或组件可以结合或者可以集成到另一个系统,或一些特征可以忽略,或不执行。另外,所显示或讨论的相互之间的耦合或直接耦合或通讯连接可以是通过一些接口,系统或单元的间接耦合或通讯连接,可以是电性,机械或其它的形式。所述作为分离部件说明的单元可以是或者也可以不是物理上分开的,作为单元显示的部件可以是或者也可以不是物理单元,即可以位于一个地方,或者也可以分布到多个网络单元上。可以根据实际的需要选择其中的部分或者全部单元来实现本实施例方案的目的。In the embodiments provided in this application, it should be understood that the disclosed system, terminal and method may be implemented in other manners. For example, the above-described system and terminal embodiments are only illustrative. For example, the division of the modules or units is only a logical function division. In actual implementation, there may be other division methods, such as multiple units or Components may be combined or may be integrated into another system, or some features may be omitted, or not implemented. In addition, the shown or discussed mutual coupling or direct coupling or communication connection may be through some interfaces, indirect coupling or communication connection of systems or units, and may be in electrical, mechanical or other forms. The units described as separate components may or may not be physically separated, and components displayed as units may or may not be physical units, that is, may be located in one place, or may be distributed to multiple network units. Some or all of the units may be selected according to actual needs to achieve the purpose of the solution in this embodiment.

以上所述实施例仅用以说明本申请的技术方案,而非对其限制;尽管参照前述实施例对本申请进行了详细的说明,本领域的普通技术人员应当理解:其依然可以对前述各实施例所记载的技术方案进行修改,或者对其中部分技术特征进行等同替换;而这些修改或者替换,并不使相应技术方案的本质脱离本申请各实施例技术方案的精神和范围,均应包含在本申请的保护范围之内。The above-mentioned embodiments are only used to illustrate the technical solutions of the present application, but not to limit them; although the present application has been described in detail with reference to the above-mentioned embodiments, those of ordinary skill in the art should understand that: it can still be used for the above-mentioned implementations. The technical solutions described in the examples are modified, or some technical features thereof are equivalently replaced; and these modifications or replacements do not make the essence of the corresponding technical solutions deviate from the spirit and scope of the technical solutions in the embodiments of the application, and should be included in the within the scope of protection of this application.

Claims (13)

1. A message track information processing method of a message queue is characterized by comprising the following steps:
packaging the message tracks of the message queue into track information, and coding and serializing the track information;
sending the track information after the coding and the serialization to a message queue for caching;
deserializing and decoding the track information cached in the message queue, and analyzing the track information into objects which can be identified by a storage module;
and calling an application program interface of the storage module and storing the object for query.
2. The message track information processing method of the message queue according to claim 1, further comprising: querying the object with object parameters of at least one of: resource type, resource name, message ID, message routing key, message Tag, RequestID, Polaris trace ID, message timeframe.
3. The message track information processing method of the message queue according to claim 1, wherein the message track of the message queue comprises at least one of: message sending time, producer IP, sending time consumption, consumer IP, message ID, message Key, time of receiving message, consumption starting time, consumption finishing time, consumption result state and consumption time consumption.
4. The message track information processing method of the message queue according to claim 1, wherein the storage module comprises at least one of: elasticisearch, HIVE, Mysql, DB.
5. The message track information processing method of the message queue according to claim 1, wherein the message queue comprises at least one of: ActiveMQ, RabbitMQ, ZeroMQ, Kafka.
6. The method of claim 1, wherein encapsulating message tracks of a message queue into track information comprises encapsulating message tracks into at least one of: java objects, C language objects, C + + objects, Python objects.
7. The message track information processing method of the message queue according to claim 1, wherein the encoding is performed by a development kit.
8. A message track information processing system for a message queue, comprising:
the message processing module is configured to encapsulate the messages of the message queue into track information, encode and serialize the track information, and send the encoded and serialized track information to the message queue module;
a message queue module configured to receive and buffer the encoded and serialized track information;
the track service module is configured to decode, deserialize and analyze the track information cached in the message queue into an object which can be identified by the storage module, call an application program interface of the storage module and send the object to the storage module;
a storage module to store the object for querying.
9. The message track information processing system of the message queue of claim 8, further comprising:
and the query module is configured to call an application program interface of the storage module to query the object.
10. The message track information processing system of a message queue of claim 8, wherein the message processing module is a software tool development kit.
11. The message track information handling system of a message queue of claim 8, wherein the track service module is an executable archive file.
12. A terminal, comprising: a processor, a memory, and a computer program stored for execution on the memory, the processor when executing the computer program implementing the method of any one of claims 1-7.
13. A computer-readable storage medium, characterized in that the computer program, when being executed by a processor, implements the method of any one of claims 1-7.
CN202111520978.7A 2021-12-13 2021-12-13 Message track information processing method, system, terminal and storage medium of message queue Pending CN114461413A (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN202111520978.7A CN114461413A (en) 2021-12-13 2021-12-13 Message track information processing method, system, terminal and storage medium of message queue

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN202111520978.7A CN114461413A (en) 2021-12-13 2021-12-13 Message track information processing method, system, terminal and storage medium of message queue

Publications (1)

Publication Number Publication Date
CN114461413A true CN114461413A (en) 2022-05-10

Family

ID=81406227

Family Applications (1)

Application Number Title Priority Date Filing Date
CN202111520978.7A Pending CN114461413A (en) 2021-12-13 2021-12-13 Message track information processing method, system, terminal and storage medium of message queue

Country Status (1)

Country Link
CN (1) CN114461413A (en)

Citations (4)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US6848108B1 (en) * 1998-06-30 2005-01-25 Microsoft Corporation Method and apparatus for creating, sending, and using self-descriptive objects as messages over a message queuing network
CN107465549A (en) * 2017-08-18 2017-12-12 北京奇艺世纪科技有限公司 A kind of distributed message processing method and system
CN112751893A (en) * 2019-10-30 2021-05-04 中移(苏州)软件技术有限公司 Message track data processing method and device and electronic equipment
CN113505037A (en) * 2021-06-24 2021-10-15 北京天九云电子商务有限公司 Message management monitoring system, method, readable medium and electronic device

Patent Citations (4)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US6848108B1 (en) * 1998-06-30 2005-01-25 Microsoft Corporation Method and apparatus for creating, sending, and using self-descriptive objects as messages over a message queuing network
CN107465549A (en) * 2017-08-18 2017-12-12 北京奇艺世纪科技有限公司 A kind of distributed message processing method and system
CN112751893A (en) * 2019-10-30 2021-05-04 中移(苏州)软件技术有限公司 Message track data processing method and device and electronic equipment
CN113505037A (en) * 2021-06-24 2021-10-15 北京天九云电子商务有限公司 Message management monitoring system, method, readable medium and electronic device

Similar Documents

Publication Publication Date Title
CN111767143A (en) Transaction data processing method, device, equipment and system
CN111339186A (en) Workflow engine data synchronization method, device, medium and electronic equipment
CN103401934A (en) Method and system for acquiring log data
CN113010542B (en) Service data processing method, device, computer equipment and storage medium
CN110765165B (en) Method, device and system for synchronously processing cross-system data
CN111382146A (en) Data storage control method, device, equipment and storage medium
CN115297183B (en) A data processing method, device, electronic equipment and storage medium
CN115809222A (en) A log processing method, device, equipment and computer storage medium
US11494437B1 (en) System and method for performing object-modifying commands in an unstructured storage service
CN111666145A (en) Message processing method and system of message queue and computer equipment
CN111459980A (en) Monitoring data storage and query method and device
CN117435670A (en) Heterogeneous data source synchronization method, system, electronic device and storage medium
CN113051244B (en) Data access method and device, data acquisition method and device
CN110866001A (en) Method and device for determining order to be processed
CN114461413A (en) Message track information processing method, system, terminal and storage medium of message queue
US12367199B2 (en) Endpoint scan and profile generation
CN118540371A (en) Cluster resource management method and device, storage medium and electronic equipment
CN113779018A (en) A data processing method and device
CN111143461A (en) Mapping relation processing system and method and electronic equipment
CN111259013A (en) Method and device for storing data
CN115982133A (en) Data processing method and device
CN117272077A (en) Data processing method, device, computer equipment and storage medium
CN115576967A (en) A primary key generation method and device
CN104216914B (en) large-capacity data transmission
CN114461595A (en) Method, apparatus, medium and electronic device for sending messages

Legal Events

Date Code Title Description
PB01 Publication
PB01 Publication
SE01 Entry into force of request for substantive examination
SE01 Entry into force of request for substantive examination