
CN112667614A - A data processing method, device and computer equipment - Google Patents


Info

Publication number: CN112667614A
Authority: CN (China)
Prior art keywords: data, processing, processed, queue, message queue
Legal status: Pending
Application number: CN202011563032.4A
Other languages: Chinese (zh)
Inventor: 唐杰 (Tang Jie)
Current Assignee: Volkswagen Mobvoi Beijing Information Technology Co Ltd
Original Assignee: Volkswagen Mobvoi Beijing Information Technology Co Ltd
Application filed by Volkswagen Mobvoi Beijing Information Technology Co Ltd

Landscapes

  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract

The embodiment of the invention discloses a data processing method, a data processing device and computer equipment. The method comprises the following steps: acquiring data to be processed and adding it to a first data message queue; processing each item of data to be processed in the first data message queue in a streaming manner based on the Flink real-time processing framework to obtain queue processing data; adding the queue processing data to a second data message queue; and performing real-time data processing on each item of queue processing data in the second data message queue based on the Flink real-time processing framework. The technical solution can ensure the consistency and real-time performance of the data processing process.

Description

A data processing method, device and computer equipment

Technical field

Embodiments of the present invention relate to the technical field of data processing, and in particular to a data processing method, apparatus, and computer device.

Background

With the rapid development of the Internet, more and more diverse data is being produced, and much of it is generated in real time. Processing big data relies on technologies such as distributed processing and distributed databases, and keeping data consistent and real-time throughout processing has always been a central concern.

At present, data processing tasks generally fall into two types: batch computing and real-time stream computing. Flink is an open-source data platform for both distributed real-time stream processing and batch data processing, and a single Flink runtime can execute both kinds of tasks. To ensure data consistency in a real-time processing system, data is usually written with either idempotent writes or transactional writes. An idempotent write affects the target system only once no matter how many times the same data is written, but this requires the data itself to be idempotent. A transactional write relies on Flink's consistent checkpoint (Checkpoint) mechanism to ensure that each record affects the external output only once, but only data confirmed by a checkpoint may be written externally; because checkpoints occur at intervals, this reduces the real-time performance of the data. How to keep data both consistent and real-time during processing on the Flink real-time processing framework is therefore a pressing problem.
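To make the contrast between the two write strategies concrete, the following is a minimal Python sketch, not Flink code. `IdempotentSink` and `TransactionalSink` are hypothetical in-memory stand-ins for an external system, and `on_checkpoint_complete` merely imitates the moment a Flink checkpoint would be confirmed.

```python
class IdempotentSink:
    """Keyed upsert: replaying the same record any number of times leaves one result."""

    def __init__(self):
        self.table = {}

    def write(self, key, value):
        # Overwriting by key makes replays harmless, but only when every record
        # carries a stable key, i.e. the data itself is idempotent.
        self.table[key] = value


class TransactionalSink:
    """Buffers writes and makes them visible only when a checkpoint completes."""

    def __init__(self):
        self.pending = []    # written since the last checkpoint, not yet visible
        self.committed = []  # visible to external readers

    def write(self, value):
        self.pending.append(value)

    def on_checkpoint_complete(self):
        # Output becomes visible only here, so it lags by up to one checkpoint
        # interval: the latency cost described above.
        self.committed.extend(self.pending)
        self.pending.clear()

    def on_failure(self):
        # Uncommitted writes are discarded; the source replays them after restart.
        self.pending.clear()


idem = IdempotentSink()
for _ in range(3):  # the same record delivered three times, e.g. after retries
    idem.write("order-1", 100)
print(idem.table)  # {'order-1': 100}

tx = TransactionalSink()
tx.write("a")
tx.write("b")
print(tx.committed)  # [] (nothing visible before the checkpoint)
tx.on_checkpoint_complete()
print(tx.committed)  # ['a', 'b']
```

The first class shows why idempotent writes need keyed, repeatable data. The second shows why transactional writes trade latency for correctness.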

Summary of the invention

Embodiments of the present invention provide a data processing method, apparatus, and computer device that ensure the consistency and real-time performance of the data processing process.

In a first aspect, an embodiment of the present invention provides a data processing method, including:

obtaining data to be processed, and adding the data to be processed to a first data message queue;

processing each item of data to be processed in the first data message queue in a streaming manner based on the Flink real-time processing framework, to obtain queue processing data;

adding the queue processing data to a second data message queue; and

performing real-time data processing on each item of queue processing data in the second data message queue based on the Flink real-time processing framework.
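The four steps can be illustrated with the following minimal Python sketch. It assumes in-memory `deque`s in place of the two message queues and plain functions in place of Flink jobs. `run_two_queue_pipeline`, `clean`, and `analyze` are hypothetical names. In a real deployment each stage would run continuously and concurrently against its own queue, and the queues would absorb differences in speed between stages. Here they are simply drained in sequence.

```python
from collections import deque
from typing import Callable, Iterable, List


def run_two_queue_pipeline(
    raw_records: Iterable[str],
    clean: Callable[[str], str],
    analyze: Callable[[str], int],
) -> List[int]:
    """Sequential sketch of the four steps, with deques standing in for two queues."""
    first_queue: deque = deque()   # stand-in for the first data message queue
    second_queue: deque = deque()  # stand-in for the second data message queue

    # Step 1: acquire data and add it to the first queue.
    first_queue.extend(raw_records)

    # Steps 2 and 3: process records one at a time and add results to the second queue.
    while first_queue:
        second_queue.append(clean(first_queue.popleft()))

    # Step 4: real-time processing of each record in the second queue.
    results = []
    while second_queue:
        results.append(analyze(second_queue.popleft()))
    return results


print(run_two_queue_pipeline(["  click ", "view"], str.strip, len))  # [5, 4]
```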

In a second aspect, an embodiment of the present invention further provides a data processing apparatus, including:

a first data message queue generation module, configured to obtain data to be processed and add it to a first data message queue;

a queue processing data generation module, configured to process each item of data to be processed in the first data message queue in a streaming manner based on the Flink real-time processing framework, to obtain queue processing data;

a second data message queue generation module, configured to add the queue processing data to a second data message queue; and

a real-time data processing module, configured to perform real-time data processing on each item of queue processing data in the second data message queue based on the Flink real-time processing framework.

In a third aspect, an embodiment of the present invention further provides a computer device, including a memory, a processor, and a computer program stored in the memory and executable on the processor. When executing the program, the processor implements the data processing method according to any embodiment of the present invention.

In a fourth aspect, an embodiment of the present invention further provides a computer-readable storage medium storing a computer program. When executed by a processor, the program implements the data processing method according to any embodiment of the present invention.

In the technical solution provided by the embodiments of the present invention, acquired data to be processed is added to a first data message queue. Each item of data in the first queue is then processed in a streaming manner based on the Flink real-time processing framework to obtain queue processing data, which is added to a second data message queue. Real-time data processing can then be performed on each item of queue processing data in the second queue based on the Flink framework. Processing data in real time with the Flink framework and two data message queues places no special requirements (such as idempotency) on the data. It solves the problem that existing data processing struggles to be both consistent and real-time, and thus ensures the consistency and real-time performance of the data processing process.

Description of drawings

FIG. 1 is a schematic flowchart of a data processing method in Embodiment 1 of the present invention;

FIG. 2 is a schematic flowchart of a data processing method in Embodiment 2 of the present invention;

FIG. 3 is a schematic flowchart of resuming data transmission from a breakpoint in Embodiment 2 of the present invention;

FIG. 4 is a schematic flowchart of data cleaning and processing in Embodiment 2 of the present invention;

FIG. 5 is a schematic structural diagram of a data processing apparatus in Embodiment 3 of the present invention;

FIG. 6 is a schematic diagram of the hardware structure of a computer device in Embodiment 4 of the present invention.

Detailed description

The present invention is described in further detail below with reference to the accompanying drawings and embodiments. It should be understood that the specific embodiments described here serve only to explain the present invention, not to limit it. It should also be noted that, for ease of description, the drawings show only the parts related to the present invention rather than the entire structure.

Before the exemplary embodiments are discussed in more detail, it should be noted that some of them are described as processes or methods depicted as flowcharts. Although a flowchart describes the operations (or steps) as a sequential process, many of them may be performed in parallel, concurrently, or simultaneously, and their order may be rearranged. A process may be terminated when its operations are complete, but it may also include additional steps not shown in the figures. A process may correspond to a method, function, procedure, subroutine, subprogram, and so on.

Embodiment 1

FIG. 1 is a flowchart of a data processing method provided in Embodiment 1 of the present invention. This embodiment applies to processing any type of data based on the Flink real-time processing framework while ensuring the consistency and real-time performance of the processing. The method may be executed by the data processing apparatus provided in the embodiments of the present invention. The apparatus may be implemented in software and/or hardware and may generally be integrated in a computer device.

As shown in FIG. 1, the data processing method provided by this embodiment specifically includes:

S110. Acquire data to be processed, and add it to a first data message queue.

The data to be processed may be log data of various sources, types, and formats, such as event-tracking (instrumentation) data, log files, or external data. In other words, the data to be processed in this embodiment may be of any type and is not required to be idempotent.

In this embodiment, the data to be processed may be acquired by a data collector. The collector may use any technology with a data collection function, and this embodiment does not limit the choice. Optionally, a Flume-based data collector can collect large volumes of diverse data centrally. For example, collecting log data from web servers with Flume enables distributed collection, aggregation, and transmission of massive volumes of logs and similar data, which helps ensure the stability and fault tolerance of the system.

A data message queue is a sequence of messages to which data can be written and from which data can be read. Optionally, the first data message queue may be a distributed Kafka message queue: the data to be processed is written into the Kafka queue and read back from it when it is processed. This avoids problems caused by a mismatch between the rate at which data is acquired and the rate at which it is processed, and further ensures the stability of the system.

After the data to be processed is acquired, it is added to the first data message queue for subsequent processing.

S120. Process each item of data to be processed in the first data message queue in a streaming manner based on the Flink real-time processing framework, to obtain queue processing data.

The Flink real-time processing framework is an open-source data platform for both distributed real-time stream processing and batch data processing. A single Flink runtime can support both stream-processing and batch-processing applications.

Streaming data processing means processing one or more event streams in real time: each record is processed as soon as it is read, which keeps the data real-time.
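The following is a minimal Python sketch of the difference between record-at-a-time and batch handling, using a generator for the streaming path. `process_stream`, `process_batch`, and the simulated `source` are illustrative names. Flink's actual runtime, including its operators and network buffers, is not modeled.

```python
from typing import Callable, Iterable, Iterator, List


def process_stream(events: Iterable[str], handle: Callable[[str], str]) -> Iterator[str]:
    """Record-at-a-time: each event is handled and emitted as soon as it is read."""
    for event in events:
        yield handle(event)


def process_batch(events: Iterable[str], handle: Callable[[str], str]) -> List[str]:
    """Batch, for contrast: nothing is emitted until the whole input has been read."""
    collected = list(events)
    return [handle(event) for event in collected]


reads = []


def source():
    # Records which events have been pulled from the (simulated) source.
    for event in ["a", "b", "c"]:
        reads.append(event)
        yield event


stream = process_stream(source(), str.upper)
print(next(stream), reads)  # A ['a']  (first result after reading one event)
```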

Queue processing data is the data obtained by processing each item of data to be processed in the first data message queue in a streaming manner based on the Flink real-time processing framework.

Processing each item in the first data message queue in this way yields the queue processing data while keeping the data consistent and real-time. An external service may be called to assist while each item is processed. For example, data cleaning may require converting the latitude and longitude in the acquired data into a specific place. In that case an external service, such as an electronic map, can be called to locate the place corresponding to those coordinates. Cleaning is the process of converting raw data into data with a clear, specific meaning. For instance, cleaned latitude and longitude data can become specific location information.
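A minimal Python sketch of such a cleaning step follows. It assumes the external map service is injected as a function so that it can be replaced by a stub. `RawEvent`, `CleanEvent`, `clean_event`, and `fake_geocoder` are hypothetical, and the coordinate range check is an assumed validation rule that the text does not state. In a Flink job, per-record remote calls like this are often made through Flink's asynchronous I/O API or backed by a cache.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RawEvent:
    user_id: str
    lat: float
    lon: float


@dataclass
class CleanEvent:
    user_id: str
    location: str


Geocoder = Callable[[float, float], Optional[str]]


def clean_event(raw: RawEvent, reverse_geocode: Geocoder) -> Optional[CleanEvent]:
    """Convert raw coordinates into a place name using an injected lookup service."""
    # Assumed validation rule: discard coordinates outside the valid range.
    if not (-90.0 <= raw.lat <= 90.0 and -180.0 <= raw.lon <= 180.0):
        return None
    place = reverse_geocode(raw.lat, raw.lon)
    if place is None:
        return None  # the service could not resolve the point
    return CleanEvent(raw.user_id, place)


def fake_geocoder(lat: float, lon: float) -> Optional[str]:
    # Hypothetical stand-in for an electronic-map service; real APIs differ.
    known = {(39.9, 116.4): "Beijing"}
    return known.get((round(lat, 1), round(lon, 1)))


print(clean_event(RawEvent("u1", 39.91, 116.39), fake_geocoder))
# CleanEvent(user_id='u1', location='Beijing')
```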

Optionally, before each item of data to be processed in the first data message queue is processed based on the Flink real-time processing framework, the method may further include: reading each item of data to be processed from the first data message queue, and storing it in a distributed file system.

A distributed file system is a file system for storing and managing data whose nodes communicate and transfer data over a computer network. The nodes can be located anywhere along the data communication link. Storing the data to be processed in a distributed file system spreads it across the network, which improves the scalability, stability, and execution efficiency of the system.

Reading the data to be processed from the first data message queue and storing it in the distributed file system before Flink processes it preserves the raw data in its original, unprocessed form, ensuring its originality and integrity. The retained raw data can also be compared periodically with the real-time stream data and batch data processed by the Flink framework to verify the accuracy of the processing results.
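The following is a minimal Python sketch of archiving raw records and later reconciling them against processed output. A local temporary directory stands in for the distributed file system, and a real deployment would write to a system such as HDFS. The `id` field, `archive_raw`, and `reconcile` are hypothetical. Comparing record ids is only one possible accuracy check.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterable, Set


def archive_raw(records: Iterable[dict], archive_dir: Path, batch_name: str) -> Path:
    """Write raw records unchanged, one JSON object per line, before any processing."""
    path = archive_dir / f"{batch_name}.jsonl"
    with path.open("w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
    return path


def reconcile(archive_path: Path, processed_ids: Set[str]) -> Set[str]:
    """Return ids present in the raw archive but missing from the processed output."""
    with archive_path.open(encoding="utf-8") as f:
        raw_ids = {json.loads(line)["id"] for line in f}
    return raw_ids - processed_ids


with tempfile.TemporaryDirectory() as tmp:
    archive = archive_raw([{"id": "e1"}, {"id": "e2"}, {"id": "e3"}], Path(tmp), "batch-0001")
    print(sorted(reconcile(archive, processed_ids={"e1", "e3"})))  # ['e2']
```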

S130. Add the queue processing data to a second data message queue.

Optionally, the second data message queue may be of the same type as the first, that is, also a distributed Kafka message queue. The queue processing data obtained by streaming processing on the Flink framework is added to the second data message queue for subsequent application processing, such as real-time stream analytics and batch analytics. At this point, the second data message queue holds data derived from processing the acquired data.

S140. Perform real-time data processing on each item of queue processing data in the second data message queue based on the Flink real-time processing framework.

Real-time data processing means analyzing and computing data or data volumes that require real-time characteristics. Examples include highly time-sensitive metrics such as the number of unique visitors (UV) to a website at the current moment, the number of page views (PV) at the current moment, and the current total number of visits.
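A minimal in-memory Python sketch of how such running metrics might be maintained per event follows. `RealTimeCounters` is hypothetical and keeps a single global total. A Flink job would more typically key the stream and hold these counts in managed state, often per time window.

```python
from collections import defaultdict
from typing import DefaultDict, Set


class RealTimeCounters:
    """Running PV and UV totals, updated as each event arrives."""

    def __init__(self) -> None:
        self.page_views = 0
        # Exact set of visitor ids; large systems often use approximate
        # structures such as HyperLogLog to bound memory.
        self.visitors: Set[str] = set()
        self.views_per_page: DefaultDict[str, int] = defaultdict(int)

    def on_event(self, visitor_id: str, page: str) -> None:
        self.page_views += 1
        self.visitors.add(visitor_id)
        self.views_per_page[page] += 1

    @property
    def unique_visitors(self) -> int:
        return len(self.visitors)


counters = RealTimeCounters()
for visitor, page in [("v1", "/home"), ("v2", "/home"), ("v1", "/cart")]:
    counters.on_event(visitor, page)
print(counters.page_views, counters.unique_visitors)  # 3 2
```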

After the queue processing data is added to the second data message queue, real-time data processing is performed on each item in that queue based on the Flink framework to obtain the corresponding real-time processing results.

Using two message queues, the first and second data message queues, avoids the mismatch between the rate at which data is acquired and the rate at which it is processed. It also ensures that, when the system fails, each record is processed only once rather than repeatedly, so that each record affects the final result only once. This guarantees data consistency.

As an optional implementation, after real-time data processing is performed on each item of queue processing data in the second data message queue, the method may further include:

reading each item of queue processing data from the second data message queue and storing it in a static database, the static database being used for batch data processing of the queue processing data; and

storing the real-time data processing results and the batch data processing results in a result database; or

reading each item of queue processing data from the second data message queue and sending it to a target application through a preset interface.

A static database is a database that can store offline data and other non-real-time data. Optionally, the static database may be a Hive data warehouse, which can be used to process, query, and analyze data. A data message queue can hold only a limited amount of data. Once the queue is full, writing new data automatically deletes the oldest data in the queue, so the complete data set cannot be retained there, and batch processing over the queue alone would suffer from missing data and lose accuracy. Storing each item of queue processing data from the second data message queue in a static database preserves the complete data. This enables more accurate batch processing of that data and also supports other data analysis and computation later.
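The following minimal Python sketch shows the data-loss argument. A `deque` with `maxlen` models a queue that evicts its oldest entry when full, and a plain list stands in for the static database. This is a simplification: Kafka itself deletes whole log segments according to per-topic retention settings (time or size), rather than evicting one message per write. `QUEUE_CAPACITY` is a hypothetical value.

```python
from collections import deque

QUEUE_CAPACITY = 3  # hypothetical limit chosen for the example

message_queue = deque(maxlen=QUEUE_CAPACITY)  # the oldest entry is evicted when full
warehouse = []  # stand-in for the static database (e.g. Hive)

for record in range(1, 6):
    message_queue.append(record)
    warehouse.append(record)  # copy each record out before it can be evicted

print(list(message_queue))  # [3, 4, 5]: records 1 and 2 are gone from the queue
print(warehouse)            # [1, 2, 3, 4, 5]: full history kept for batch jobs
```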

Batch data processing means analyzing and computing data or data volumes within a period of time. For example, based on the queue processing data from the second data message queue stored in the static database, one can compute the total page views of a web page over a specific period, or the total daily transaction volume over the past month.
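A minimal Python sketch of the second example (daily transaction totals over the past month) follows. It assumes rows of `(date, amount)` have already been read from the static database. `daily_totals` and the sample rows are hypothetical. Against a real warehouse, the same computation would more likely be a SQL `GROUP BY` over the stored table.

```python
from collections import Counter
from datetime import date, timedelta
from typing import Dict, Iterable, Tuple


def daily_totals(
    transactions: Iterable[Tuple[date, int]], end: date, days: int = 30
) -> Dict[date, int]:
    """Sum transaction amounts per day over the `days` days ending at `end`, inclusive."""
    start = end - timedelta(days=days - 1)
    totals: Counter = Counter()
    for day, amount in transactions:
        if start <= day <= end:
            totals[day] += amount
    return dict(sorted(totals.items()))


stored = [  # sample rows as they might be read back from the static database
    (date(2020, 12, 1), 10),
    (date(2020, 12, 1), 5),
    (date(2020, 12, 20), 7),
    (date(2020, 10, 1), 99),  # outside the 30-day window, ignored
]
print(daily_totals(stored, end=date(2020, 12, 20)))
# {datetime.date(2020, 12, 1): 15, datetime.date(2020, 12, 20): 7}
```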

The result database stores real-time and batch data processing results so that they can serve as a cache; for example, it may be a MySQL database. When a previously computed real-time or batch result is needed again, it can be read directly from the result database without repeating the real-time or batch processing. This makes results reusable, reduces repeated processing, and improves processing efficiency.
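The following is a minimal Python sketch of the result database acting as a read-through cache. An in-memory SQLite database stands in for MySQL. The `results` table, its columns, and `get_or_compute` are hypothetical. Invalidation of stale entries, which matters for real-time results that keep changing, is not handled.

```python
import sqlite3
from typing import Callable


def get_or_compute(conn: sqlite3.Connection, key: str, compute: Callable[[], str]) -> str:
    """Return the cached result for `key`, computing and storing it on a miss."""
    row = conn.execute(
        "SELECT result_value FROM results WHERE result_key = ?", (key,)
    ).fetchone()
    if row is not None:
        return row[0]  # cache hit: no recomputation
    value = compute()
    conn.execute(
        "INSERT INTO results (result_key, result_value) VALUES (?, ?)", (key, value)
    )
    conn.commit()
    return value


conn = sqlite3.connect(":memory:")  # SQLite stands in for the MySQL result database
conn.execute("CREATE TABLE results (result_key TEXT PRIMARY KEY, result_value TEXT)")

calls = []


def expensive_batch_query() -> str:
    calls.append(1)  # count how often the "real" computation runs
    return "12345"


print(get_or_compute(conn, "pv:last-30-days", expensive_batch_query))  # 12345
print(get_or_compute(conn, "pv:last-30-days", expensive_batch_query))  # 12345, from the cache
print(len(calls))  # 1
```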

After real-time data processing is performed on the queue processing data in the second data message queue, that data can also be read from the queue and stored in the static database for batch processing. The real-time and batch results can then be cached in the result database so that later processing can read them directly. This avoids recomputation and applies a modular design, decoupling the real-time stream processing and batch processing functions of the Flink real-time processing framework.

The preset interface is a predefined interface through which the queue processing data in the second data message queue can be transmitted to third-party applications. It may be an application programming interface (API). When a third-party application needs the queue processing data in the second data message queue, it can obtain that data through the preset interface.

The target application is a third-party application that needs the queue processing data in the second data message queue. For example, if a map app needs processed real-time data, that map app is the target application.

After real-time data processing is performed on the queue processing data in the second data message queue, that data can also be read from the queue and sent to the target application through the preset interface. This extends the architecture of the Flink real-time processing framework, meets the data requirements of different applications, and makes the architecture more flexible.
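The following minimal Python sketch models the preset interface as push-style callbacks. The text does not specify the interface's actual form (REST, RPC, or callbacks), so `QueueDataApi`, `register`, and `dispatch` are hypothetical. With Kafka, each target application would typically read the topic through its own consumer group so that applications do not interfere with each other. Here a single process simply drains a `deque`.

```python
from collections import deque
from typing import Callable, Deque, Dict


class QueueDataApi:
    """Hypothetical preset interface forwarding processed records to registered apps."""

    def __init__(self, source: Deque[dict]) -> None:
        self.source = source
        self.subscribers: Dict[str, Callable[[dict], None]] = {}

    def register(self, app_name: str, callback: Callable[[dict], None]) -> None:
        self.subscribers[app_name] = callback

    def dispatch(self) -> int:
        """Drain the source queue, delivering each record to every registered app."""
        deliveries = 0
        while self.source:
            record = self.source.popleft()
            for callback in self.subscribers.values():
                callback(record)
                deliveries += 1
        return deliveries


second_queue: Deque[dict] = deque([{"road": "A1", "speed": 42}, {"road": "B7", "speed": 18}])
map_app_inbox = []
api = QueueDataApi(second_queue)
api.register("map_app", map_app_inbox.append)
print(api.dispatch(), len(map_app_inbox))  # 2 2
```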

With the technical solution provided by this embodiment, the acquired data to be processed is added to the first data message queue and processed in a streaming manner on the Flink real-time processing framework to obtain queue processing data. That data is then added to the second data message queue, where it undergoes real-time data processing on the same framework. Combining the Flink framework with two data message queues places no special requirements on the data, solves the problem that existing data processing struggles to be both consistent and real-time, and ensures the consistency and real-time performance of the data processing process.

Embodiment 2

FIG. 2 is a flowchart of a data processing method provided in Embodiment 2 of the present invention. This embodiment refines the above embodiment. Adding the data to be processed to the first data message queue may specifically include:

determining a first target partition number and a first target partition position corresponding to the data to be processed; and

adding the data to be processed to a first target data partition in the first data message queue according to the first target partition number and the first target partition position.

Further, processing each item of data to be processed in the first data message queue in a streaming manner based on the Flink real-time processing framework may include:

determining the current partition number corresponding to the current data partition and a data processing progress identifier;

determining the current data to be processed according to the current partition number and the data processing progress identifier; and

processing the current data to be processed in real time based on the Flink real-time processing framework.
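The following is a minimal Python sketch of the three steps above. It assumes the progress identifier is simply the offset of the next unprocessed record in each partition. `pending_records`, `process_partition`, and the list-based partition layout are hypothetical. In practice the progress map would have to be persisted durably, which is not shown. Also, advancing progress only after a record is handled yields at-least-once behavior: a crash between the two steps would replay that record unless the handler's effect and the progress update are committed atomically.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical layout: each partition is a list of records whose index is the offset.
Partitions = Dict[int, List[str]]


def pending_records(
    partitions: Partitions, progress: Dict[int, int], partition_id: int
) -> List[Tuple[int, str]]:
    """Return (offset, record) pairs in `partition_id` that are not yet processed.

    `progress[p]` is the offset of the next record to process in partition p,
    playing the role of the data processing progress identifier.
    """
    start = progress.get(partition_id, 0)
    return list(enumerate(partitions[partition_id][start:], start=start))


def process_partition(
    partitions: Partitions,
    progress: Dict[int, int],
    partition_id: int,
    handle: Callable[[str], None],
) -> None:
    for offset, record in pending_records(partitions, progress, partition_id):
        handle(record)
        progress[partition_id] = offset + 1  # advance only after the record is handled


partitions: Partitions = {0: ["a", "b", "c"], 1: ["x"]}
progress = {0: 2}  # partition 0 had processed offsets 0 and 1 before a restart
out: List[str] = []
for pid in partitions:
    process_partition(partitions, progress, pid, out.append)
print(out, progress)  # ['c', 'x'] {0: 3, 1: 1}
```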

如图2所示,本实施例提供的一种数据处理方法,具体包括:As shown in Figure 2, a data processing method provided by this embodiment specifically includes:

S210、获取待处理数据,确定待处理数据对应的第一目标分区序号和第一目标分区位置。S210: Acquire the data to be processed, and determine the sequence number of the first target partition and the position of the first target partition corresponding to the data to be processed.

S220、根据第一目标分区序号和第一目标分区位置将待处理数据添加至第一数据消息队列中的第一目标数据分区。S220. Add the data to be processed to the first target data partition in the first data message queue according to the sequence number of the first target partition and the position of the first target partition.

目标分区序号,指的是将所有的数据分发至不同的数据分区,每个数据分区所对应的序号,例如,分区0(job0)、分区1(job1)等。在处理大数据时,由于大数据属于分布式数据,因此可以通过数据分区的方式,将数据集分发至不同的数据分区,从而可以在多个数据分区同时处理或查询数据,提高了数据的管理及查询效率。其中,数据分区可以是将数据分发至同一磁盘的不同区,也可以是将数据分发至不同的磁盘或机器设备中,本发明对此不做具体限定。第一目标分区序号可以为当前的待处理数据在第一数据消息队列中所在第一目标数据分区的序号。第一目标数据分区也即当前的待处理数据添加到第一数据消息队列中对应的分区。The sequence number of the target partition refers to distributing all data to different data partitions, and the sequence number corresponding to each data partition, for example, partition 0 (job0), partition 1 (job1), and so on. When processing big data, since big data belongs to distributed data, data sets can be distributed to different data partitions by data partitioning, so that data can be processed or queried in multiple data partitions at the same time, which improves data management. and query efficiency. The data partition may be to distribute data to different areas of the same disk, or to distribute data to different disks or machines, which is not specifically limited in the present invention. The sequence number of the first target partition may be the sequence number of the first target data partition where the current data to be processed is located in the first data message queue. The first target data partition, that is, the current data to be processed is added to the corresponding partition in the first data message queue.

The target partition position is the specific storage location of each piece of data within the target data partition identified by the corresponding sequence number. Data can be placed at a specific target partition position in the corresponding target data partition, and data can also be looked up by its target partition position. In other words, the target partition position determines exactly where a piece of data sits within its target data partition in the data message queue. The target partition position may be represented by an offset. The first target partition position may be the specific queue position of the current data to be processed within the first target data partition of the first data message queue.
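To make the (partition sequence number, offset) addressing concrete, the following Python sketch models a message queue as a fixed number of append-only partitions. It is an in-memory toy written for illustration only, not the API of any real message queue product; the class `PartitionedQueue` and its methods are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class PartitionedQueue:
    """Toy in-memory message queue: a fixed number of append-only partitions.

    Every record is addressed by (partition sequence number, offset).
    """
    num_partitions: int
    partitions: List[List[Any]] = field(init=False)

    def __post_init__(self) -> None:
        self.partitions = [[] for _ in range(self.num_partitions)]

    def append(self, partition: int, record: Any) -> int:
        """Append `record` to `partition` and return the offset it was stored at."""
        log = self.partitions[partition]
        log.append(record)
        return len(log) - 1

    def read(self, partition: int, offset: int) -> Any:
        """Fetch a record by its (partition sequence number, offset) address."""
        return self.partitions[partition][offset]


queue = PartitionedQueue(num_partitions=3)
first = queue.append(2, "lat=39.91,lon=116.40")
second = queue.append(2, "product_id=SKU-1001")
print(first, second, queue.read(2, second))  # 0 1 product_id=SKU-1001
```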

After the data to be processed is acquired, the first target partition sequence number and the first target partition position corresponding to it are determined, and the data is then added to the first target data partition in the first data message queue according to that sequence number and position.
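Steps S210 and S220 can be sketched as follows. The source does not say how the first target partition sequence number is chosen, so this sketch assumes a hypothetical rule (a CRC32 hash of a record key modulo the partition count); the first target partition position is simply the next free offset in that partition.

```python
import zlib
from typing import List, Tuple

NUM_PARTITIONS = 4  # hypothetical partition count
first_queue: List[List[Tuple[str, str]]] = [[] for _ in range(NUM_PARTITIONS)]


def choose_partition(key: str) -> int:
    """Assumed rule: stable CRC32 hash of the record key, modulo the partition count."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def add_to_first_queue(key: str, payload: str) -> Tuple[int, int]:
    """S210 + S220: work out (partition, offset), then store the record there."""
    partition = choose_partition(key)        # first target partition sequence number
    offset = len(first_queue[partition])     # first target partition position
    first_queue[partition].append((key, payload))
    return partition, offset


p1, o1 = add_to_first_queue("vehicle-42", "lat=39.91,lon=116.40")
p2, o2 = add_to_first_queue("vehicle-42", "lat=39.92,lon=116.41")
```

Hashing a stable key keeps all records from one source in the same partition, so their relative order is preserved; this is a common design choice rather than something the source specifies.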

S230: Process each piece of data to be processed in the first data message queue in a streaming manner based on the Flink real-time processing framework.

It is worth pointing out that, during data processing, a system may fail because of various unexpected factors, such as traffic surges, network jitter, problems with cloud resource allocation, or excessive load that causes downtime or crashes. In such cases the Flink real-time processing framework restarts the job. Consistency of data processing means that the results obtained after a failure has been handled and the job has recovered are as correct as the results that would have been obtained had no failure occurred; that is, a system failure does not affect the data processing results. Flink's checkpoint (consistency checkpoint) mechanism is usually used to ensure that the external output is affected only once, but because checkpoints are taken at intervals, this mechanism reduces the real-time performance of the data. Therefore, whether the Flink real-time processing framework is starting normally or is being restarted after a system failure, the embodiment of the present invention processes data in a closed-loop manner, so as to guarantee both consistency and real-time performance and to support resuming data processing from a breakpoint.

S230 may specifically include the following operations S231 to S233:

S231: Determine the current partition sequence number corresponding to the current data partition and the first data processing progress identifier.

The current data partition may be the data partition in which the data currently being processed is located.

A data processing progress identifier is a marker that indicates how far data processing has progressed. In this embodiment of the present invention, the first data processing progress identifier may be represented by the last partition position in the first data message queue and identifies the data processing progress of the first data message queue. The last partition position in the first data message queue is the first target partition position, within the data partition currently being processed, of the last piece of data whose real-time processing by the Flink real-time processing framework was completed before the framework started normally or before it was restarted after a system failure.
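A minimal Python sketch of the first data processing progress identifier, assuming it is kept per partition as the offset of the last record whose processing fully completed (-1 when nothing has completed). The class `ProgressTracker` is hypothetical; in a real deployment the value would need to be stored somewhere that survives a restart, which the source does not specify.

```python
from typing import Dict


class ProgressTracker:
    """Per-partition first data processing progress identifier (hypothetical).

    Assumption: for each partition it stores the offset of the last record whose
    real-time processing fully completed; -1 means nothing has completed yet.
    """

    def __init__(self) -> None:
        self._last_done: Dict[int, int] = {}

    def last_completed(self, partition: int) -> int:
        return self._last_done.get(partition, -1)

    def mark_completed(self, partition: int, offset: int) -> None:
        # Progress only moves forward: a late or repeated acknowledgement never rewinds it.
        if offset > self.last_completed(partition):
            self._last_done[partition] = offset


tracker = ProgressTracker()
tracker.mark_completed(2, 4)   # record at offset 04 of partition 2 has finished
tracker.mark_completed(2, 3)   # late acknowledgement for an earlier offset: ignored
print(tracker.last_completed(2))  # 4
```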

The current partition sequence number and the data processing progress identifier of the current data partition are determined in order to identify the current data to be processed.

S232: Determine the current data to be processed according to the current partition sequence number and the first data processing progress identifier.

According to the current partition sequence number and the first data processing progress identifier, the current data to be processed in the data partition currently being processed is located in the first data message queue, so that it can be processed in real time based on the Flink real-time processing framework.
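Under the same assumption (the identifier holds the last completed offset), S232 reduces to looking up the record one past that offset in the current partition. The function name `current_pending` is hypothetical, and the "+1" reflects the assumed convention rather than anything the source fixes. The usage line mirrors a queue that has been filled further than it has been processed.

```python
from typing import List, Optional, Tuple


def current_pending(partition_log: List[str],
                    last_completed: int) -> Optional[Tuple[int, str]]:
    """Return (offset, record) of the next record to process, or None if caught up.

    Assumed convention: `last_completed` is the first data processing progress
    identifier, i.e. the offset of the last fully processed record (-1 if none).
    """
    next_offset = last_completed + 1
    if next_offset < len(partition_log):
        return next_offset, partition_log[next_offset]
    return None  # nothing pending yet in this partition


# Partition 2 already holds offsets 00..08, but only 00..04 have been processed.
partition_2 = [f"record-{i:02d}" for i in range(9)]
print(current_pending(partition_2, last_completed=4))  # (5, 'record-05')
```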

S233: Process the current data to be processed in real time based on the Flink real-time processing framework to obtain queue processing data.

The current data to be processed in the first data message queue is processed in real time based on the Flink real-time processing framework to obtain queue processing data; the current data to be processed may be handled in a streaming manner.

Optionally, processing the current data to be processed in real time based on the Flink real-time processing framework may include converting the current data to be processed into target directional data in real time.

Target directional data is data carrying a particular piece of information that a person can understand directly, for example a specific location or information about a particular product.

Exemplarily, when the current data to be processed is processed in real time based on the Flink real-time processing framework, it may be cleaned so that it is converted into target directional data in real time. For example, if the current data to be processed is a latitude and longitude, real-time processing can convert it into specific location information that a person can read directly; as another example, if the current data to be processed is a product ID, real-time processing can convert the ID into specific product information. Converting the current data to be processed into target directional data in real time completes the cleaning of that data.
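The cleaning step can be pictured as a per-record mapping function. In an actual Flink job such logic would typically run inside a `map` operator; here it is plain Python, and the lookup tables `PLACES` and `PRODUCTS` (and their contents) are hypothetical stand-ins for a geocoding service and a product catalogue.

```python
from typing import Any, Dict

# Hypothetical reference data standing in for a geocoder and a product catalogue.
PLACES = {(39.91, 116.40): "Example Road 1, Dongcheng District, Beijing"}
PRODUCTS = {"SKU-1001": "All-weather floor mats, black, set of 4"}


def to_target_directional(record: Dict[str, Any]) -> Dict[str, Any]:
    """Add human-readable fields derived from raw machine fields (the cleaning step)."""
    cleaned = dict(record)  # keep the raw fields and leave the input unmodified
    if "lat" in record and "lon" in record:
        # Round to two decimals (roughly 1 km) before the lookup; the precision is an assumption.
        key = (round(float(record["lat"]), 2), round(float(record["lon"]), 2))
        cleaned["location"] = PLACES.get(key, "unknown location")
    if "product_id" in record:
        cleaned["product"] = PRODUCTS.get(str(record["product_id"]), "unknown product")
    return cleaned
```

For example, `to_target_directional({"lat": 39.9087, "lon": 116.3975})` adds a `location` field read from `PLACES` while keeping the original coordinates.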

S240: Add the queue processing data to the second data message queue.

The queue processing data obtained by streaming processing based on the Flink real-time processing framework is added to the second data message queue.

Optionally, adding the queue processing data to the second data message queue may include: determining the second target partition sequence number and the second data processing progress identifier corresponding to the queue processing data; determining, according to the second target partition sequence number and the second data processing progress identifier, the second target partition position of the queue processing data within the partition identified by the second target partition sequence number; and adding the queue processing data to the second target data partition in the second data message queue according to the second target partition sequence number and the second target partition position.

The second target partition sequence number may be the sequence number of the second target data partition, in the second data message queue, that receives the queue processing data obtained from the corresponding data to be processed. The second target partition position may be the specific queue position of that queue processing data within the second target data partition of the second data message queue. The second target data partition is the partition of the second data message queue to which the queue processing data is added. When the queue processing data is added to the second data message queue, the second target partition sequence number and the second data processing progress identifier corresponding to the queue processing data may be determined first. In this embodiment of the present invention, the second data processing progress identifier may be represented by the last partition position in the second data message queue and identifies the progress of adding data to the second data message queue. The last partition position in the second data message queue is the second target partition position at which the last piece of queue processing data was added to the second data message queue before the Flink real-time processing framework started normally or before it was restarted after a system failure. Next, the second target partition position of the queue processing data within the partition identified by the second target partition sequence number is determined according to the second target partition sequence number and the second data processing progress identifier, the second target partition position corresponding to the second data processing progress identifier. Finally, the queue processing data is added to the second target data partition in the second data message queue according to the second target partition sequence number and the second target partition position.
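A minimal sketch of S240, assuming the second data processing progress identifier stores, per partition, the last position already written in the second data message queue, so the next result goes one position further. The source's own numeric example maps identifier 04 directly to position 04, so the exact off-by-one convention is not pinned down by the text. `SecondQueueWriter` is a hypothetical name.

```python
from typing import Dict, List


class SecondQueueWriter:
    """Sketch of S240 for one second data message queue (hypothetical class).

    Assumption: last_written[p] is the second data processing progress identifier,
    i.e. the last position already written in partition p (-1 if none).
    """

    def __init__(self, num_partitions: int) -> None:
        self.partitions: List[List[str]] = [[] for _ in range(num_partitions)]
        self.last_written: Dict[int, int] = {}

    def add(self, partition: int, record: str) -> int:
        """Write `record` one position past the progress identifier and return that position."""
        position = self.last_written.get(partition, -1) + 1  # second target partition position
        log = self.partitions[partition]
        if position < len(log):
            log[position] = record   # slot already exists (e.g. progress restored from an older save)
        else:
            log.append(record)
        self.last_written[partition] = position
        return position


writer = SecondQueueWriter(num_partitions=3)
writer.add(2, "result-a")   # written at position 0
writer.add(2, "result-b")   # written at position 1
```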

It should be noted that, when each piece of data to be processed in the first data message queue is processed in a streaming manner based on the Flink real-time processing framework, the first data processing progress identifier that is determined locks the data processing progress of the first data message queue. Likewise, when queue processing data is added to the second data message queue, the second data processing progress identifier that is determined locks the data-adding progress of the second data message queue. The first and second data processing progress identifiers are therefore two different progress identifiers.

For example, suppose the first target partition sequence number of the first target data partition holding the current data to be processed is 2 and the corresponding first data processing progress identifier is 04, indicating that the 4th piece of data in data partition 2 of the first data message queue is currently being processed. Because the data processing speed may differ from the speed at which data to be processed is acquired, new data to be processed may already have been added to the first data message queue up to first target partition position 08 in first target data partition 2, or even into first target data partition 3. In this case, if the system fails during data processing and the Flink real-time processing framework is restarted, the current data to be processed must be re-determined from the current partition sequence number of the current data partition (partition 2) and the first data processing progress identifier (04), after which it is processed based on the Flink real-time processing framework.

It should be pointed out that if data to be processed is acquired at the same speed as it is processed, that is, each piece of data is processed immediately after it is acquired and the next piece is processed as soon as it arrives, then the current partition sequence number of the current data partition corresponds to the first target partition sequence number of the data to be processed in the first target data partition, and the first data processing progress identifier corresponds to the first target partition position. In that case, the current data to be processed can be determined from the first target partition sequence number and the first target partition position in the first data message queue alone.

As another example, when the queue processing data obtained by processing the current data to be processed based on the Flink real-time processing framework is added to the second data message queue, the data processing speed may differ from the speed at which data is added to the second data message queue, so the second target partition sequence number and the second data processing progress identifier of the queue processing data need to be determined. Suppose the second target partition sequence number is 2 and the second data processing progress identifier is 04; it can then be determined that the queue processing data should be added at second target partition position 04 in second target partition 2, and the queue processing data is added to second target data partition 2 in the second data message queue according to second target partition sequence number 2 and second target partition position 04.

It should be pointed out that if the data processing speed equals the speed at which queue processing data is added to the second data message queue, that is, each result is added to the second data message queue as soon as a piece of data to be processed has been processed and the next result is added immediately afterwards, then the second target partition sequence number corresponds to the first target partition sequence number and the second data processing progress identifier corresponds to the first data processing progress identifier. The queue processing data can therefore be added to the second data message queue according to the first target partition sequence number and the first data processing progress identifier.

It can be understood that the first and second data message queues store data in the same layout, that is, the same piece of data occupies the same storage location in both queues. For example, if the first target partition sequence number of a piece of data to be processed in the first data message queue is 0 and its first target partition position is 2, then when the queue processing data obtained by processing it in real time is added to the second data message queue, the corresponding second target partition sequence number is 0 and the second target partition position is 2.
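Because a record keeps the same (partition, offset) address in both queues, writing a result can be made repeatable: if a record is processed again after a restart, its result lands in the slot it already occupies instead of being appended a second time. The sketch below illustrates that consequence using the partition 0, position 2 example; the function `write_mirrored` and the use of `None` for not-yet-filled slots are assumptions for illustration.

```python
from typing import List, Optional


def write_mirrored(second_queue: List[List[Optional[str]]],
                   partition: int, offset: int, result: str) -> bool:
    """Store `result` at the same (partition, offset) the source record had in the first queue.

    Returns False when that slot already holds `result` (e.g. the record was
    reprocessed after a restart), so a replay never adds a duplicate.
    """
    log = second_queue[partition]
    while len(log) <= offset:   # make sure the target slot exists
        log.append(None)
    if log[offset] == result:
        return False
    log[offset] = result
    return True


second_queue: List[List[Optional[str]]] = [[] for _ in range(2)]
write_mirrored(second_queue, 0, 2, "cleaned-record")    # True: newly written
write_mirrored(second_queue, 0, 2, "cleaned-record")    # False: replay, nothing added
```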

FIG. 3 is a schematic flowchart of resuming data processing from a breakpoint according to an embodiment of the present invention. In a specific example, as shown in FIG. 3, the speed of acquiring data to be processed, the data processing speed, and the speed of adding queue processing data to the second data message queue all differ from one another. During normal processing and transmission, the queue processing data obtained from the Flink real-time processing framework is added to the second target data partition of the second data message queue according to the second target partition sequence number and the second target partition position. When the system fails during data processing and the Flink real-time processing framework is restarted, the current partition sequence number of the current data partition and the data processing progress of the first data message queue (the first data processing progress identifier) are determined; the current data to be processed is then determined from the current partition sequence number and the first data processing progress identifier, and processing continues based on the Flink real-time processing framework. This ensures that processing continues from the position reached before the system failure, so that data processing resumes from the breakpoint. The second target partition position of the queue processing data within the partition identified by the second target partition sequence number is then determined from the second target partition sequence number and the data-adding progress of the second data message queue (the second data processing progress identifier), and the queue processing data continues to be added to the second target data partition in the second data message queue.
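The breakpoint-resume flow of FIG. 3 can be simulated end to end in a few lines. This is a toy under stated assumptions: both queues are dicts of Python lists, `progress` holds the last completed offset per partition, upper-casing stands in for the real Flink processing, and a failure is injected through the hypothetical `crash_after` parameter. It checks that a run interrupted by a failure and then resumed produces the same second queue as an uninterrupted run.

```python
from typing import Dict, List


def run(first_queue: Dict[int, List[str]],
        progress: Dict[int, int],
        second_queue: Dict[int, List[str]],
        crash_after: int = -1) -> None:
    """Process all pending records, resuming each partition after its last completed offset.

    crash_after simulates a failure once that many records were handled in this run
    (-1 means never fail).
    """
    handled = 0
    for partition, log in first_queue.items():
        out = second_queue.setdefault(partition, [])
        for offset in range(progress.get(partition, -1) + 1, len(log)):
            if handled == crash_after:
                raise RuntimeError("simulated failure")
            result = log[offset].upper()        # stand-in for the real-time processing step
            if offset < len(out):
                out[offset] = result            # written before a failure: overwrite the same slot
            else:
                out.append(result)              # same offset as in the first queue
            progress[partition] = offset        # advance progress only after the write
            handled += 1


first = {0: ["a", "b", "c"], 1: ["d", "e"]}

# Reference run without any failure.
clean_out: Dict[int, List[str]] = {}
run(first, {}, clean_out)

# Run that fails after three records, then restarts from the saved progress.
progress: Dict[int, int] = {}
resumed_out: Dict[int, List[str]] = {}
try:
    run(first, progress, resumed_out, crash_after=3)
except RuntimeError:
    pass
run(first, progress, resumed_out)
print(resumed_out == clean_out)  # True
```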

S250: Perform real-time data processing on each piece of queue processing data in the second data message queue based on the Flink real-time processing framework.

For details not explained in this embodiment, refer to the foregoing embodiments; they are not repeated here.

FIG. 4 is a schematic flowchart of data cleaning and processing according to an embodiment of the present invention. In a specific example, as shown in FIG. 4, data to be cleaned is collected by a data collector based on Flume; the collected data may include log files, event-tracking (instrumentation) data, or external data. The collected data to be cleaned is added to the first data message queue. Each piece of data to be cleaned in the first data message queue can then be read and stored in a distributed file system, preserving the original data for comparison and verification. In addition, each piece of data to be cleaned in the first data message queue can be cleaned in a streaming manner based on the Flink real-time processing framework, and the resulting queue processing data added to the second data message queue, so that real-time data processing is performed on each piece of queue processing data in the second data message queue based on the Flink real-time processing framework. Furthermore, each piece of queue processing data can be read from the second data message queue and stored in a static database for batch data processing, with the batch processing results and the real-time processing results stored in a result database; alternatively, each piece of queue processing data can be read from the second data message queue and sent to a target application through a preset interface. This extends the architecture built on the Flink real-time processing framework, meets the data requirements of different applications, and makes the architecture more flexible.
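The fan-out in FIG. 4 can be summarised as a small routing sketch that shows the optional branches side by side. Every store here is a hypothetical in-memory stand-in: a list for the distributed file system, another for the static database, a dict for the result database, and a callback for the preset interface to a target application. No Flume, Flink, or database API is used.

```python
from typing import Callable, Dict, List

raw_archive: List[str] = []        # stand-in for the distributed file system (raw copy)
static_db: List[dict] = []         # stand-in for the static database (batch path)
result_db: Dict[str, int] = {}     # stand-in for the result database


def ingest(raw: str, first_queue: List[str]) -> None:
    """Collected data enters the first queue and is also archived unchanged."""
    first_queue.append(raw)
    raw_archive.append(raw)        # original kept for later comparison and verification


def consume_second_queue(second_queue: List[dict],
                         send_to_app: Callable[[dict], None]) -> None:
    """Real-time path, batch path, and application delivery for the cleaned records."""
    realtime_count = 0
    for record in second_queue:
        realtime_count += 1        # stand-in for the real-time computation
        static_db.append(record)   # persisted for later batch processing
        send_to_app(record)        # delivered through the preset interface
    result_db["realtime_count"] = realtime_count
    result_db["batch_count"] = len(static_db)  # stand-in for a batch job over static_db
```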

In the above technical solution, after the acquired data to be processed is added to the first target data partition in the first data message queue according to the first target partition sequence number and the first target partition position, the data to be processed in the first data message queue is processed in a streaming manner based on the Flink real-time processing framework using a closed-loop approach: the current partition sequence number of the current data partition and the data processing progress identifier are determined first; the current data to be processed is determined from them; and the current data to be processed is then processed in real time based on the Flink real-time processing framework to obtain queue processing data. This avoids data inconsistency and loss of real-time performance when the Flink real-time processing framework starts normally or restarts after a system failure. The queue processing data is then added to the second data message queue, so that real-time data processing can be performed on each piece of queue processing data in the second data message queue based on the Flink real-time processing framework. Data is thus processed in real time using the Flink real-time processing framework and two data message queues, in a closed loop and without special requirements on the data. This solves the problem that existing data processing pipelines struggle to guarantee consistency and real-time performance, ensures both, and supports resuming data processing from a breakpoint.

Embodiment 3

FIG. 5 is a schematic structural diagram of a data processing apparatus according to Embodiment 3 of the present invention. This embodiment is applicable to processing any type of data based on the Flink real-time processing framework while ensuring the consistency and real-time performance of data processing. The apparatus may be implemented in software and/or hardware and may generally be integrated in a computer device.

As shown in FIG. 5, the data processing apparatus specifically includes a first data message queue generation module 510, a queue processing data generation module 520, a second data message queue generation module 530, and a real-time data processing module 540, where:

第一数据消息队列生成模块510,设置为获取待处理数据,将所述待处理数据添加至第一数据消息队列;The first data message queue generating module 510 is configured to obtain data to be processed, and add the data to be processed to the first data message queue;

队列处理数据生成模块520,设置为基于Flink实时处理框架采用流式数据处理方式处理所述第一数据消息队列中的各所述待处理数据,得到队列处理数据;The queue processing data generation module 520 is configured to process each of the to-be-processed data in the first data message queue based on the Flink real-time processing framework using a streaming data processing method to obtain queue processing data;

第二数据消息队列生成模块530,设置为将所述队列处理数据添加至第二数据消息队列;The second data message queue generating module 530 is configured to add the queue processing data to the second data message queue;

实时数据处理模块540,设置为基于所述Flink实时处理框架对所述第二数据消息队列中的各所述队列处理数据进行实时数据处理。The real-time data processing module 540 is configured to perform real-time data processing on each of the queue processing data in the second data message queue based on the Flink real-time processing framework.

In the technical solution provided by this embodiment of the present invention, after the acquired data to be processed is added to the first data message queue, each piece of data to be processed in the first data message queue is processed in a streaming manner based on the Flink real-time processing framework to obtain queue processing data, which is then added to the second data message queue, so that real-time data processing can be performed on the queue processing data in the second data message queue based on the Flink real-time processing framework. Data is thus processed in real time using the Flink real-time processing framework and two data message queues, without special requirements on the data. This solves the problem that existing data processing pipelines struggle to guarantee consistency and real-time performance, and ensures both.

可选的,第一数据消息队列生成模块510,具体设置为:Optionally, the first data message queue generating module 510 is specifically set as:

确定所述待处理数据对应的第一目标分区序号和第一目标分区位置;Determine the first target partition sequence number and the first target partition position corresponding to the data to be processed;

根据所述第一目标分区序号和所述第一目标分区位置将所述待处理数据添加至所述第一数据消息队列中的第一目标数据分区。The data to be processed is added to the first target data partition in the first data message queue according to the sequence number of the first target partition and the position of the first target partition.

可选的,队列处理数据生成模块520,还包括:当前分区序号及处理进度标识确定单元、当前待处理数据确定单元和当前待处理数据处理单元,其中,Optionally, the queue processing data generation module 520 further includes: a current partition serial number and a processing progress identification unit, a current data determination unit to be processed, and a current data processing unit to be processed, wherein,

当前分区序号及处理进度标识确定单元,设置为确定当前数据分区对应的当前分区序号以及第一数据处理进度标识;The current partition serial number and the processing progress identification determining unit is set to determine the current partition serial number corresponding to the current data partition and the first data processing progress identification;

当前待处理数据确定单元,设置为根据所述当前分区序号以及所述第一数据处理进度标识确定当前待处理数据;A current to-be-processed data determination unit, configured to determine current to-be-processed data according to the current partition serial number and the first data processing progress identifier;

当前待处理数据处理单元,设置为基于所述Flink实时处理框架实时处理所述当前待处理数据。The current to-be-processed data processing unit is set to process the current to-be-processed data in real time based on the Flink real-time processing framework.

可选的,当前待处理数据处理单元,具体设置为:Optionally, the current data processing unit to be processed, specifically set as:

基于所述Flink实时处理框架对所述当前待处理数据实时转化为目标指向性数据。Based on the Flink real-time processing framework, the current data to be processed is converted into target directional data in real time.

可选的,第二数据消息队列生成模块530,具体设置为:Optionally, the second data message queue generation module 530 is specifically set as:

确定所述队列处理数据对应的第二目标分区序号和第二数据处理进度标识;Determine the second target partition sequence number and the second data processing progress identifier corresponding to the queue processing data;

根据所述第二目标分区序号和所述第二数据处理进度标识确定所述队列处理数据在所述第二目标分区序号中的第二目标分区位置;Determine the second target partition position of the queue processing data in the second target partition serial number according to the second target partition serial number and the second data processing progress identifier;

根据所述第二目标分区序号和所述第二目标分区位置将所述队列处理数据添加至所述第二数据消息队列中的第二目标数据分区。The queue processing data is added to the second target data partition in the second data message queue according to the second target partition sequence number and the second target partition position.

Optionally, the above apparatus further includes a to-be-processed data storage module, which is specifically configured to, before each item of data to be processed in the first data message queue is processed based on the Flink real-time processing framework, read each item of data to be processed in the first data message queue;

and store each item of data to be processed in a distributed file system.
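A minimal sketch of this archiving step follows, assuming records are written as JSON lines, one file per partition. A local directory stands in for the distributed file system; in practice an HDFS or object-store client would replace `open`. `archive_records` is a hypothetical helper.

```python
import json
import os
from typing import Dict, List


def archive_records(partitions: Dict[int, List[dict]], root: str) -> List[str]:
    """Append each partition's records to <root>/partition-<n>.jsonl; return the paths."""
    os.makedirs(root, exist_ok=True)
    paths = []
    for partition, records in sorted(partitions.items()):
        path = os.path.join(root, f"partition-{partition}.jsonl")
        # Append mode so repeated archiving runs accumulate rather than overwrite.
        with open(path, "a", encoding="utf-8") as f:
            for record in records:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
        paths.append(path)
    return paths
```

One plausible motivation for archiving the raw input before stream processing (the section does not state one) is to keep an unmodified copy that can later be replayed or fed to batch jobs.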

Optionally, the above apparatus further includes a data processing result storage module or a processed data sending module. The data processing result storage module is specifically configured to, after real-time data processing has been performed on each item of queue processing data in the second data message queue, read each item of queue processing data from the second data message queue and store it in a static database, the static database being used to perform batch data processing on the queue processing data;

and store the real-time data processing results and the batch data processing results in a result database; or

the processed data sending module is specifically configured to, after real-time data processing has been performed on each item of queue processing data in the second data message queue, read each item of queue processing data from the second data message queue and send it to a target application through a preset interface.
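The section leaves the real-time and batch computations unspecified. The following is a minimal sketch, assuming each item of queue processing data is a `(key, amount)` pair and both paths compute per-key totals: the real-time path updates an in-memory aggregate record by record, the batch path runs a `GROUP BY` over the static database, and both results land in a result table. A single in-memory SQLite database stands in for both the static database and the result database; `store_and_aggregate` and the table names are hypothetical.

```python
import sqlite3
from collections import defaultdict
from typing import Iterable, Tuple


def store_and_aggregate(records: Iterable[Tuple[str, int]]) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE queue_data (key TEXT, amount INTEGER)")        # static database
    conn.execute("CREATE TABLE results (key TEXT, mode TEXT, total INTEGER)")  # result database

    realtime = defaultdict(int)
    for key, amount in records:
        realtime[key] += amount  # real-time path: incremental per-record update
        conn.execute("INSERT INTO queue_data VALUES (?, ?)", (key, amount))

    conn.executemany("INSERT INTO results VALUES (?, 'realtime', ?)", realtime.items())
    # Batch path: recompute the same totals over everything stored in the static database.
    conn.execute(
        "INSERT INTO results SELECT key, 'batch', SUM(amount) FROM queue_data GROUP BY key"
    )
    conn.commit()
    return conn
```

For `[("a", 1), ("b", 2), ("a", 3)]`, querying `SELECT key, mode, total FROM results ORDER BY key, mode` yields `("a", "batch", 4)`, `("a", "realtime", 4)`, `("b", "batch", 2)`, `("b", "realtime", 2)`. Keeping both result types side by side lets, for example, the batch totals serve as a check on the real-time figures.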

The above data processing apparatus can execute the data processing method provided by any embodiment of the present invention, and has the functional modules and beneficial effects corresponding to executing that method.

Embodiment 4

FIG. 6 is a schematic diagram of the hardware structure of a computer device according to Embodiment 4 of the present invention. FIG. 6 shows a block diagram of an exemplary computer device 12 suitable for implementing embodiments of the present invention. The computer device 12 shown in FIG. 6 is only an example and should not impose any limitation on the functions or scope of use of the embodiments of the present invention.

As shown in FIG. 6, the computer device 12 takes the form of a general-purpose computing device. Components of the computer device 12 may include, but are not limited to, one or more processors or processing units 16, a system memory 28, and a bus 18 connecting the various system components (including the system memory 28 and the processing unit 16).

The bus 18 represents one or more of several types of bus structures, including a memory bus or memory controller, a peripheral bus, an accelerated graphics port, and a processor or local bus using any of a variety of bus architectures. By way of example, these architectures include, but are not limited to, the Industry Standard Architecture (ISA) bus, the Micro Channel Architecture (MCA) bus, the Enhanced ISA bus, the Video Electronics Standards Association (VESA) local bus, and the Peripheral Component Interconnect (PCI) bus.

The computer device 12 typically includes a variety of computer-system-readable media. These media may be any available media accessible by the computer device 12, including volatile and non-volatile media and removable and non-removable media.

The system memory 28 may include computer-system-readable media in the form of volatile memory, such as a random access memory (RAM) 30 and/or a cache memory 32. The computer device 12 may further include other removable/non-removable, volatile/non-volatile computer system storage media. By way of example only, a storage system 34 may be used to read from and write to non-removable, non-volatile magnetic media (not shown in FIG. 6, commonly referred to as a "hard disk drive"). Although not shown in FIG. 6, a magnetic disk drive for reading from and writing to a removable non-volatile magnetic disk (e.g., a "floppy disk") and an optical disk drive for reading from and writing to a removable non-volatile optical disk (e.g., a CD-ROM, DVD-ROM, or other optical media) may be provided. In these cases, each drive may be connected to the bus 18 through one or more data media interfaces. The system memory 28 may include at least one program product having a set (e.g., at least one) of program modules configured to perform the functions of the embodiments of the present invention.

A program/utility 40 having a set (at least one) of program modules 42 may be stored, for example, in the system memory 28. Such program modules 42 include, but are not limited to, an operating system, one or more application programs, other program modules, and program data; each of these examples, or some combination of them, may include an implementation of a networking environment. The program modules 42 generally perform the functions and/or methods of the embodiments described in the present invention.

The computer device 12 may also communicate with one or more external devices 14 (e.g., a keyboard, a pointing device, a display 24), with one or more devices that enable a user to interact with the computer device 12, and/or with any device (e.g., a network card or modem) that enables the computer device 12 to communicate with one or more other computing devices. Such communication may take place through an input/output (I/O) interface 22. The computer device 12 may also communicate with one or more networks (e.g., a local area network (LAN), a wide area network (WAN), and/or a public network such as the Internet) through a network adapter 20. As shown, the network adapter 20 communicates with the other modules of the computer device 12 via the bus 18. It should be understood that, although not shown in FIG. 6, other hardware and/or software modules may be used in conjunction with the computer device 12, including but not limited to microcode, device drivers, redundant processing units, external disk drive arrays, RAID systems, tape drives, and data backup storage systems.

The processing unit 16 runs the programs stored in the system memory 28 to execute various functional applications and data processing, for example, to implement the data processing method provided by the embodiments of the present invention. That is, when executing the program, the processing unit implements:

obtaining data to be processed, and adding the data to be processed to a first data message queue;

processing each item of data to be processed in the first data message queue in a streaming data processing mode based on the Flink real-time processing framework, to obtain queue processing data;

adding the queue processing data to a second data message queue;

performing real-time data processing on each item of queue processing data in the second data message queue based on the Flink real-time processing framework.
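The four steps above can be sketched as a two-stage pipeline. This is a minimal single-process sketch, assuming in-memory `deque`s in place of the two data message queues and plain functions in place of Flink jobs; it illustrates only the data flow (record-at-a-time processing between two queues), not Flink's runtime, parallelism, or fault tolerance. `drain`, `run_pipeline`, `stage_one`, and `stage_two` are hypothetical names.

```python
from collections import deque
from typing import Any, Callable, Deque, Iterable, Iterator, List


def drain(queue: Deque[Any]) -> Iterator[Any]:
    """Yield records one at a time, streaming-style, until the queue is empty."""
    while queue:
        yield queue.popleft()


def run_pipeline(raw: Iterable[Any],
                 stage_one: Callable[[Any], Any],
                 stage_two: Callable[[Any], Any]) -> List[Any]:
    first_queue: Deque[Any] = deque(raw)        # step 1: add data to the first queue
    second_queue: Deque[Any] = deque()
    for record in drain(first_queue):           # step 2: process each record in turn
        second_queue.append(stage_one(record))  # step 3: add result to the second queue
    # Step 4: real-time processing of each item in the second queue.
    return [stage_two(record) for record in drain(second_queue)]
```

For example, `run_pipeline(["3", "4"], int, lambda x: x * x)` returns `[9, 16]`. Placing a second queue between the stages means the second stage, or any other consumer, can read the intermediate results independently of the first stage.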

Embodiment 5

Embodiment 5 of the present invention provides a computer-readable storage medium storing a computer program which, when executed by a processor, implements the data processing method provided by the embodiments of the present application. That is, when executed by the processor, the program implements:

obtaining data to be processed, and adding the data to be processed to a first data message queue;

processing each item of data to be processed in the first data message queue in a streaming data processing mode based on the Flink real-time processing framework, to obtain queue processing data;

adding the queue processing data to a second data message queue;

performing real-time data processing on each item of queue processing data in the second data message queue based on the Flink real-time processing framework.

Any combination of one or more computer-readable media may be employed. A computer-readable medium may be a computer-readable signal medium or a computer-readable storage medium. A computer-readable storage medium may be, for example, but is not limited to, an electrical, magnetic, optical, electromagnetic, infrared, or semiconductor system, apparatus, or device, or any combination of the above. More specific examples (a non-exhaustive list) of computer-readable storage media include: an electrical connection having one or more wires, a portable computer diskette, a hard disk, a random access memory (RAM), a read-only memory (ROM), an erasable programmable read-only memory (EPROM or flash memory), an optical fiber, a portable compact disc read-only memory (CD-ROM), an optical storage device, a magnetic storage device, or any suitable combination of the above. In this document, a computer-readable storage medium may be any tangible medium that contains or stores a program for use by or in connection with an instruction execution system, apparatus, or device.

A computer-readable signal medium may include a data signal propagated in baseband or as part of a carrier wave, carrying computer-readable program code. Such a propagated data signal may take a variety of forms, including but not limited to an electromagnetic signal, an optical signal, or any suitable combination of the above. A computer-readable signal medium may also be any computer-readable medium other than a computer-readable storage medium that can send, propagate, or transmit a program for use by or in connection with an instruction execution system, apparatus, or device.

Program code contained on a computer-readable medium may be transmitted using any appropriate medium, including but not limited to wireless, wire, optical cable, RF, or any suitable combination of the above.

Computer program code for carrying out the operations of the present invention may be written in one or more programming languages or a combination thereof, including object-oriented programming languages such as Java, Smalltalk, and C++, as well as conventional procedural programming languages such as the "C" language or similar languages. The program code may execute entirely on the user's computer, partly on the user's computer, as a stand-alone software package, partly on the user's computer and partly on a remote computer, or entirely on a remote computer or server. Where a remote computer is involved, it may be connected to the user's computer through any kind of network, including a local area network (LAN) or a wide area network (WAN), or it may be connected to an external computer (for example, through the Internet using an Internet service provider).

Note that the above are only preferred embodiments of the present invention and the technical principles applied. Those skilled in the art will understand that the present invention is not limited to the specific embodiments described herein, and that various obvious changes, readjustments, and substitutions can be made without departing from the scope of protection of the present invention. Therefore, although the present invention has been described in some detail through the above embodiments, it is not limited to them and may include other equivalent embodiments without departing from the inventive concept; the scope of the present invention is determined by the appended claims.

Claims (10)

1. A data processing method, comprising:
obtaining data to be processed, and adding the data to be processed to a first data message queue;
processing each item of data to be processed in the first data message queue in a streaming data processing mode based on the Flink real-time processing framework, to obtain queue processing data;
adding the queue processing data to a second data message queue; and
performing real-time data processing on each item of queue processing data in the second data message queue based on the Flink real-time processing framework.

2. The method according to claim 1, wherein adding the data to be processed to the first data message queue comprises:
determining a first target partition sequence number and a first target partition position corresponding to the data to be processed; and
adding the data to be processed to a first target data partition in the first data message queue according to the first target partition sequence number and the first target partition position.

3. The method according to claim 2, wherein processing each item of data to be processed in the first data message queue in a streaming data processing mode based on the Flink real-time processing framework comprises:
determining a current partition sequence number corresponding to a current data partition, and a first data processing progress identifier;
determining current data to be processed according to the current partition sequence number and the first data processing progress identifier; and
processing the current data to be processed in real time based on the Flink real-time processing framework.

4. The method according to claim 3, wherein processing the current data to be processed in real time based on the Flink real-time processing framework comprises:
converting the current data to be processed into target-oriented data in real time based on the Flink real-time processing framework.

5. The method according to any one of claims 2 to 4, wherein adding the queue processing data to the second data message queue comprises:
determining a second target partition sequence number and a second data processing progress identifier corresponding to the queue processing data;
determining, according to the second target partition sequence number and the second data processing progress identifier, a second target partition position of the queue processing data within the partition identified by the second target partition sequence number; and
adding the queue processing data to a second target data partition in the second data message queue according to the second target partition sequence number and the second target partition position.

6. The method according to claim 1, further comprising, before processing each item of data to be processed in the first data message queue based on the Flink real-time processing framework:
reading each item of data to be processed in the first data message queue; and
storing each item of data to be processed in a distributed file system.

7. The method according to claim 1, further comprising, after performing real-time data processing on each item of queue processing data in the second data message queue:
reading each item of queue processing data from the second data message queue and storing it in a static database, the static database being used to perform batch data processing on the queue processing data, and storing real-time data processing results and batch data processing results in a result database; or
reading each item of queue processing data from the second data message queue and sending it to a target application through a preset interface.

8. A data processing apparatus, comprising:
a first data message queue generation module, configured to obtain data to be processed and add the data to be processed to a first data message queue;
a queue processing data generation module, configured to process each item of data to be processed in the first data message queue in a streaming data processing mode based on the Flink real-time processing framework, to obtain queue processing data;
a second data message queue generation module, configured to add the queue processing data to a second data message queue; and
a real-time data processing module, configured to perform real-time data processing on each item of queue processing data in the second data message queue based on the Flink real-time processing framework.

9. The apparatus according to claim 8, wherein the first data message queue generation module is specifically configured to:
determine a first target partition sequence number and a first target partition position corresponding to the data to be processed; and
add the data to be processed to a first target data partition in the first data message queue according to the first target partition sequence number and the first target partition position.

10. A computer device, comprising a memory, a processor, and a computer program stored in the memory and executable on the processor, wherein the processor, when executing the program, implements the data processing method according to any one of claims 1 to 7.
CN202011563032.4A 2020-12-25 2020-12-25 A data processing method, device and computer equipment Pending CN112667614A (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN202011563032.4A CN112667614A (en) 2020-12-25 2020-12-25 A data processing method, device and computer equipment


Publications (1)

Publication Number Publication Date
CN112667614A (en) 2021-04-16

Family

ID=75409100

Family Applications (1)

Application Number Title Priority Date Filing Date
CN202011563032.4A Pending CN112667614A (en) 2020-12-25 2020-12-25 A data processing method, device and computer equipment

Country Status (1)

Country Link
CN (1) CN112667614A (en)

Cited By (4)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN113553327A (en) * 2021-07-06 2021-10-26 杭州网易云音乐科技有限公司 Data processing method and device, medium and computing equipment
CN113900948A (en) * 2021-10-15 2022-01-07 北京同城必应科技有限公司 Solution for checking accuracy of data buried points under big data background
CN114428820A (en) * 2022-01-26 2022-05-03 普联技术有限公司 Method, system and data synchronization device for real-time synchronization of distributed data
CN114547055A (en) * 2022-02-16 2022-05-27 浙江大华技术股份有限公司 Data processing method and device

Citations (9)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US20170083378A1 (en) * 2015-09-18 2017-03-23 Salesforce.Com, Inc. Managing processing of long tail task sequences in a stream processing framework
CN106648904A (en) * 2017-01-09 2017-05-10 大连理工大学 Self-adaptive rate control method for stream data processing
CN107577717A (en) * 2017-08-09 2018-01-12 阿里巴巴集团控股有限公司 A kind of processing method, device and server for ensureing data consistency
US20190130004A1 (en) * 2017-10-27 2019-05-02 Streamsimple, Inc. Streaming Microservices for Stream Processing Applications
CN110007913A (en) * 2019-03-21 2019-07-12 佳都新太科技股份有限公司 Visual flow chart of data processing setting method, device, equipment and storage medium
CN110784419A (en) * 2019-10-22 2020-02-11 中国铁道科学研究院集团有限公司电子计算技术研究所 Data visualization method and system for railway electric affairs
CN110928906A (en) * 2019-11-08 2020-03-27 杭州安恒信息技术股份有限公司 Method for writing carbon data only once based on flink
CN111381977A (en) * 2018-12-29 2020-07-07 北大方正集团有限公司 Message processing method and device
CN112035534A (en) * 2020-09-18 2020-12-04 上海依图网络科技有限公司 Real-time big data processing method and device and electronic equipment



Similar Documents

Publication Publication Date Title
CN112667614A (en) A data processing method, device and computer equipment
CN108139958B (en) System and method for processing events of an event stream
CN108833131A (en) System, method, device and computer storage medium for distributed database cloud service
US10657099B1 (en) Systems and methods for transformation and analysis of logfile data
CN111367687A (en) Inter-process data communication method and device
CN110049118A (en) Information push method, device, equipment and storage medium
US11012368B2 (en) Data packet management in a memory constrained environment
CN110033242B (en) Working time determining method, device, equipment and medium
CN112711487A (en) Data source management and control method and device, management and control server and storage medium
CN109284108B (en) Unmanned vehicle data storage method and device, electronic equipment and storage medium
CN117992204A (en) Memory detection method, device, equipment and storage medium
CN111930385A (en) Data acquisition method, device, equipment and storage medium
CN114936026A (en) Method, system, storage medium and equipment for analyzing semi-structured data
US20140089341A1 (en) Coordinating data collection among system components
CN110827001A (en) Accounting event bookkeeping method, system, equipment and storage medium
CN114510398A (en) Abnormal monitoring method, device, equipment, system and medium
CN114428705A (en) A method, device, device and storage medium for monitoring network data
CN112364268A (en) Resource acquisition method and device, electronic equipment and storage medium
CN110427293B (en) Application processing methods, devices, equipment and media
CN114448976B (en) Method, device, equipment, medium and program product for assembling network message
CN117033058A (en) Analysis method, device, equipment and medium for software crash data
CN115408461A (en) Data information acquisition method, device, electronic equipment and medium
CN109213569B (en) A virtual machine-based auditing method, device, server and storage medium
CN114201564A (en) Map production task processing method and device, electronic equipment and storage medium
CN114201508A (en) Data processing method, data processing apparatus, electronic device, and storage medium

Legal Events

Date Code Title Description
PB01 Publication
SE01 Entry into force of request for substantive examination
RJ01 Rejection of invention patent application after publication

Application publication date: 20210416