CN111753007A - A data aggregation system and aggregation method based on pluggable components under multi-system - Google Patents
A data aggregation system and aggregation method based on pluggable components under multi-system Download PDFInfo
- Publication number
- CN111753007A CN111753007A CN202010546939.3A CN202010546939A CN111753007A CN 111753007 A CN111753007 A CN 111753007A CN 202010546939 A CN202010546939 A CN 202010546939A CN 111753007 A CN111753007 A CN 111753007A
- Authority
- CN
- China
- Prior art keywords
- data
- aggregation
- message middleware
- server
- processor
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Links
Images
Classifications
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/25—Integrating or interfacing systems involving database management systems
- G06F16/252—Integrating or interfacing systems involving database management systems between a Database Management System and a front-end application
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/25—Integrating or interfacing systems involving database management systems
- G06F16/258—Data format conversion from or to a database
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
- G06F9/546—Message passing systems or structures, e.g. queues
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/547—Messaging middleware
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/548—Queue
Landscapes
- Engineering & Computer Science (AREA)
- Databases & Information Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Data Mining & Analysis (AREA)
- Software Systems (AREA)
- Computer And Data Communications (AREA)
Abstract
Description
技术领域technical field
本发明涉及多种协议下多数据源的数据处理技术,涉及多系统下如何适配不同数据源,不同结构汇聚方法,具体涉及一种基于多系统下可插拔式组件数据汇聚系统及汇聚方法。The present invention relates to a data processing technology of multiple data sources under multiple protocols, how to adapt different data sources under multiple systems, and different structure aggregation methods, in particular to a data aggregation system and aggregation method based on pluggable components under multiple systems .
背景技术Background technique
当前在企业大数据分析应用或互联网数据应用中对数据源的适配都是需要一对一定制化开发进行适配,耗费大量的时间和工作量,为了解决在企业、互联网应用中多数据源下如何自动化适配接入运行数据的汇聚技术方法,尤其是对多业务系统的数据源进行数据归集汇聚等,目前市场上通常是采用定制化开发针对不同业务系统进行对接和适配,定制化开发这种方式对数据需求方和数据接入方的人力、物力、时间影响很大,增加了被接入系统数据的时间风险和技术风险。At present, the adaptation of data sources in enterprise big data analysis applications or Internet data applications requires one-to-one customized development, which consumes a lot of time and workload. In order to solve the problem of multiple data sources in enterprise and Internet applications The following is how to automatically adapt the aggregation technology method of access operation data, especially the data collection and aggregation of data sources of multi-service systems. At present, customized development is usually used in the market to connect and adapt to different business systems. This method of development has a great impact on the manpower, material resources, and time of the data demander and data access party, and increases the time risk and technical risk of the data being connected to the system.
发明内容SUMMARY OF THE INVENTION
发明目的:针对复杂环境解决中需要通过汇聚多业务系统下、不同协议的业务数据问题,本发明的第一目的是提供一种多系统下可插拔式组件数据汇聚系统。同时,本发明的第二目的是提供一种多系统下可插拔式组件数据汇聚方法。Purpose of the invention: Aiming at the problem of needing to aggregate business data of different protocols under multi-service systems in solving complex environments, the first object of the present invention is to provide a pluggable component data aggregation system under multi-system. Meanwhile, the second object of the present invention is to provide a method for data aggregation of pluggable components under multiple systems.
为实现上述目的,本发明所提供的技术方案如下:For achieving the above object, the technical scheme provided by the present invention is as follows:
一种基于多系统下可插拔式组件数据采集系统,包括汇聚服务器、消息中间件、转换处理器和消费处理器;A data acquisition system based on pluggable components under multiple systems, comprising a convergence server, a message middleware, a conversion processor and a consumption processor;
汇聚服务器,部署有可插拔的汇聚组件,用于适配不同系统、不同结构数据源,通过汇聚组件采集业务系统数据源系统的数据流,并将采集的数据流归集到消息中间件;The aggregation server is deployed with pluggable aggregation components, which are used to adapt to different systems and data sources of different structures, collect the data streams of the business system data source systems through the aggregation components, and collect the collected data streams into the message middleware;
消息中间件,用于存储汇聚服务器采集的数据流和经过转换处理器处理后结构化数据;The message middleware is used to store the data stream collected by the aggregation server and the structured data processed by the conversion processor;
转换处理器,基于数据源格式的通用匹配数据格式处理器,将源数据处理成结构化数据,格式为标准json数据;The conversion processor is a general matching data format processor based on the data source format, which processes the source data into structured data, and the format is standard json data;
消费处理器,基于从消息中间件中获取结构化数据流,通过消费处理器进行实时调用消费并将数据流向到大数据实时库。The consumer processor, based on the structured data stream obtained from the message middleware, makes real-time call consumption through the consumer processor and streams the data to the big data real-time library.
进一步,所述汇聚服务器为云服务器或物理服务器,云服务器或物理服务器与被采集业务系统网络互通,数据端口开放共享,且网络协议中支持主流TCP协议。Further, the aggregation server is a cloud server or a physical server, the cloud server or physical server and the collected business system network communicate with each other, the data port is open and shared, and the network protocol supports the mainstream TCP protocol.
更进一步,所述汇聚服务器、消息中间件和消费处理器均能兼容搭载两种及其以上的操作系统的服务器设备,实现两种及其以上不同操作系统下的数据汇聚,操作系统包括Windows操作系统、Unix操作系统、linux操作系统。Further, the aggregation server, message middleware and consumer processor are all compatible with server devices equipped with two or more operating systems, and realize data aggregation under two or more different operating systems. The operating systems include Windows operation. system, Unix operating system, linux operating system.
更进一步,所述消息中间件为kafka、配置数据库为MySql、Oracle。Further, the message middleware is kafka, and the configuration database is MySql and Oracle.
实施上述汇聚系统的一种基于多系统下可插拔式组件数据汇聚方法,所述方法包括如下步骤:A method for data aggregation based on pluggable components under multiple systems for implementing the above aggregation system, the method includes the following steps:
(1)汇聚系统组建:开放业务系统的数据协议接口,将汇聚服务器与被接入业务系统数据流进行接口适配,并将采集的数据流经转换处理器进行数据转换,形成结构化数据存储到消息中间件,并调用消息中间件进行消息消费处理;(1) Convergence system establishment: open the data protocol interface of the business system, adapt the interface between the convergence server and the data flow of the connected business system, and convert the collected data flow through the conversion processor to form a structured data storage Go to the message middleware, and call the message middleware for message consumption processing;
(2)数据汇聚处理:汇聚服务器通过Filebeat收集被采集业务系统的数据流,数据流经过数据转换器进行过滤处理后传送给消息中间件中的kafka或集群进行存储,通过Logstash工具再到消息中间件中的kafka中获取数据,传给Elasticsearch工具进行数据分析处理,最后到kibana工具进行展示。(2) Data aggregation processing: The aggregation server collects the data stream of the collected business system through Filebeat. The data stream is filtered and processed by the data converter and then sent to the kafka or cluster in the message middleware for storage, and then to the message center through the Logstash tool. The data is obtained from the kafka in the file, passed to the Elasticsearch tool for data analysis and processing, and finally displayed to the kibana tool.
进一步,步骤(1)中,汇聚服务器适配被采集业务系统的数据,并将汇聚的数据经过数据转换处理器转换形成结构化数据,并将结构化数据存储到消息中间件,具体方式如下:Further, in step (1), the aggregation server is adapted to the data of the collected business system, and the aggregated data is converted into structured data by the data conversion processor, and the structured data is stored in the message middleware. The specific methods are as follows:
基于Java语言调用SocketServer、RestfulAPI、JDBC不同协议的数据流,并将不同结构的数据经过统一结构处理器转换封装成标准的结构化数据对象,将数据对象再次封装成统一结构化的json数据,并对数据进行标签化,然后将数据对象存储至消息中间件。Call data streams of different protocols of SocketServer, RestfulAPI, and JDBC based on Java language, and convert and encapsulate data of different structures into standard structured data objects through a unified structure processor, and re-encapsulate the data objects into unified structured json data. Tag the data and store the data objects in the message middleware.
进一步,消费处理器从消息中间件获取数据时,可以采用多进程及多线程的方式对数据对象进行处理,消息中间件采用kafka,实时数据库采用ElasticSearch7.0进行数据存储。Further, when the consumer processor obtains data from the message middleware, it can process the data objects in a multi-process and multi-threaded manner. The message middleware adopts kafka, and the real-time database adopts ElasticSearch7.0 for data storage.
更进一步,所述方法汇聚的各类数据经过数据转换处理器进行数据结构化转换,形成标准的json数据格式,从消息中间件中调用数据进行消费处理具体方式如下:Further, the various types of data gathered by the method are subjected to data structure conversion through a data conversion processor to form a standard json data format, and the specific method of calling data from the message middleware for consumption processing is as follows:
首先从消息中间件中获取数据流,通过Flume或Logstash对数据包进行拆分和重组,进而保存数据对象至Elastic Search中。First, obtain the data stream from the message middleware, split and reorganize the data packets through Flume or Logstash, and then save the data objects to Elastic Search.
有益效果:与现有技术对比,本发明显著效果如下:Beneficial effect: Compared with the prior art, the remarkable effect of the present invention is as follows:
(1)本发明所述汇聚系统通过在汇聚服务器部署热插拔式管理的汇聚组件可以实现自动化适配业务数据采集,节省大量的时间和工作量;同时支持兼容Windows、Centos、Unix、Linux操作系统,也可以兼容32位、64位操作平台,适配不同业务系统汇聚需求,比现有技术更灵活、更便捷,具有覆盖范围广、技术接入简化,自动化适配效果强效果;(1) The convergence system of the present invention can realize automatic adaptation of business data collection by deploying hot-swappable management convergence components on the convergence server, saving a lot of time and workload; at the same time, it supports compatible Windows, Centos, Unix, and Linux operations. The system is also compatible with 32-bit and 64-bit operating platforms, adapting to the convergence requirements of different business systems, more flexible and more convenient than the existing technology, with wide coverage, simplified technical access, and strong automatic adaptation effect;
(2)基于汇聚系统所提供的汇聚方法能够根据业务场景自动化适配数据汇聚服务器,无需大量时间、工作量和技术攻关,和数据需求研发适配工作,对解决业务系统运行数据开始快速的数据接入、数据分析、数据挖掘都提供了非常便利的条件,且对被汇聚的业务系统无干扰,不需要进行嵌入式集成编码,采用分布式架构,以微服务组件式独立存在,且部署快速,实施条件简单、支持扩展性,可以通过插件形式进行新汇聚器的安装、同时也支持汇聚器卸载能力;另一方面,所述方法采用多平台、多进程、多线程架构,可以兼容Windows、Centos、Unix、Linux操作系统,也可以兼容32位、64位操作平台,适配不同业务系统数据汇聚需求,为企事业单位和互联网企业应用等提供了对接多业务系统多数据结构方法。(2) Based on the aggregation method provided by the aggregation system, the data aggregation server can be automatically adapted according to the business scenario, without requiring a lot of time, workload and technical research, and the R&D adaptation work to meet the data requirements. Access, data analysis, and data mining all provide very convenient conditions, and do not interfere with the aggregated business systems, do not require embedded integration coding, adopt a distributed architecture, exist independently in the form of micro-service components, and deploy quickly , the implementation conditions are simple, the expansion is supported, the new aggregator can be installed in the form of a plug-in, and the ability to uninstall the aggregator is also supported; on the other hand, the method adopts a multi-platform, multi-process and multi-thread architecture, which is compatible with Windows, Centos, Unix, and Linux operating systems are also compatible with 32-bit and 64-bit operating platforms, adapting to the data aggregation requirements of different business systems, providing a method for connecting multiple business systems and multiple data structures for enterprises, institutions and Internet enterprise applications.
附图说明Description of drawings
图1为本发明所述方法的主要流程结构图;Fig. 1 is the main flow chart of the method of the present invention;
图2为本发明所述方法的具体流程示意图;Fig. 2 is the concrete schematic flow chart of the method of the present invention;
图3为本发明热插拔组件汇聚器架构示意图;FIG. 3 is a schematic diagram of the architecture of the hot-pluggable component aggregator according to the present invention;
图4为本发明实施例2中应用系统的框架示意图。FIG. 4 is a schematic diagram of a framework of an application system in Embodiment 2 of the present invention.
具体实施方式Detailed ways
为详细说明本发明所公开的技术方案,以下结合附图对本发明做具体的说明。In order to describe the technical solutions disclosed by the present invention in detail, the present invention is described in detail below with reference to the accompanying drawings.
本发明所提供的是一种基于多系统下可插拔式组件数据汇聚系统及汇聚方法,所述方法通过数据汇聚系统的汇聚采集器,适配多数据源、多协议化、多接口数据源的一种通用型数据的方法,通过可热插拔的汇聚模型器采集目标数据源、采集反馈回的数据通过中间数据模型转换层转换成标准化结构数据,并将数据流存储到消息中间件,再从消息中间件中调用数据流进行分析处理。具体的,通过适配业务系统的协议接口,汇聚被采集的业务系统的数据流,并将采集的数据流通过数据转换处理器形成结构化数据,并将结构化的数据存储到消息中间件,再从消息中间件中调用数据进行数据消费处理。The invention provides a data aggregation system and aggregation method based on pluggable components under multi-system. The method adapts to multi-data sources, multi-protocol and multi-interface data sources through the aggregation collector of the data aggregation system. A general-purpose data method, collects the target data source through a hot-pluggable aggregation modeler, collects the feedback data and converts it into standardized structured data through the intermediate data model conversion layer, and stores the data stream in the message middleware, Then call the data flow from the message middleware for analysis and processing. Specifically, by adapting the protocol interface of the business system, the collected data flow of the business system is aggregated, and the collected data flow is formed into structured data through the data conversion processor, and the structured data is stored in the message middleware, Then call the data from the message middleware for data consumption processing.
在上述技术方案的基础上,本实施例根据收集的业务系统需求进行数据源适配,选用与数据源相匹配的适配器。On the basis of the above technical solutions, this embodiment performs data source adaptation according to the collected business system requirements, and selects an adapter that matches the data source.
实施例1Example 1
如图1和图2所示,本发明所提供的一种基于多系统下可插拔式组件数据汇聚方法,具体步骤如下:As shown in Figure 1 and Figure 2, a method for data aggregation based on pluggable components under multiple systems provided by the present invention, the specific steps are as follows:
首先根据业务需求汇聚被采集业务系统的数据。先通过被采集端的数据进行适配,通过已有的汇聚器作为数据适配条件进行数据结构适配,被采集的业务系统数据汇聚器,通过汇聚器汇聚业务系统数据并由数据转换器进行结构化数据转换,形成结构化json数据,再由消息中间件将结构化数据存储到内存消息中间件,再由消费处理器调用结构化数据进行消费处理。Firstly, the data of the collected business system is aggregated according to business requirements. First, it is adapted through the data of the collected end, and the data structure is adapted through the existing aggregator as the data adaptation condition. The collected business system data aggregator aggregates the business system data through the aggregator and is structured by the data converter. The data is converted into structured json data, and then the message middleware stores the structured data in the memory message middleware, and then the consumer processor calls the structured data for consumption processing.
其中,采集汇聚器采用微服务架构的微服务,具体包括采用Java语言及其数据处理方式调用SocketServer、restfulAPI、JDBC等不同协议的数据流,并将不同结构的数据经过统一结构处理器转换封装成标准的结构化数据对象,将数据对象封装成统一结构化的json数据,并对数据进行标签化,然后将数据对象存储至消息中间件。Among them, the collection aggregator adopts microservices with microservice architecture, which specifically includes using Java language and its data processing methods to call data streams of different protocols such as SocketServer, restfulAPI, JDBC, etc., and convert and encapsulate data of different structures through a unified structure processor into Standard structured data objects, encapsulate the data objects into unified structured json data, label the data, and then store the data objects in the message middleware.
对应的,上述方法实施做组建的一种基于多系统下可插拔式组件数据采集系统,包括如下几个部分:Correspondingly, a multi-system-based pluggable component data acquisition system constructed by implementing the above method includes the following parts:
汇聚服务器,用于适配不同数据源结构,采集业务系统的运行数据,并将采集的数据包存储到消息中间件。The aggregation server is used to adapt to different data source structures, collect the operation data of the business system, and store the collected data packets in the message middleware.
消息中间件,用于将汇聚服务器采集的数据包存储到消息中间件,存储的是处理后结构化数据。The message middleware is used to store the data packets collected by the aggregation server into the message middleware, which stores the processed structured data.
转换处理器:是基于数据源格式的通用匹配数据格式处理器,将源数据处理成结构化数据,格式为json数据。Conversion processor: It is a general matching data format processor based on the data source format, which processes the source data into structured data and the format is json data.
消费处理器,用于从消息中间件中获取结构化数据包,通过消费处理器进行实时消费,并将数据包存储到大数据实时库中即Elastic Search。The consumption processor is used to obtain structured data packets from the message middleware, consume them in real time through the consumption processor, and store the data packets in the big data real-time database, namely Elastic Search.
其中,所述汇聚服务器、消息中间件和消费处理器均能兼容多种操作系统,包括Windows操作系统、Unix操作系统和Linux操作系统,且均能兼容多种操作平台。Wherein, the convergence server, message middleware and consumer processor are all compatible with various operating systems, including Windows operating system, Unix operating system and Linux operating system, and are compatible with various operating platforms.
其中,汇聚业务系统的数据包,并将采集的数据包通过数据转换处理器进行结构化处理,并通过消息中间存储到消息中间件,具体包括:采用Java技术调用SocketServer、restfulAPI、JDBC等不同协议数据流,并将不同结构的数据包经过统一结构处理器转换,封装成标准的结构化数据对象,再将数据对象封装成统一结构化json数据,并对json数据进行标签化,然后将数据对象存储至消息中间件。并且汇聚与被接入业务系统的数据包,支持多数据包并发汇聚,通过热插拔组件管理器对适配的汇聚器进行管理,由热插拔管理器对汇聚器进行组织协调、启动和停止,并可以支持在不停止应用情况下对组件进行热插拔管理,支持对新汇聚器进行安装、卸载管理,热插拔汇聚器只需要支持微服务架构就满足要求。汇聚的各种数据经过数据转换处理器进行数据结构化转换,形成标准的json数据格式,从消息中间件中调用消费处理,具体包括:从消息中间件中获取数据包,通过Flume或Logstash对数据包进行拆分和重组,进而保存数据对象到Elastic Search中。热插拔组件汇聚器架构示意图如图3所示。Among them, the data packets of the business system are aggregated, and the collected data packets are structured and processed through the data conversion processor, and stored in the message middleware through the message intermediate, which includes: using Java technology to call different protocols such as SocketServer, restfulAPI, JDBC, etc. data flow, and convert the data packets of different structures through the unified structure processor, encapsulate them into standard structured data objects, and then encapsulate the data objects into unified structured json data, label the json data, and then convert the data objects into Store to message middleware. And it aggregates and accesses the data packets of the business system, supports concurrent aggregation of multiple data packets, manages the adapted aggregator through the hot-plug component manager, and organizes, coordinates, starts and manages the aggregator by the hot-pluggable component manager. Stop, and can support hot-plug management of components without stopping the application, support installation and uninstall management of new aggregators, and hot-plug aggregators only need to support the microservice architecture to meet the requirements. The aggregated data is converted into a standard json data format by the data conversion processor, and the consumption processing is invoked from the message middleware. Specifically, it includes: obtaining data packets from the message middleware, and processing the data through Flume or Logstash. The package is split and reorganized to save the data objects in Elastic Search. The schematic diagram of the hot-pluggable component aggregator architecture is shown in Figure 3.
本发明所组建的一种基于多系统下可插拔式组件数据汇聚方法的系统,包括:A system based on the pluggable component data aggregation method under multiple systems formed by the present invention includes:
汇聚服务器,用于适配不同数据源结构,采集业务系统的运行数据,并将采集的数据包存储到消息中间件;The aggregation server is used to adapt to different data source structures, collect the operation data of the business system, and store the collected data packets in the message middleware;
消息中间件,用于将汇聚服务器采集的数据包存储到消息中间件,存储的是处理后结构化数据。The message middleware is used to store the data packets collected by the aggregation server into the message middleware, which stores the processed structured data.
消费服务器,用于从消息中间件中获取结构化数据包,通过消费处理器对数据实时消费,并将数据包存储到大数据实时库中即Elastic Search。The consumer server is used to obtain structured data packets from the message middleware, consume the data in real time through the consumer processor, and store the data packets in the big data real-time database, namely Elastic Search.
本实施例中,采集服务器通过把汇聚上来的业务数据以1000条/s数据量,每个数据包大小为1KB,再由数据转换处理器进行数据转换,形成结构化数据,转换速度以1000条/s,大小1KB数量为例,转换时间小于1秒。再由转换处理器将数据存储到消息中间件,存储到消息中间件的数据由消费处理器对其进行数据消费,其数据交互是通过Java通信实现的,所述汇聚服务器是兼容多种操作系统和多种操作平台,消息中间件、消费处理器所承载运行的服务器采用Windows、Linux、Centos、Unix等服务器。In this embodiment, the collection server converts the aggregated business data with a data volume of 1,000 pieces/s, and the size of each data packet is 1KB, and then performs data conversion by the data conversion processor to form structured data, and the conversion speed is 1,000 pieces. /s, the size is 1KB as an example, the conversion time is less than 1 second. The conversion processor stores the data in the message middleware, and the data stored in the message middleware is consumed by the consumer processor. The data interaction is realized through Java communication, and the aggregation server is compatible with a variety of operating systems. And a variety of operating platforms, the message middleware, the server running on the consumer processor adopts Windows, Linux, Centos, Unix and other servers.
本实例针对单一数据源进行数据适配,适配成功则采用此适配器,如适配失败则退出。与现有技术相比,增加数据适配器功能,具备数据自适配能力。同时采用实时数据处理架构,如实时中间件KAFKA、FLUME、Logstash、Elastic Search。能够保证数据在有效时间内进行处理需求。This example performs data adaptation for a single data source. If the adaptation is successful, this adapter is used, and if the adaptation fails, it exits. Compared with the prior art, the data adapter function is added, and the data self-adaptation capability is provided. At the same time, it adopts real-time data processing architecture, such as real-time middleware KAFKA, FLUME, Logstash, Elastic Search. It can ensure that the data needs to be processed within the effective time.
实施例2Example 2
实施例2是基于本发明所述系统架构所实现的一种系统示意,其框架结构示意图,如图4所示。汇聚系统通过访问被采集系统,如业务系统数据源可以是多组接入,然后通过数据适配器自动适配到汇聚器,如业务系统源数据到自动适配到相应的汇聚器上,其它数据源与此方式相同,通过汇聚器汇聚的数据包经由中间数据转换器进行数据转换,形成json数据,再由结构化数据处理器把json数据形成结构化数据,将格式化后数据推送到消息中间件,再由消费处理器和消息中间件进行通信,消费消息中间件中的数据,将消费后的数据推送到Elastic Search进行数据保存。Embodiment 2 is a schematic diagram of a system implemented based on the system architecture of the present invention, and a schematic diagram of its frame structure is shown in FIG. 4 . The aggregation system accesses the collected system. For example, the data source of the business system can be accessed by multiple groups, and then automatically adapts to the aggregator through the data adapter. For example, the source data of the business system is automatically adapted to the corresponding aggregator. Other data sources In the same way, the data packets gathered by the aggregator are converted through the intermediate data converter to form json data, and then the structured data processor converts the json data into structured data, and pushes the formatted data to the message middleware , and then the consumer processor communicates with the message middleware, consumes the data in the message middleware, and pushes the consumed data to Elastic Search for data storage.
其中,所述的业务系统数据源为企业应用中的数据源数据,数据适配器为根据数据源自动进行适配的中间组件;热插拔汇聚器为数据适配器后,所匹配对应的数据汇聚器;数据转换器为汇聚器所汇聚的数据包进行数据处理转换的组件;结构化处理器为数据所要形成结构化数据的必要组件;消息中间件处理器为将结构化数据推送到消息中间件组件;消息中间件为kafka;数据库为MySql数据库、oracle数据库等。Wherein, the business system data source is the data source data in the enterprise application, and the data adapter is an intermediate component that automatically adapts according to the data source; the hot-plug aggregator is the data aggregator corresponding to the data adapter; The data converter is the component that performs data processing and conversion for the data packets gathered by the aggregator; the structured processor is the necessary component for data to form structured data; the message middleware processor is the component that pushes the structured data to the message middleware; The message middleware is kafka; the database is MySql database, oracle database, etc.
其中所有要进行汇聚的业务系统都统称为业务系统数据源,所有的数据交易都会进行自动化适配,适配对应的汇聚器,汇聚处理器采用JAVA技术把采集到的数据与内存消息中间件进行数据交易,再由中间件kafka与消费服务器进行通信交易,最后由消费服务器将数据推送到Elastic Search中,完成整个链条化数据的归集。All business systems to be aggregated are collectively referred to as business system data sources. All data transactions will be automatically adapted to the corresponding aggregator. The aggregation processor uses JAVA technology to process the collected data with the memory message middleware. Data transaction, and then the middleware kafka communicates with the consumer server, and finally the consumer server pushes the data to Elastic Search to complete the collection of the entire chained data.
在本实施例中,基于JAVA语言编程技术开发汇聚服务器,也可以基于其它语言开发例如Python、Go等。扩展的汇聚器要符合微服务架构,并且本身汇聚器是一个微服务,在本发明中采用了多进程和多线程的方式进行并发处理,模块具备独立性和低耦合。In this embodiment, the convergence server is developed based on the JAVA language programming technology, and can also be developed based on other languages such as Python, Go, and the like. The extended aggregator should conform to the micro-service architecture, and the aggregator itself is a micro-service. In the present invention, a multi-process and multi-thread mode is adopted for concurrent processing, and the modules are independent and low-coupling.
本实例针对现有技术中存在不能处理的问题是实现数据处理零延时,对于采集处理器、消息中间件、消费处理器对于数据的处理都具有延时处理问题,无法实现数据的实时处理实时消费。本发明采用可适配多数据源汇聚器,适配多数据源适配器本身具备创新性、实用性特点。汇聚处理器的延时≤1秒,中间件处理器延时≤1秒,消费处理器延时≤1秒。本发明不需要通过在被采集业务系统中修改代码和加入插件等方式获取数据,对被采集的业务系统无感知,对采集系统无性能影响,无干扰,部署快速,且实施条件简单,兼容多种操作系统和多种操作平台。The problem that this example cannot handle in the prior art is to achieve zero delay in data processing. The acquisition processor, message middleware, and consumer processor all have delay processing problems for data processing, and real-time data processing cannot be realized in real time. Consumption. The invention adopts an adaptable multi-data source aggregator, and the adaptable multi-data source adapter itself has the characteristics of innovation and practicability. The latency of the aggregation processor is ≤1 second, the latency of the middleware processor is ≤1 second, and the latency of the consumer processor is ≤1 second. The present invention does not need to acquire data by modifying codes and adding plug-ins in the collected business system, has no perception of the collected business system, has no performance impact on the collection system, has no interference, has quick deployment, simple implementation conditions, and is compatible with many operating systems and multiple operating platforms.
Claims (8)
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202010546939.3A CN111753007A (en) | 2020-06-16 | 2020-06-16 | A data aggregation system and aggregation method based on pluggable components under multi-system |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202010546939.3A CN111753007A (en) | 2020-06-16 | 2020-06-16 | A data aggregation system and aggregation method based on pluggable components under multi-system |
Publications (1)
| Publication Number | Publication Date |
|---|---|
| CN111753007A true CN111753007A (en) | 2020-10-09 |
Family
ID=72675334
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202010546939.3A Pending CN111753007A (en) | 2020-06-16 | 2020-06-16 | A data aggregation system and aggregation method based on pluggable components under multi-system |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN111753007A (en) |
Cited By (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN113918549A (en) * | 2021-09-29 | 2022-01-11 | 重庆富民银行股份有限公司 | System and method for data conversion and protocol adaptation based on configuration mode |
| CN114089977A (en) * | 2021-11-16 | 2022-02-25 | 云镝智慧科技有限公司 | Receipt docking method and device for business system and computer equipment |
Citations (10)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| WO2007109047A2 (en) * | 2006-03-18 | 2007-09-27 | Peter Lankford | Content-aware routing of subscriptions for streaming and static data |
| WO2013075490A1 (en) * | 2011-11-25 | 2013-05-30 | 中兴通讯股份有限公司 | Method for implementing terminal adaptation processing, protocol adaptation module and terminal |
| CN104333485A (en) * | 2014-10-31 | 2015-02-04 | 北京思特奇信息技术股份有限公司 | Business data acquisition and analysis method and system based on interchanger total quantity |
| CN107071006A (en) * | 2017-03-27 | 2017-08-18 | 广州数字方舟信息技术股份有限公司 | A kind of acquisition system and acquisition method towards navigation channel big data |
| US20180253342A1 (en) * | 2017-03-03 | 2018-09-06 | International Business Machines Corporation | Discovery and exposure of transactional middleware server-based applications as consumable service endpoints |
| CN108549671A (en) * | 2018-03-28 | 2018-09-18 | 微梦创科网络科技(中国)有限公司 | Real time data acquisition and visual implementation method and device |
| CN109104487A (en) * | 2018-08-20 | 2018-12-28 | 浪潮软件股份有限公司 | Data transmission method based on logstack + kafka |
| CN109918148A (en) * | 2019-02-21 | 2019-06-21 | 上海伊巢网络科技有限公司 | The component dynamic pluggable system of Internet of things middleware |
| CN110083436A (en) * | 2019-05-14 | 2019-08-02 | 上海理想信息产业(集团)有限公司 | A kind of business datum real-time monitoring system and method based on Java bytecode enhancing technology |
| CN110351315A (en) * | 2018-04-03 | 2019-10-18 | 中兴通讯股份有限公司 | Method, system and storage medium, the electronic device of data processing |
-
2020
- 2020-06-16 CN CN202010546939.3A patent/CN111753007A/en active Pending
Patent Citations (10)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| WO2007109047A2 (en) * | 2006-03-18 | 2007-09-27 | Peter Lankford | Content-aware routing of subscriptions for streaming and static data |
| WO2013075490A1 (en) * | 2011-11-25 | 2013-05-30 | 中兴通讯股份有限公司 | Method for implementing terminal adaptation processing, protocol adaptation module and terminal |
| CN104333485A (en) * | 2014-10-31 | 2015-02-04 | 北京思特奇信息技术股份有限公司 | Business data acquisition and analysis method and system based on interchanger total quantity |
| US20180253342A1 (en) * | 2017-03-03 | 2018-09-06 | International Business Machines Corporation | Discovery and exposure of transactional middleware server-based applications as consumable service endpoints |
| CN107071006A (en) * | 2017-03-27 | 2017-08-18 | 广州数字方舟信息技术股份有限公司 | A kind of acquisition system and acquisition method towards navigation channel big data |
| CN108549671A (en) * | 2018-03-28 | 2018-09-18 | 微梦创科网络科技(中国)有限公司 | Real time data acquisition and visual implementation method and device |
| CN110351315A (en) * | 2018-04-03 | 2019-10-18 | 中兴通讯股份有限公司 | Method, system and storage medium, the electronic device of data processing |
| CN109104487A (en) * | 2018-08-20 | 2018-12-28 | 浪潮软件股份有限公司 | Data transmission method based on logstack + kafka |
| CN109918148A (en) * | 2019-02-21 | 2019-06-21 | 上海伊巢网络科技有限公司 | The component dynamic pluggable system of Internet of things middleware |
| CN110083436A (en) * | 2019-05-14 | 2019-08-02 | 上海理想信息产业(集团)有限公司 | A kind of business datum real-time monitoring system and method based on Java bytecode enhancing technology |
Cited By (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN113918549A (en) * | 2021-09-29 | 2022-01-11 | 重庆富民银行股份有限公司 | System and method for data conversion and protocol adaptation based on configuration mode |
| CN114089977A (en) * | 2021-11-16 | 2022-02-25 | 云镝智慧科技有限公司 | Receipt docking method and device for business system and computer equipment |
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| CN106534257B (en) | A multi-source security log collection system and method with a multi-level cluster architecture | |
| CN111737329A (en) | Unified data acquisition platform for rail transit | |
| CN102739452A (en) | Method and system for monitoring resources | |
| CN105119752A (en) | Distributed log acquisition method, device and system | |
| CN104410662A (en) | Parallel mass data transmitting middleware of Internet of things and working method thereof | |
| CN103647685B (en) | A kind of test result information uploads receiving handling method | |
| CN106506519A (en) | System and method for cross-platform communication of WCF framework net.tcp protocol | |
| CN113703997A (en) | Bidirectional asynchronous communication middleware system integrating multiple message agents and implementation method | |
| CN115686540A (en) | RPA control method and system based on Hongmeng system | |
| CN111753007A (en) | A data aggregation system and aggregation method based on pluggable components under multi-system | |
| CN111343239B (en) | Communication request processing method, communication request processing device and transaction system | |
| CN111190875A (en) | Log aggregation method and device based on container platform | |
| WO2025118967A1 (en) | Big data access method based on protocol adaptation and scheduling in seismological industry | |
| CN103544060A (en) | WEBSERVICE based service dispatching system and method | |
| CN116244141A (en) | A method and system for real-time monitoring of microservice network infrastructure | |
| CN115938397A (en) | Speech recognition method, device, equipment and storage medium | |
| CN113971200B (en) | A map service traffic recording system and method for a cloud native platform | |
| CN113297148B (en) | Method, device and equipment for collecting service log data and readable storage medium | |
| CN104217314B (en) | Routing iinformation grasping means and device | |
| CN118642872B (en) | Multi-source fusion OPC data acquisition system and method based on track system | |
| CN114003410A (en) | Message-based flink real-time service scheduling configuration method and system | |
| CN109413185A (en) | A kind of equipment routing inspection system and its Cloud Server design method | |
| CN108809725A (en) | A kind of collection method and device of journal file | |
| CN112131070A (en) | Call relationship tracking method, apparatus, device, and computer-readable storage medium | |
| CN118474183A (en) | Data transmission method, CUC architecture, storage medium and program product based on data analysis |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| PB01 | Publication | ||
| PB01 | Publication | ||
| SE01 | Entry into force of request for substantive examination | ||
| SE01 | Entry into force of request for substantive examination | ||
| RJ01 | Rejection of invention patent application after publication |
Application publication date: 20201009 |
|
| RJ01 | Rejection of invention patent application after publication |