[go: up one dir, main page]

CN111753007A - A data aggregation system and aggregation method based on pluggable components under multi-system - Google Patents

A data aggregation system and aggregation method based on pluggable components under multi-system Download PDF

Info

Publication number
CN111753007A
CN111753007A CN202010546939.3A CN202010546939A CN111753007A CN 111753007 A CN111753007 A CN 111753007A CN 202010546939 A CN202010546939 A CN 202010546939A CN 111753007 A CN111753007 A CN 111753007A
Authority
CN
China
Prior art keywords
data
aggregation
message middleware
server
processor
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Pending
Application number
CN202010546939.3A
Other languages
Chinese (zh)
Inventor
邓志东
刘旭生
孙林檀
李子乾
唐振营
杨自兴
徐胤
杨睿
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Customer Service Center of State Grid Corp of China
Original Assignee
Customer Service Center of State Grid Corp of China
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Customer Service Center of State Grid Corp of China filed Critical Customer Service Center of State Grid Corp of China
Priority to CN202010546939.3A priority Critical patent/CN111753007A/en
Publication of CN111753007A publication Critical patent/CN111753007A/en
Pending legal-status Critical Current

Links

Images

Classifications

    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/25Integrating or interfacing systems involving database management systems
    • G06F16/252Integrating or interfacing systems involving database management systems between a Database Management System and a front-end application
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/25Integrating or interfacing systems involving database management systems
    • G06F16/258Data format conversion from or to a database
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/54Interprogram communication
    • G06F9/546Message passing systems or structures, e.g. queues
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F2209/00Indexing scheme relating to G06F9/00
    • G06F2209/54Indexing scheme relating to G06F9/54
    • G06F2209/547Messaging middleware
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F2209/00Indexing scheme relating to G06F9/00
    • G06F2209/54Indexing scheme relating to G06F9/54
    • G06F2209/548Queue

Landscapes

  • Engineering & Computer Science (AREA)
  • Databases & Information Systems (AREA)
  • Theoretical Computer Science (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Data Mining & Analysis (AREA)
  • Software Systems (AREA)
  • Computer And Data Communications (AREA)

Abstract

The invention discloses a pluggable component data aggregation system and an aggregation method based on multiple systems, wherein the method adapts a universal aggregation data method of multiple data sources, multiple protocols and multiple interface data sources through an aggregation collector of the data aggregation system, a target data source is aggregated through a hot-pluggable aggregation model device, the acquired data is converted into standard structured data through an intermediate data model conversion layer, a data stream is stored in a message middleware, and the data stream is called from the message middleware for analysis and processing. The invention does not need to collect the target data by modifying codes and adding plug-ins in the collected business system, has no perception to the collected business system, has no influence on the performance of the collection system, has no interference, is quick to deploy, has simple implementation conditions, and is compatible with various operating systems and various system platforms.

Description

一种基于多系统下可插拔式组件数据汇聚系统及汇聚方法A data aggregation system and aggregation method based on pluggable components under multi-system

技术领域technical field

本发明涉及多种协议下多数据源的数据处理技术,涉及多系统下如何适配不同数据源,不同结构汇聚方法,具体涉及一种基于多系统下可插拔式组件数据汇聚系统及汇聚方法。The present invention relates to a data processing technology of multiple data sources under multiple protocols, how to adapt different data sources under multiple systems, and different structure aggregation methods, in particular to a data aggregation system and aggregation method based on pluggable components under multiple systems .

背景技术Background technique

当前在企业大数据分析应用或互联网数据应用中对数据源的适配都是需要一对一定制化开发进行适配,耗费大量的时间和工作量,为了解决在企业、互联网应用中多数据源下如何自动化适配接入运行数据的汇聚技术方法,尤其是对多业务系统的数据源进行数据归集汇聚等,目前市场上通常是采用定制化开发针对不同业务系统进行对接和适配,定制化开发这种方式对数据需求方和数据接入方的人力、物力、时间影响很大,增加了被接入系统数据的时间风险和技术风险。At present, the adaptation of data sources in enterprise big data analysis applications or Internet data applications requires one-to-one customized development, which consumes a lot of time and workload. In order to solve the problem of multiple data sources in enterprise and Internet applications The following is how to automatically adapt the aggregation technology method of access operation data, especially the data collection and aggregation of data sources of multi-service systems. At present, customized development is usually used in the market to connect and adapt to different business systems. This method of development has a great impact on the manpower, material resources, and time of the data demander and data access party, and increases the time risk and technical risk of the data being connected to the system.

发明内容SUMMARY OF THE INVENTION

发明目的:针对复杂环境解决中需要通过汇聚多业务系统下、不同协议的业务数据问题,本发明的第一目的是提供一种多系统下可插拔式组件数据汇聚系统。同时,本发明的第二目的是提供一种多系统下可插拔式组件数据汇聚方法。Purpose of the invention: Aiming at the problem of needing to aggregate business data of different protocols under multi-service systems in solving complex environments, the first object of the present invention is to provide a pluggable component data aggregation system under multi-system. Meanwhile, the second object of the present invention is to provide a method for data aggregation of pluggable components under multiple systems.

为实现上述目的,本发明所提供的技术方案如下:For achieving the above object, the technical scheme provided by the present invention is as follows:

一种基于多系统下可插拔式组件数据采集系统,包括汇聚服务器、消息中间件、转换处理器和消费处理器;A data acquisition system based on pluggable components under multiple systems, comprising a convergence server, a message middleware, a conversion processor and a consumption processor;

汇聚服务器,部署有可插拔的汇聚组件,用于适配不同系统、不同结构数据源,通过汇聚组件采集业务系统数据源系统的数据流,并将采集的数据流归集到消息中间件;The aggregation server is deployed with pluggable aggregation components, which are used to adapt to different systems and data sources of different structures, collect the data streams of the business system data source systems through the aggregation components, and collect the collected data streams into the message middleware;

消息中间件,用于存储汇聚服务器采集的数据流和经过转换处理器处理后结构化数据;The message middleware is used to store the data stream collected by the aggregation server and the structured data processed by the conversion processor;

转换处理器,基于数据源格式的通用匹配数据格式处理器,将源数据处理成结构化数据,格式为标准json数据;The conversion processor is a general matching data format processor based on the data source format, which processes the source data into structured data, and the format is standard json data;

消费处理器,基于从消息中间件中获取结构化数据流,通过消费处理器进行实时调用消费并将数据流向到大数据实时库。The consumer processor, based on the structured data stream obtained from the message middleware, makes real-time call consumption through the consumer processor and streams the data to the big data real-time library.

进一步,所述汇聚服务器为云服务器或物理服务器,云服务器或物理服务器与被采集业务系统网络互通,数据端口开放共享,且网络协议中支持主流TCP协议。Further, the aggregation server is a cloud server or a physical server, the cloud server or physical server and the collected business system network communicate with each other, the data port is open and shared, and the network protocol supports the mainstream TCP protocol.

更进一步,所述汇聚服务器、消息中间件和消费处理器均能兼容搭载两种及其以上的操作系统的服务器设备,实现两种及其以上不同操作系统下的数据汇聚,操作系统包括Windows操作系统、Unix操作系统、linux操作系统。Further, the aggregation server, message middleware and consumer processor are all compatible with server devices equipped with two or more operating systems, and realize data aggregation under two or more different operating systems. The operating systems include Windows operation. system, Unix operating system, linux operating system.

更进一步,所述消息中间件为kafka、配置数据库为MySql、Oracle。Further, the message middleware is kafka, and the configuration database is MySql and Oracle.

实施上述汇聚系统的一种基于多系统下可插拔式组件数据汇聚方法,所述方法包括如下步骤:A method for data aggregation based on pluggable components under multiple systems for implementing the above aggregation system, the method includes the following steps:

(1)汇聚系统组建:开放业务系统的数据协议接口,将汇聚服务器与被接入业务系统数据流进行接口适配,并将采集的数据流经转换处理器进行数据转换,形成结构化数据存储到消息中间件,并调用消息中间件进行消息消费处理;(1) Convergence system establishment: open the data protocol interface of the business system, adapt the interface between the convergence server and the data flow of the connected business system, and convert the collected data flow through the conversion processor to form a structured data storage Go to the message middleware, and call the message middleware for message consumption processing;

(2)数据汇聚处理:汇聚服务器通过Filebeat收集被采集业务系统的数据流,数据流经过数据转换器进行过滤处理后传送给消息中间件中的kafka或集群进行存储,通过Logstash工具再到消息中间件中的kafka中获取数据,传给Elasticsearch工具进行数据分析处理,最后到kibana工具进行展示。(2) Data aggregation processing: The aggregation server collects the data stream of the collected business system through Filebeat. The data stream is filtered and processed by the data converter and then sent to the kafka or cluster in the message middleware for storage, and then to the message center through the Logstash tool. The data is obtained from the kafka in the file, passed to the Elasticsearch tool for data analysis and processing, and finally displayed to the kibana tool.

进一步,步骤(1)中,汇聚服务器适配被采集业务系统的数据,并将汇聚的数据经过数据转换处理器转换形成结构化数据,并将结构化数据存储到消息中间件,具体方式如下:Further, in step (1), the aggregation server is adapted to the data of the collected business system, and the aggregated data is converted into structured data by the data conversion processor, and the structured data is stored in the message middleware. The specific methods are as follows:

基于Java语言调用SocketServer、RestfulAPI、JDBC不同协议的数据流,并将不同结构的数据经过统一结构处理器转换封装成标准的结构化数据对象,将数据对象再次封装成统一结构化的json数据,并对数据进行标签化,然后将数据对象存储至消息中间件。Call data streams of different protocols of SocketServer, RestfulAPI, and JDBC based on Java language, and convert and encapsulate data of different structures into standard structured data objects through a unified structure processor, and re-encapsulate the data objects into unified structured json data. Tag the data and store the data objects in the message middleware.

进一步,消费处理器从消息中间件获取数据时,可以采用多进程及多线程的方式对数据对象进行处理,消息中间件采用kafka,实时数据库采用ElasticSearch7.0进行数据存储。Further, when the consumer processor obtains data from the message middleware, it can process the data objects in a multi-process and multi-threaded manner. The message middleware adopts kafka, and the real-time database adopts ElasticSearch7.0 for data storage.

更进一步,所述方法汇聚的各类数据经过数据转换处理器进行数据结构化转换,形成标准的json数据格式,从消息中间件中调用数据进行消费处理具体方式如下:Further, the various types of data gathered by the method are subjected to data structure conversion through a data conversion processor to form a standard json data format, and the specific method of calling data from the message middleware for consumption processing is as follows:

首先从消息中间件中获取数据流,通过Flume或Logstash对数据包进行拆分和重组,进而保存数据对象至Elastic Search中。First, obtain the data stream from the message middleware, split and reorganize the data packets through Flume or Logstash, and then save the data objects to Elastic Search.

有益效果:与现有技术对比,本发明显著效果如下:Beneficial effect: Compared with the prior art, the remarkable effect of the present invention is as follows:

(1)本发明所述汇聚系统通过在汇聚服务器部署热插拔式管理的汇聚组件可以实现自动化适配业务数据采集,节省大量的时间和工作量;同时支持兼容Windows、Centos、Unix、Linux操作系统,也可以兼容32位、64位操作平台,适配不同业务系统汇聚需求,比现有技术更灵活、更便捷,具有覆盖范围广、技术接入简化,自动化适配效果强效果;(1) The convergence system of the present invention can realize automatic adaptation of business data collection by deploying hot-swappable management convergence components on the convergence server, saving a lot of time and workload; at the same time, it supports compatible Windows, Centos, Unix, and Linux operations. The system is also compatible with 32-bit and 64-bit operating platforms, adapting to the convergence requirements of different business systems, more flexible and more convenient than the existing technology, with wide coverage, simplified technical access, and strong automatic adaptation effect;

(2)基于汇聚系统所提供的汇聚方法能够根据业务场景自动化适配数据汇聚服务器,无需大量时间、工作量和技术攻关,和数据需求研发适配工作,对解决业务系统运行数据开始快速的数据接入、数据分析、数据挖掘都提供了非常便利的条件,且对被汇聚的业务系统无干扰,不需要进行嵌入式集成编码,采用分布式架构,以微服务组件式独立存在,且部署快速,实施条件简单、支持扩展性,可以通过插件形式进行新汇聚器的安装、同时也支持汇聚器卸载能力;另一方面,所述方法采用多平台、多进程、多线程架构,可以兼容Windows、Centos、Unix、Linux操作系统,也可以兼容32位、64位操作平台,适配不同业务系统数据汇聚需求,为企事业单位和互联网企业应用等提供了对接多业务系统多数据结构方法。(2) Based on the aggregation method provided by the aggregation system, the data aggregation server can be automatically adapted according to the business scenario, without requiring a lot of time, workload and technical research, and the R&D adaptation work to meet the data requirements. Access, data analysis, and data mining all provide very convenient conditions, and do not interfere with the aggregated business systems, do not require embedded integration coding, adopt a distributed architecture, exist independently in the form of micro-service components, and deploy quickly , the implementation conditions are simple, the expansion is supported, the new aggregator can be installed in the form of a plug-in, and the ability to uninstall the aggregator is also supported; on the other hand, the method adopts a multi-platform, multi-process and multi-thread architecture, which is compatible with Windows, Centos, Unix, and Linux operating systems are also compatible with 32-bit and 64-bit operating platforms, adapting to the data aggregation requirements of different business systems, providing a method for connecting multiple business systems and multiple data structures for enterprises, institutions and Internet enterprise applications.

附图说明Description of drawings

图1为本发明所述方法的主要流程结构图;Fig. 1 is the main flow chart of the method of the present invention;

图2为本发明所述方法的具体流程示意图;Fig. 2 is the concrete schematic flow chart of the method of the present invention;

图3为本发明热插拔组件汇聚器架构示意图;FIG. 3 is a schematic diagram of the architecture of the hot-pluggable component aggregator according to the present invention;

图4为本发明实施例2中应用系统的框架示意图。FIG. 4 is a schematic diagram of a framework of an application system in Embodiment 2 of the present invention.

具体实施方式Detailed ways

为详细说明本发明所公开的技术方案,以下结合附图对本发明做具体的说明。In order to describe the technical solutions disclosed by the present invention in detail, the present invention is described in detail below with reference to the accompanying drawings.

本发明所提供的是一种基于多系统下可插拔式组件数据汇聚系统及汇聚方法,所述方法通过数据汇聚系统的汇聚采集器,适配多数据源、多协议化、多接口数据源的一种通用型数据的方法,通过可热插拔的汇聚模型器采集目标数据源、采集反馈回的数据通过中间数据模型转换层转换成标准化结构数据,并将数据流存储到消息中间件,再从消息中间件中调用数据流进行分析处理。具体的,通过适配业务系统的协议接口,汇聚被采集的业务系统的数据流,并将采集的数据流通过数据转换处理器形成结构化数据,并将结构化的数据存储到消息中间件,再从消息中间件中调用数据进行数据消费处理。The invention provides a data aggregation system and aggregation method based on pluggable components under multi-system. The method adapts to multi-data sources, multi-protocol and multi-interface data sources through the aggregation collector of the data aggregation system. A general-purpose data method, collects the target data source through a hot-pluggable aggregation modeler, collects the feedback data and converts it into standardized structured data through the intermediate data model conversion layer, and stores the data stream in the message middleware, Then call the data flow from the message middleware for analysis and processing. Specifically, by adapting the protocol interface of the business system, the collected data flow of the business system is aggregated, and the collected data flow is formed into structured data through the data conversion processor, and the structured data is stored in the message middleware, Then call the data from the message middleware for data consumption processing.

在上述技术方案的基础上,本实施例根据收集的业务系统需求进行数据源适配,选用与数据源相匹配的适配器。On the basis of the above technical solutions, this embodiment performs data source adaptation according to the collected business system requirements, and selects an adapter that matches the data source.

实施例1Example 1

如图1和图2所示,本发明所提供的一种基于多系统下可插拔式组件数据汇聚方法,具体步骤如下:As shown in Figure 1 and Figure 2, a method for data aggregation based on pluggable components under multiple systems provided by the present invention, the specific steps are as follows:

首先根据业务需求汇聚被采集业务系统的数据。先通过被采集端的数据进行适配,通过已有的汇聚器作为数据适配条件进行数据结构适配,被采集的业务系统数据汇聚器,通过汇聚器汇聚业务系统数据并由数据转换器进行结构化数据转换,形成结构化json数据,再由消息中间件将结构化数据存储到内存消息中间件,再由消费处理器调用结构化数据进行消费处理。Firstly, the data of the collected business system is aggregated according to business requirements. First, it is adapted through the data of the collected end, and the data structure is adapted through the existing aggregator as the data adaptation condition. The collected business system data aggregator aggregates the business system data through the aggregator and is structured by the data converter. The data is converted into structured json data, and then the message middleware stores the structured data in the memory message middleware, and then the consumer processor calls the structured data for consumption processing.

其中,采集汇聚器采用微服务架构的微服务,具体包括采用Java语言及其数据处理方式调用SocketServer、restfulAPI、JDBC等不同协议的数据流,并将不同结构的数据经过统一结构处理器转换封装成标准的结构化数据对象,将数据对象封装成统一结构化的json数据,并对数据进行标签化,然后将数据对象存储至消息中间件。Among them, the collection aggregator adopts microservices with microservice architecture, which specifically includes using Java language and its data processing methods to call data streams of different protocols such as SocketServer, restfulAPI, JDBC, etc., and convert and encapsulate data of different structures through a unified structure processor into Standard structured data objects, encapsulate the data objects into unified structured json data, label the data, and then store the data objects in the message middleware.

对应的,上述方法实施做组建的一种基于多系统下可插拔式组件数据采集系统,包括如下几个部分:Correspondingly, a multi-system-based pluggable component data acquisition system constructed by implementing the above method includes the following parts:

汇聚服务器,用于适配不同数据源结构,采集业务系统的运行数据,并将采集的数据包存储到消息中间件。The aggregation server is used to adapt to different data source structures, collect the operation data of the business system, and store the collected data packets in the message middleware.

消息中间件,用于将汇聚服务器采集的数据包存储到消息中间件,存储的是处理后结构化数据。The message middleware is used to store the data packets collected by the aggregation server into the message middleware, which stores the processed structured data.

转换处理器:是基于数据源格式的通用匹配数据格式处理器,将源数据处理成结构化数据,格式为json数据。Conversion processor: It is a general matching data format processor based on the data source format, which processes the source data into structured data and the format is json data.

消费处理器,用于从消息中间件中获取结构化数据包,通过消费处理器进行实时消费,并将数据包存储到大数据实时库中即Elastic Search。The consumption processor is used to obtain structured data packets from the message middleware, consume them in real time through the consumption processor, and store the data packets in the big data real-time database, namely Elastic Search.

其中,所述汇聚服务器、消息中间件和消费处理器均能兼容多种操作系统,包括Windows操作系统、Unix操作系统和Linux操作系统,且均能兼容多种操作平台。Wherein, the convergence server, message middleware and consumer processor are all compatible with various operating systems, including Windows operating system, Unix operating system and Linux operating system, and are compatible with various operating platforms.

其中,汇聚业务系统的数据包,并将采集的数据包通过数据转换处理器进行结构化处理,并通过消息中间存储到消息中间件,具体包括:采用Java技术调用SocketServer、restfulAPI、JDBC等不同协议数据流,并将不同结构的数据包经过统一结构处理器转换,封装成标准的结构化数据对象,再将数据对象封装成统一结构化json数据,并对json数据进行标签化,然后将数据对象存储至消息中间件。并且汇聚与被接入业务系统的数据包,支持多数据包并发汇聚,通过热插拔组件管理器对适配的汇聚器进行管理,由热插拔管理器对汇聚器进行组织协调、启动和停止,并可以支持在不停止应用情况下对组件进行热插拔管理,支持对新汇聚器进行安装、卸载管理,热插拔汇聚器只需要支持微服务架构就满足要求。汇聚的各种数据经过数据转换处理器进行数据结构化转换,形成标准的json数据格式,从消息中间件中调用消费处理,具体包括:从消息中间件中获取数据包,通过Flume或Logstash对数据包进行拆分和重组,进而保存数据对象到Elastic Search中。热插拔组件汇聚器架构示意图如图3所示。Among them, the data packets of the business system are aggregated, and the collected data packets are structured and processed through the data conversion processor, and stored in the message middleware through the message intermediate, which includes: using Java technology to call different protocols such as SocketServer, restfulAPI, JDBC, etc. data flow, and convert the data packets of different structures through the unified structure processor, encapsulate them into standard structured data objects, and then encapsulate the data objects into unified structured json data, label the json data, and then convert the data objects into Store to message middleware. And it aggregates and accesses the data packets of the business system, supports concurrent aggregation of multiple data packets, manages the adapted aggregator through the hot-plug component manager, and organizes, coordinates, starts and manages the aggregator by the hot-pluggable component manager. Stop, and can support hot-plug management of components without stopping the application, support installation and uninstall management of new aggregators, and hot-plug aggregators only need to support the microservice architecture to meet the requirements. The aggregated data is converted into a standard json data format by the data conversion processor, and the consumption processing is invoked from the message middleware. Specifically, it includes: obtaining data packets from the message middleware, and processing the data through Flume or Logstash. The package is split and reorganized to save the data objects in Elastic Search. The schematic diagram of the hot-pluggable component aggregator architecture is shown in Figure 3.

本发明所组建的一种基于多系统下可插拔式组件数据汇聚方法的系统,包括:A system based on the pluggable component data aggregation method under multiple systems formed by the present invention includes:

汇聚服务器,用于适配不同数据源结构,采集业务系统的运行数据,并将采集的数据包存储到消息中间件;The aggregation server is used to adapt to different data source structures, collect the operation data of the business system, and store the collected data packets in the message middleware;

消息中间件,用于将汇聚服务器采集的数据包存储到消息中间件,存储的是处理后结构化数据。The message middleware is used to store the data packets collected by the aggregation server into the message middleware, which stores the processed structured data.

消费服务器,用于从消息中间件中获取结构化数据包,通过消费处理器对数据实时消费,并将数据包存储到大数据实时库中即Elastic Search。The consumer server is used to obtain structured data packets from the message middleware, consume the data in real time through the consumer processor, and store the data packets in the big data real-time database, namely Elastic Search.

本实施例中,采集服务器通过把汇聚上来的业务数据以1000条/s数据量,每个数据包大小为1KB,再由数据转换处理器进行数据转换,形成结构化数据,转换速度以1000条/s,大小1KB数量为例,转换时间小于1秒。再由转换处理器将数据存储到消息中间件,存储到消息中间件的数据由消费处理器对其进行数据消费,其数据交互是通过Java通信实现的,所述汇聚服务器是兼容多种操作系统和多种操作平台,消息中间件、消费处理器所承载运行的服务器采用Windows、Linux、Centos、Unix等服务器。In this embodiment, the collection server converts the aggregated business data with a data volume of 1,000 pieces/s, and the size of each data packet is 1KB, and then performs data conversion by the data conversion processor to form structured data, and the conversion speed is 1,000 pieces. /s, the size is 1KB as an example, the conversion time is less than 1 second. The conversion processor stores the data in the message middleware, and the data stored in the message middleware is consumed by the consumer processor. The data interaction is realized through Java communication, and the aggregation server is compatible with a variety of operating systems. And a variety of operating platforms, the message middleware, the server running on the consumer processor adopts Windows, Linux, Centos, Unix and other servers.

本实例针对单一数据源进行数据适配,适配成功则采用此适配器,如适配失败则退出。与现有技术相比,增加数据适配器功能,具备数据自适配能力。同时采用实时数据处理架构,如实时中间件KAFKA、FLUME、Logstash、Elastic Search。能够保证数据在有效时间内进行处理需求。This example performs data adaptation for a single data source. If the adaptation is successful, this adapter is used, and if the adaptation fails, it exits. Compared with the prior art, the data adapter function is added, and the data self-adaptation capability is provided. At the same time, it adopts real-time data processing architecture, such as real-time middleware KAFKA, FLUME, Logstash, Elastic Search. It can ensure that the data needs to be processed within the effective time.

实施例2Example 2

实施例2是基于本发明所述系统架构所实现的一种系统示意,其框架结构示意图,如图4所示。汇聚系统通过访问被采集系统,如业务系统数据源可以是多组接入,然后通过数据适配器自动适配到汇聚器,如业务系统源数据到自动适配到相应的汇聚器上,其它数据源与此方式相同,通过汇聚器汇聚的数据包经由中间数据转换器进行数据转换,形成json数据,再由结构化数据处理器把json数据形成结构化数据,将格式化后数据推送到消息中间件,再由消费处理器和消息中间件进行通信,消费消息中间件中的数据,将消费后的数据推送到Elastic Search进行数据保存。Embodiment 2 is a schematic diagram of a system implemented based on the system architecture of the present invention, and a schematic diagram of its frame structure is shown in FIG. 4 . The aggregation system accesses the collected system. For example, the data source of the business system can be accessed by multiple groups, and then automatically adapts to the aggregator through the data adapter. For example, the source data of the business system is automatically adapted to the corresponding aggregator. Other data sources In the same way, the data packets gathered by the aggregator are converted through the intermediate data converter to form json data, and then the structured data processor converts the json data into structured data, and pushes the formatted data to the message middleware , and then the consumer processor communicates with the message middleware, consumes the data in the message middleware, and pushes the consumed data to Elastic Search for data storage.

其中,所述的业务系统数据源为企业应用中的数据源数据,数据适配器为根据数据源自动进行适配的中间组件;热插拔汇聚器为数据适配器后,所匹配对应的数据汇聚器;数据转换器为汇聚器所汇聚的数据包进行数据处理转换的组件;结构化处理器为数据所要形成结构化数据的必要组件;消息中间件处理器为将结构化数据推送到消息中间件组件;消息中间件为kafka;数据库为MySql数据库、oracle数据库等。Wherein, the business system data source is the data source data in the enterprise application, and the data adapter is an intermediate component that automatically adapts according to the data source; the hot-plug aggregator is the data aggregator corresponding to the data adapter; The data converter is the component that performs data processing and conversion for the data packets gathered by the aggregator; the structured processor is the necessary component for data to form structured data; the message middleware processor is the component that pushes the structured data to the message middleware; The message middleware is kafka; the database is MySql database, oracle database, etc.

其中所有要进行汇聚的业务系统都统称为业务系统数据源,所有的数据交易都会进行自动化适配,适配对应的汇聚器,汇聚处理器采用JAVA技术把采集到的数据与内存消息中间件进行数据交易,再由中间件kafka与消费服务器进行通信交易,最后由消费服务器将数据推送到Elastic Search中,完成整个链条化数据的归集。All business systems to be aggregated are collectively referred to as business system data sources. All data transactions will be automatically adapted to the corresponding aggregator. The aggregation processor uses JAVA technology to process the collected data with the memory message middleware. Data transaction, and then the middleware kafka communicates with the consumer server, and finally the consumer server pushes the data to Elastic Search to complete the collection of the entire chained data.

在本实施例中,基于JAVA语言编程技术开发汇聚服务器,也可以基于其它语言开发例如Python、Go等。扩展的汇聚器要符合微服务架构,并且本身汇聚器是一个微服务,在本发明中采用了多进程和多线程的方式进行并发处理,模块具备独立性和低耦合。In this embodiment, the convergence server is developed based on the JAVA language programming technology, and can also be developed based on other languages such as Python, Go, and the like. The extended aggregator should conform to the micro-service architecture, and the aggregator itself is a micro-service. In the present invention, a multi-process and multi-thread mode is adopted for concurrent processing, and the modules are independent and low-coupling.

本实例针对现有技术中存在不能处理的问题是实现数据处理零延时,对于采集处理器、消息中间件、消费处理器对于数据的处理都具有延时处理问题,无法实现数据的实时处理实时消费。本发明采用可适配多数据源汇聚器,适配多数据源适配器本身具备创新性、实用性特点。汇聚处理器的延时≤1秒,中间件处理器延时≤1秒,消费处理器延时≤1秒。本发明不需要通过在被采集业务系统中修改代码和加入插件等方式获取数据,对被采集的业务系统无感知,对采集系统无性能影响,无干扰,部署快速,且实施条件简单,兼容多种操作系统和多种操作平台。The problem that this example cannot handle in the prior art is to achieve zero delay in data processing. The acquisition processor, message middleware, and consumer processor all have delay processing problems for data processing, and real-time data processing cannot be realized in real time. Consumption. The invention adopts an adaptable multi-data source aggregator, and the adaptable multi-data source adapter itself has the characteristics of innovation and practicability. The latency of the aggregation processor is ≤1 second, the latency of the middleware processor is ≤1 second, and the latency of the consumer processor is ≤1 second. The present invention does not need to acquire data by modifying codes and adding plug-ins in the collected business system, has no perception of the collected business system, has no performance impact on the collection system, has no interference, has quick deployment, simple implementation conditions, and is compatible with many operating systems and multiple operating platforms.

Claims (8)

1. A pluggable component data aggregation system based on multiple systems is characterized by comprising an aggregation server, a message middleware, a conversion processor and a consumption processor;
the convergence server is deployed with a pluggable convergence component and is used for adapting to data sources of different systems and different structures, collecting data streams of a service system data source system through the convergence component and collecting the collected data streams to a message middleware;
the message middleware is used for storing the data stream acquired by the convergence server and the structured data processed by the conversion processor;
the conversion processor is used for processing the source data into structured data based on a general matching data format processor of a data source format, wherein the format is standard json data;
and the consumption processor acquires the structured data stream from the message middleware, calls consumption in real time through the consumption processor and flows the data stream to the big data real-time library.
2. The multisystem-based pluggable component data aggregation system according to claim 1, wherein the aggregation server is a cloud server or a physical server, the cloud server or the physical server is in network intercommunication with the collected server system, a data port is open and shared, and a mainstream TCP protocol is supported in a network protocol.
3. The multisystem-based pluggable component data aggregation system according to claim 1, wherein the aggregation server, the message middleware, and the consumption processor are all capable of processing data sources from different operating systems, so as to realize data aggregation under two or more operating systems, and the operating systems include a Windows operating system, a Unix operating system, and a linux operating system.
4. The multi-system based pluggable component data aggregation system of claim 1, wherein the message middleware is kafka and the configuration database is MySql or Oracle.
5. A pluggable component data aggregation method based on multiple systems is characterized by comprising the following steps:
(1) the construction of a convergence system: opening a data protocol interface of a service system, carrying out interface adaptation on a convergence server and a data stream of the accessed service system, carrying out data conversion on the acquired data stream through a conversion processor to form structured data, storing the structured data into a message middleware, and calling the message middleware to carry out message consumption processing;
(2) data aggregation processing: the convergence server collects data flow of the collected service system through the Filebeat, the data flow is transmitted to kafka or a cluster in the message middleware for storage after being filtered by the data converter, data are acquired from the kafka in the message middleware through the Logstash tool and transmitted to the Elasticisarch tool for data analysis and processing, and finally the data are displayed through the kibana tool.
6. The method for converging data of pluggable components based on multiple systems according to claim 5, wherein the structured data formed in the step (1) is processed as follows:
calling different protocol data streams of a SocketServer, a restfulAPI and JDBC based on Java language, converting and packaging different structural data into a standard structured data object through a unified structure processor, packaging the data object into unified structured json data, labeling the data, and storing the data object to a message middleware.
7. The method for data aggregation based on pluggable components under multiple systems according to claim 5, wherein the message middle in step (1) is kafka, elastic search is used for data storage, and a data object packaged in the message middleware is called in a multi-process and multi-thread manner.
8. The method for data aggregation based on pluggable components under multiple systems according to claim 5, wherein the Logstash in step (2) includes splitting and reassembling the data stream when acquiring the message middleware data, and storing the processed data object in an Elastic Search.
CN202010546939.3A 2020-06-16 2020-06-16 A data aggregation system and aggregation method based on pluggable components under multi-system Pending CN111753007A (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN202010546939.3A CN111753007A (en) 2020-06-16 2020-06-16 A data aggregation system and aggregation method based on pluggable components under multi-system

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN202010546939.3A CN111753007A (en) 2020-06-16 2020-06-16 A data aggregation system and aggregation method based on pluggable components under multi-system

Publications (1)

Publication Number Publication Date
CN111753007A true CN111753007A (en) 2020-10-09

Family

ID=72675334

Family Applications (1)

Application Number Title Priority Date Filing Date
CN202010546939.3A Pending CN111753007A (en) 2020-06-16 2020-06-16 A data aggregation system and aggregation method based on pluggable components under multi-system

Country Status (1)

Country Link
CN (1) CN111753007A (en)

Cited By (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN113918549A (en) * 2021-09-29 2022-01-11 重庆富民银行股份有限公司 System and method for data conversion and protocol adaptation based on configuration mode
CN114089977A (en) * 2021-11-16 2022-02-25 云镝智慧科技有限公司 Receipt docking method and device for business system and computer equipment

Citations (10)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
WO2007109047A2 (en) * 2006-03-18 2007-09-27 Peter Lankford Content-aware routing of subscriptions for streaming and static data
WO2013075490A1 (en) * 2011-11-25 2013-05-30 中兴通讯股份有限公司 Method for implementing terminal adaptation processing, protocol adaptation module and terminal
CN104333485A (en) * 2014-10-31 2015-02-04 北京思特奇信息技术股份有限公司 Business data acquisition and analysis method and system based on interchanger total quantity
CN107071006A (en) * 2017-03-27 2017-08-18 广州数字方舟信息技术股份有限公司 A kind of acquisition system and acquisition method towards navigation channel big data
US20180253342A1 (en) * 2017-03-03 2018-09-06 International Business Machines Corporation Discovery and exposure of transactional middleware server-based applications as consumable service endpoints
CN108549671A (en) * 2018-03-28 2018-09-18 微梦创科网络科技(中国)有限公司 Real time data acquisition and visual implementation method and device
CN109104487A (en) * 2018-08-20 2018-12-28 浪潮软件股份有限公司 Data transmission method based on logstack + kafka
CN109918148A (en) * 2019-02-21 2019-06-21 上海伊巢网络科技有限公司 The component dynamic pluggable system of Internet of things middleware
CN110083436A (en) * 2019-05-14 2019-08-02 上海理想信息产业(集团)有限公司 A kind of business datum real-time monitoring system and method based on Java bytecode enhancing technology
CN110351315A (en) * 2018-04-03 2019-10-18 中兴通讯股份有限公司 Method, system and storage medium, the electronic device of data processing

Patent Citations (10)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
WO2007109047A2 (en) * 2006-03-18 2007-09-27 Peter Lankford Content-aware routing of subscriptions for streaming and static data
WO2013075490A1 (en) * 2011-11-25 2013-05-30 中兴通讯股份有限公司 Method for implementing terminal adaptation processing, protocol adaptation module and terminal
CN104333485A (en) * 2014-10-31 2015-02-04 北京思特奇信息技术股份有限公司 Business data acquisition and analysis method and system based on interchanger total quantity
US20180253342A1 (en) * 2017-03-03 2018-09-06 International Business Machines Corporation Discovery and exposure of transactional middleware server-based applications as consumable service endpoints
CN107071006A (en) * 2017-03-27 2017-08-18 广州数字方舟信息技术股份有限公司 A kind of acquisition system and acquisition method towards navigation channel big data
CN108549671A (en) * 2018-03-28 2018-09-18 微梦创科网络科技(中国)有限公司 Real time data acquisition and visual implementation method and device
CN110351315A (en) * 2018-04-03 2019-10-18 中兴通讯股份有限公司 Method, system and storage medium, the electronic device of data processing
CN109104487A (en) * 2018-08-20 2018-12-28 浪潮软件股份有限公司 Data transmission method based on logstack + kafka
CN109918148A (en) * 2019-02-21 2019-06-21 上海伊巢网络科技有限公司 The component dynamic pluggable system of Internet of things middleware
CN110083436A (en) * 2019-05-14 2019-08-02 上海理想信息产业(集团)有限公司 A kind of business datum real-time monitoring system and method based on Java bytecode enhancing technology

Cited By (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN113918549A (en) * 2021-09-29 2022-01-11 重庆富民银行股份有限公司 System and method for data conversion and protocol adaptation based on configuration mode
CN114089977A (en) * 2021-11-16 2022-02-25 云镝智慧科技有限公司 Receipt docking method and device for business system and computer equipment

Similar Documents

Publication Publication Date Title
CN106534257B (en) A multi-source security log collection system and method with a multi-level cluster architecture
CN111737329A (en) Unified data acquisition platform for rail transit
CN102739452A (en) Method and system for monitoring resources
CN105119752A (en) Distributed log acquisition method, device and system
CN104410662A (en) Parallel mass data transmitting middleware of Internet of things and working method thereof
CN103647685B (en) A kind of test result information uploads receiving handling method
CN106506519A (en) System and method for cross-platform communication of WCF framework net.tcp protocol
CN113703997A (en) Bidirectional asynchronous communication middleware system integrating multiple message agents and implementation method
CN115686540A (en) RPA control method and system based on Hongmeng system
CN111753007A (en) A data aggregation system and aggregation method based on pluggable components under multi-system
CN111343239B (en) Communication request processing method, communication request processing device and transaction system
CN111190875A (en) Log aggregation method and device based on container platform
WO2025118967A1 (en) Big data access method based on protocol adaptation and scheduling in seismological industry
CN103544060A (en) WEBSERVICE based service dispatching system and method
CN116244141A (en) A method and system for real-time monitoring of microservice network infrastructure
CN115938397A (en) Speech recognition method, device, equipment and storage medium
CN113971200B (en) A map service traffic recording system and method for a cloud native platform
CN113297148B (en) Method, device and equipment for collecting service log data and readable storage medium
CN104217314B (en) Routing iinformation grasping means and device
CN118642872B (en) Multi-source fusion OPC data acquisition system and method based on track system
CN114003410A (en) Message-based flink real-time service scheduling configuration method and system
CN109413185A (en) A kind of equipment routing inspection system and its Cloud Server design method
CN108809725A (en) A kind of collection method and device of journal file
CN112131070A (en) Call relationship tracking method, apparatus, device, and computer-readable storage medium
CN118474183A (en) Data transmission method, CUC architecture, storage medium and program product based on data analysis

Legal Events

Date Code Title Description
PB01 Publication
PB01 Publication
SE01 Entry into force of request for substantive examination
SE01 Entry into force of request for substantive examination
RJ01 Rejection of invention patent application after publication

Application publication date: 20201009

RJ01 Rejection of invention patent application after publication