CN117111856A - Data lake data processing method, device, system, equipment and medium
- Publication number
- CN117111856A; CN202311180449.6A
- Authority
- CN
- China
- Prior art keywords
- data
- file
- written
- node service
- target
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Landscapes
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
The application relates to the technical field of big data and provides a data lake data processing method, device, system, equipment, and medium. The method is applied to a slave node service and comprises: acquiring data to be written; recording the data to be written in a pre-created pre-written log file, and recording the data to be written in a pre-created memory partition according to the data type of the data to be written; generating an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmitting the initial table format file to a master node service to generate an initial snapshot; converting the equivalent deleted files in the initial snapshot into position deleted files, and merging the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file; and transmitting the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into the data lake. The scheme of the application can improve the read and write processing efficiency of the data lake.
Description
Technical Field
The application relates to the technical field of big data, in particular to a data lake data processing method, a device, a system, equipment and a medium.
Background
A data lake is a method of storing data in its natural format in a system or repository, which makes it easy to organize data, typically object blobs or files, in various schemas and structures.
At present, data lakes do not fully support unified stream and batch processing, and their support for low-latency real-time computing is insufficient: only minute-level real-time computing latency is supported, and second-level latency cannot be achieved in the data lake.
Existing data lakes generate a large number of small files when snapshots are committed, which reduces the read and write efficiency of the data lake. In addition, a large number of equivalent deleted files are generated when the data lake reads and writes data, and these files occupy a large amount of memory, which further reduces the data read and write efficiency.
Therefore, how to improve the data read and write efficiency of a data lake is a problem that needs to be solved.
Disclosure of Invention
In view of the above-mentioned drawbacks of the prior art, an object of the present application is to provide a data lake data processing method, apparatus, system, device, and medium that solve the problem of low data read and write efficiency in prior-art data lakes.
To achieve the above and other related objects, the present application provides a data processing method of a data lake, the method being applied to a slave node service, the method comprising:
acquiring data to be written;
recording the data to be written in a pre-created pre-written log file, and recording the data to be written in a pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relation;
generating an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmitting the initial table format file to a master node service to generate an initial snapshot;
converting the equivalent deleted files in the initial snapshot into position deleted files, and merging the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file;
and transmitting the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into the data lake.
In an embodiment of the present application, before acquiring the data to be written, the method further includes:
Sending heartbeat information of a slave node service to the master node service so that the master node service generates an available slave node service list according to the heartbeat information;
and the master node service transmits the available slave node service list to a client so that the client can select the available slave node service according to the available slave node service list and issue a data processing instruction.
In one embodiment of the present application, selecting an available slave node service according to the list of available slave node services and issuing a data processing instruction includes:
selecting a plurality of available slave node services from the available slave node service list, carrying out load balancing processing according to preset Internet protocol information and the plurality of available slave node services, and connecting with the plurality of available slave node services in a communication way;
and issuing a data writing instruction and/or a data reading instruction to each available slave node service of the communication connection so as to realize data processing of the data lake.
In an embodiment of the present application, the heartbeat information of the slave node service includes a slave node service name, a slave node service identity, a slave node service internet protocol address, and slave node service port information;
the target table format file comprises data file path information, a data file sequence number, position deleted file path information, a position deleted file sequence number, a pre-written log file, and a slave node service identity.
In an embodiment of the present application, recording the data to be written in a pre-created memory partition according to the data type of the data to be written includes:
when the data type is delete data, recording the data to be written into the delete partition, and deleting the records corresponding to the data to be written from the index partition and the List partition;
when the data type is update data and a record corresponding to the data to be written exists in the index partition, updating, in the List partition, the record that the index partition points to;
when the data type is update data and no record corresponding to the data to be written exists in the index partition, recording the data to be written into the delete partition and the List partition, and recording the position of the data to be written in the List partition into the index partition;
when the data type is insert data, recording the data to be written into the List partition;
wherein the pre-created memory partition includes the delete partition, the index partition, and the List partition.
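The four recording rules above can be sketched as follows. This is a minimal illustration, not the implementation of the application: the class name `MemoryPartition`, the `record` method, and the use of `None` as a tombstone in the List partition are assumptions made here so that positions stored in the index stay valid.

```python
class MemoryPartition:
    """Hypothetical in-memory layout: delete, index, and List partitions."""

    def __init__(self):
        self.delete = []  # keys whose earlier rows must be deleted
        self.index = {}   # key -> position of the row in self.rows
        self.rows = []    # the "List partition"; None marks a removed slot

    def record(self, op: str, key, row=None) -> None:
        if op == "delete":
            self.delete.append(key)
            pos = self.index.pop(key, None)
            if pos is not None:
                # Simplification: only rows tracked by the index are removed.
                self.rows[pos] = None
        elif op == "update" and key in self.index:
            # Index hit: overwrite the row in place in the List partition.
            self.rows[self.index[key]] = row
        elif op == "update":
            # Index miss: record in delete and List partitions, then index it.
            self.delete.append(key)
            self.rows.append(row)
            self.index[key] = len(self.rows) - 1
        elif op == "insert":
            self.rows.append(row)
        else:
            raise ValueError(f"unknown data type: {op}")


part = MemoryPartition()
part.record("insert", "k1", {"id": "k1", "v": 1})
part.record("update", "k2", {"id": "k2", "v": 2})  # index miss
part.record("update", "k2", {"id": "k2", "v": 3})  # index hit
snapshot_before_delete = list(part.rows)
part.record("delete", "k2")
```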
In an embodiment of the present application, the method further includes: when the pre-written log file is full of data, closing the pre-written log file and adding it to a pre-built pre-written log file queue;
when the space of the memory partition is full of data, closing the pre-written log file and adding it to the pre-built pre-written log file queue;
when a preset table format file generation thread of the slave node service consumes the pre-written log file queue and the queue is empty, closing the pre-written log file that currently holds data and adding it to the pre-written log file queue.
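The three closing conditions above can be sketched as follows. This is an illustration under stated assumptions: `WalRotator`, its method names, and measuring capacity in record counts rather than bytes are hypothetical, and resetting the memory usage on rotation assumes that each pre-written log file maps to its own memory partition.

```python
from collections import deque


class WalRotator:
    """Hypothetical WAL rotation; capacities are counted in records."""

    def __init__(self, wal_capacity: int, mem_capacity: int):
        self.wal_capacity = wal_capacity
        self.mem_capacity = mem_capacity
        self.current = []      # records in the currently open WAL file
        self.mem_used = 0      # records held in the mapped memory partition
        self.closed = deque()  # queue of closed WAL files

    def _rotate(self) -> None:
        # Close the open WAL file and start a fresh one.
        self.closed.append(self.current)
        self.current = []
        self.mem_used = 0  # assumed: a new WAL file maps to a new partition

    def append(self, record) -> None:
        self.current.append(record)
        self.mem_used += 1
        # Conditions 1 and 2: WAL file full, or memory partition full.
        if (len(self.current) >= self.wal_capacity
                or self.mem_used >= self.mem_capacity):
            self._rotate()

    def poll(self):
        # Called by the table format file generation thread.
        # Condition 3: queue empty, so close the WAL file that holds data.
        if not self.closed and self.current:
            self._rotate()
        return self.closed.popleft() if self.closed else None


rot = WalRotator(wal_capacity=2, mem_capacity=100)
for r in ("a", "b", "c"):
    rot.append(r)
first, second, third = rot.poll(), rot.poll(), rot.poll()
```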
In an embodiment of the present application, generating an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition includes:
when the data type is insert data, writing the data to be written into a data file in a target data lake table format;
when the data type is delete data, writing the data to be written into an equivalent deleted file in the target data lake table format;
when the data type is update data and the data is recorded in the delete partition, writing the data to be written into the data file, recording the position information of the data to be written in the data file, and writing the data to be written into the equivalent deleted file;
when the data type is update data and the data is recorded in the index partition, writing the data to be written into the data file, recording the position information of the data to be written in the data file, and recording the position information into a position deleted file in the target data lake table format.
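The routing of records into the three kinds of table format files can be sketched as follows. This is only one reading of the rules above: the function name, the tuple shape of `records`, and the file path are hypothetical, and the sketch assumes that the "position information" written to the position deleted file is the position of the earlier row for the same key in the same data file.

```python
def build_initial_files(records, data_path="data-0001.parquet"):
    """records: iterable of (op, key, row, partition) tuples (assumed shape)."""
    data_file = []         # rows of the data file
    equality_deletes = []  # keys to delete by value
    position_deletes = []  # (data file path, row position) pairs
    last_pos = {}          # key -> position of its latest row in data_file

    def write_row(key, row):
        data_file.append(row)
        last_pos[key] = len(data_file) - 1

    for op, key, row, partition in records:
        if op == "insert":
            write_row(key, row)
        elif op == "delete":
            equality_deletes.append(key)
        elif op == "update" and partition == "delete":
            # Older versions may live in earlier files: delete by value.
            equality_deletes.append(key)
            write_row(key, row)
        elif op == "update" and partition == "index":
            # Earlier version is in this data file: delete by position.
            if key in last_pos:
                position_deletes.append((data_path, last_pos[key]))
            write_row(key, row)
        else:
            raise ValueError(f"unsupported record: {op}/{partition}")
    return data_file, equality_deletes, position_deletes


rows, eq_del, pos_del = build_initial_files([
    ("insert", "a", "a1", None),
    ("update", "b", "b1", "delete"),
    ("update", "b", "b2", "index"),
    ("delete", "a", None, None),
])
```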
In an embodiment of the present application, converting the equivalent deleted files in the initial snapshot into position deleted files includes:
sorting the equivalent deleted files in the initial snapshot in ascending order of their sequence numbers;
traversing the sorted equivalent deleted files, and storing the records in each equivalent deleted file into a memory cache;
obtaining a file to be converted, namely a data file whose sequence number is smaller than that of the equivalent deleted file;
when a record of the file to be converted exists in the memory cache, generating a new position deleted file according to the file to be converted, and adding the sequence number of the equivalent deleted file, the path of the data file, and the position of the record in the data file into the new position deleted file.
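The conversion steps above can be sketched as follows. This is a minimal illustration, not the implementation of the application: `DataFile`, `EqualityDelete`, `to_position_deletes`, and the single key column `id` are hypothetical, and each output tuple stands in for one entry of a new position deleted file.

```python
from dataclasses import dataclass


@dataclass
class DataFile:
    path: str
    seq: int
    rows: list  # list of dicts, one per table row


@dataclass
class EqualityDelete:
    seq: int
    keys: set  # values of the key column that are deleted


def to_position_deletes(data_files, eq_deletes, key="id"):
    """Return (delete sequence number, data file path, row position) entries."""
    out = []
    # Step 1: ascending order by sequence number.
    for eq in sorted(eq_deletes, key=lambda e: e.seq):
        cache = set(eq.keys)  # step 2: in-memory cache of this file's records
        for df in data_files:
            if df.seq >= eq.seq:
                continue  # step 3: only data files with a smaller sequence number
            for pos, row in enumerate(df.rows):
                if row[key] in cache:  # step 4: cached record found in the file
                    out.append((eq.seq, df.path, pos))
    return out


files = [
    DataFile("f1", 1, [{"id": 1}, {"id": 2}]),
    DataFile("f2", 3, [{"id": 1}]),
]
deletes = [EqualityDelete(4, {2}), EqualityDelete(2, {1})]
entries = to_position_deletes(files, deletes)
```

Note that the row with `id` 1 in `f2` survives, because `f2` has a larger sequence number than the delete that targets that key.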
In an embodiment of the present application, merging the files in the initial snapshot according to a preset table format file capacity includes:
acquiring, from the initial snapshot, data files to be merged and position deleted files to be merged whose file capacity is smaller than a preset first table format file capacity;
and merging the plurality of data files to be merged and the plurality of position deleted files to be merged according to a preset second table format file capacity, wherein the second table format file capacity is larger than the first table format file capacity.
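The two-threshold merge above can be sketched as a simple grouping plan. This is an illustration under stated assumptions: `plan_merges` and its parameters are hypothetical names, sizes are in arbitrary units, and the greedy grouping in name order is a choice made here, not something the text prescribes. In practice the function would be called once for data files and once for position deleted files.

```python
def plan_merges(file_sizes, small_limit, target_size):
    """Group files smaller than small_limit into bins of at most target_size.

    small_limit plays the role of the first table format file capacity and
    target_size the role of the second, larger capacity.
    """
    if target_size <= small_limit:
        raise ValueError("target_size must exceed small_limit")
    groups, current, used = [], [], 0
    for name, size in sorted(file_sizes.items()):
        if size >= small_limit:
            continue  # already large enough; left untouched
        if current and used + size > target_size:
            groups.append(current)  # current bin is full, start a new one
            current, used = [], 0
        current.append(name)
        used += size
    if current:
        groups.append(current)
    return groups


sizes = {"a": 10, "b": 20, "c": 200, "d": 30, "e": 25}
plan = plan_merges(sizes, small_limit=32, target_size=64)
```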
In an embodiment of the present application, after transmitting the target table format file to the master node service to generate the target snapshot, the method further includes:
when receiving a target snapshot commit result transmitted by the master node service, acquiring the names of the pre-written log files to be deleted according to the target snapshot commit result;
and deleting the corresponding pre-written log files from the pre-written log file queue according to the names of the pre-written log files to be deleted, and deleting the data corresponding to those names from the memory partition.
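The cleanup after a successful commit can be sketched as follows. This is a minimal illustration: the function name and the shapes of `wal_queue` (a list of file names) and `memory_partitions` (a dict keyed by pre-written log file name, reflecting the mapping relation) are assumptions made here.

```python
def apply_commit_result(committed_wal_names, wal_queue, memory_partitions):
    """Return the WAL queue without committed files and drop their partitions.

    wal_queue: list of WAL file names in queue order (assumed shape).
    memory_partitions: dict mapping WAL file name -> in-memory partition data.
    """
    committed = set(committed_wal_names)
    for name in committed:
        # Data already persisted in the target snapshot is no longer needed.
        memory_partitions.pop(name, None)
    return [name for name in wal_queue if name not in committed]


queue = ["wal-1", "wal-2", "wal-3"]
partitions = {"wal-1": ["r1"], "wal-2": ["r2"], "wal-3": ["r3"]}
queue = apply_commit_result(["wal-1", "wal-2"], queue, partitions)
```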
In an embodiment of the present application, after completing writing the data to be written into the data lake, the method further includes:
Responding to a data reading instruction, and starting a read cache function of the target data lake table format to acquire target data in the target data lake table format;
and deleting the record corresponding to the target data in the delete partition, and updating the record corresponding to the target data in the List partition to finish the reading of the target data in the data lake.
In an embodiment of the present application, there is also provided a data processing apparatus for a data lake, the apparatus including:
the data acquisition module is used for acquiring data to be written;
the data recording module is used for recording the data to be written in a pre-written log file which is created in advance, and recording the data to be written in a pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relation;
the initial snapshot generation module is used for generating an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmitting the initial table format file to a master node service to generate an initial snapshot;
the optimizing module is used for converting the equivalent deleted files in the initial snapshot into position deleted files, and merging the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file;
and the target snapshot generating module is used for transmitting the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into the data lake.
In one embodiment of the present application, there is also provided a data processing system for a data lake, the system including:
the slave node service is used for acquiring data to be written; recording the data to be written in a pre-created pre-written log file, and recording the data to be written in a pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relation; generating an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmitting the initial table format file to a master node service to generate an initial snapshot; converting the equivalent deleted files in the initial snapshot into position deleted files, and merging the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file; and transmitting the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into the data lake;
the master node service is used for receiving the initial table format file to generate the initial snapshot and receiving the target table format file to generate the target snapshot;
and the client is used for transmitting the data to be written to the slave node service.
In an embodiment of the present application, there is also provided an electronic device including:
one or more processors;
and a storage means for storing one or more programs which, when executed by the one or more processors, cause the electronic device to implement the data processing method as described above.
In an embodiment of the present application, there is also provided a computer-readable storage medium having stored thereon a computer program which, when executed by a processor of a computer, causes the computer to perform the data lake data processing method as described above.
The application has the beneficial effects that:
the method is applied to the slave node service in a master-slave node service mode. The slave node service first acquires data to be written; records the data to be written in a pre-created pre-written log file, and records the data to be written in a pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relation; generates an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmits the initial table format file to a master node service to generate an initial snapshot; converts the equivalent deleted files in the initial snapshot into position deleted files, and merges the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file; and finally transmits the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into the data lake.
In the application, the master node service and the slave node service cooperate to process data into the data lake table format. When writing data, the client can ignore the time needed to generate a snapshot in the data lake table format and write data to the slave node service in real time; the pre-written log file and the memory partition, which have a mapping relation, allow data to be written at second-level latency, improving the efficiency of writing data into the data lake. The slave node service can enable a read cache, which improves the visibility of data in the data lake table format and thus the efficiency of reading data from the data lake. The equivalent deleted files in the initial snapshot are converted into position deleted files, so that the data files and position deleted files in the data lake table format can be traversed together and processed by key grouping to achieve data deletion; eliminating the equivalent deleted files also eliminates the global memory index over data in the data lake table format, which reduces memory consumption and improves the read and write processing efficiency of the data lake. Small files in the snapshot are merged according to the preset table format file capacity, which reduces the number of position deleted files and further improves the read and write processing efficiency of the data lake.
It is to be understood that both the foregoing general description and the following detailed description are exemplary and explanatory only and are not restrictive of the application as claimed.
Drawings
The accompanying drawings, which are incorporated in and constitute a part of this specification, illustrate embodiments consistent with the application and together with the description, serve to explain the principles of the application. It is evident that the drawings in the following description are only some embodiments of the present application and that other drawings may be obtained from these drawings without inventive effort for a person of ordinary skill in the art. In the drawings:
FIG. 1 is a schematic diagram of an implementation environment of a data processing method of a data lake according to an exemplary embodiment of the present application;
FIG. 2 is a flow chart of a data processing method of a data lake according to an exemplary embodiment of the present application;
FIG. 3 is a schematic diagram of a data processing system of a data lake in accordance with an exemplary embodiment of the present application;
FIG. 4 is a schematic diagram of a client shown in an exemplary embodiment of the application;
FIG. 5 is a schematic diagram illustrating the structure of a master node service according to an exemplary embodiment of the present application;
FIG. 6 is a schematic diagram illustrating the structure of a slave node service according to an exemplary embodiment of the present application;
FIG. 7 is a schematic diagram illustrating internal interactions of a master node service according to an exemplary embodiment of the present application;
FIG. 8 is a schematic diagram illustrating internal interactions of a slave node service according to an exemplary embodiment of the present application;
FIG. 9 is a diagram illustrating a pre-write log partition and memory partition structure according to an exemplary embodiment of the present application;
FIG. 10 is a diagram illustrating the recording of data to be written to a pre-write log file and memory partition according to an exemplary embodiment of the present application;
FIG. 11 is a diagram illustrating table format file generation according to an exemplary embodiment of the present application;
FIG. 12 is a block diagram of a data lake data processing apparatus shown in an exemplary embodiment of the present application;
FIG. 13 is a schematic diagram of a computer system suitable for an electronic device according to an embodiment of the application.
Detailed Description
Further advantages and effects of the present application will become readily apparent to those skilled in the art from the disclosure herein, by referring to the accompanying drawings and the preferred embodiments. The application may also be practiced or carried out in other embodiments, and the details of the present description may be modified or varied without departing from the spirit and scope of the present application. It should be understood that the preferred embodiments are presented by way of illustration only and not by way of limitation.
It should be noted that the illustrations provided in the following embodiments merely illustrate the basic concept of the present invention by way of illustration, and only the components related to the present invention are shown in the drawings and are not drawn according to the number, shape and size of the components in actual implementation, and the form, number and proportion of the components in actual implementation may be arbitrarily changed, and the layout of the components may be more complicated.
In the following description, numerous details are set forth in order to provide a more thorough explanation of embodiments of the present invention. It will be apparent, however, to one skilled in the art that embodiments of the present invention may be practiced without these specific details. In other embodiments, well-known structures and devices are shown in block diagram form, rather than in detail, in order to avoid obscuring the embodiments of the present invention.
It should be noted that a data lake is usually a single store of all the data in an enterprise, where this data includes copies of the raw data generated by source systems and transformed data generated for various tasks, including reporting, visualization, advanced analysis, and machine learning. The data lake includes structured data from relational databases (rows and columns), semi-structured data (e.g., CSV, logs, XML, JSON), unstructured data (e.g., emails, documents, PDFs), and binary data (e.g., images, audio, video), thereby forming a centralized data store that accommodates all forms of data.
In existing data lake data processing methods, such as the one described in Chinese patent CN115576946A, index information is added to a metadata file of a data table, and a correspondence between data files and indexes is established in Iceberg (an open table format for massive data analysis scenarios). When a processing operation on target data is executed in Iceberg, row-group-level index matching is performed on the target data according to that correspondence to determine the target data file; the corresponding data processing operation is then performed on the target row group data in the target data file to obtain a data processing result. This approach only speeds up data retrieval and does nothing about the equality-deletes (equivalent deleted) files produced by data updates, so the efficiency problem caused by equality-deletes files remains unsolved.
An equality-deletes file is a global file that applies to all data files whose sequence numbers are smaller than its own: if a record appears in the equality-deletes file, the matching records in data files with smaller sequence numbers should be deleted. The file grows as the data grows, so a large amount of memory is occupied when data is read (especially in MOR (merge-on-read) mode), and OOM (out of memory) may even occur. When data is read, the full set of equality-deletes files corresponding to the table snapshot is extremely large, so tasks run slowly, and an OOM causes the data read or write task to fail.
When a task writes data into a data lake table format in real time, the visibility of the data is mainly determined by the time at which the snapshot is committed. The existing data lake cannot support second-level data visibility, and forcing snapshots to be committed at second-level intervals generates a large number of small files, which reduces the read and write efficiency of the data lake.
Thus, the following problems exist in current data lake reading and writing: the global equality-deletes files cause extremely high memory overhead and affect data read and write efficiency; data visibility in the existing data lake table format does not support second-level data writing; and too many small files reduce read and write efficiency.
The following describes the technical terms in the present application:
Iceberg: Iceberg is an open data lake table format for massive data analysis scenarios. A table format can be understood as a way of organizing metadata and data files, and sits as an intermediate layer between the computing layer (e.g., Flink, Spark) and the storage layer (e.g., ORC, Parquet, Avro). The table format is an abstraction at the implementation level of a database system, and generally defines table metadata information and APIs (Application Programming Interfaces), such as which fields the table contains, how the files under the table are organized, table index information, statistical information, and the interfaces through which an upper-layer query engine reads and writes files in the table. It further includes a metadata management layer (snapshots) and a data storage layer (data files).
Master node service, slave node service: in the software architecture, the master-slave mode includes a master node service (Master) and slave node services (Slave), which are deployed on different servers. The master-slave mode generally comprises one master node service and a plurality of slave node services, which together form a cluster. The master node service acts as the task dispatcher and distributes computing tasks to the slave node services; after all the slave node services complete their tasks, the master node service gathers the task results. The master-slave mode enables hot data backup and provides multiple copies.
Equality-delete files: equality delete files, referred to herein as equivalent deleted files. An equivalent deleted file applies to a data file when the following conditions are satisfied: the sequence number of the data file is smaller than the sequence number of the delete file; and the partition (specification and partition values) of the data file is equal to the partition of the delete file, or the partition specification of the delete file is unpartitioned. An equivalent deleted file identifies deleted rows in a set of data files by one or more column values, and may optionally contain other columns of the deleted rows.
Position delete file: a position delete file, referred to herein as a position deleted file, contains the following fields: the path of the data file containing the row to be deleted; the position of the row to be deleted within that data file; and each field and field value of the row to be deleted. A position deleted file identifies deleted rows by file and position in one or more data files, and may optionally contain the deleted rows. If a position deleted file contains an entry for a row's file and position in the data file, that row is deleted.
OOM: out of memory (memory overflow), which means that a program has insufficient memory available when it requests memory.
Snapshot: each snapshot is the complete set of data files in a table at a point in time. Snapshots are listed in the metadata file, but the files in a snapshot are stored in separate manifest files.
WAL: write-ahead logging (pre-written log). Write-ahead logging is a family of techniques used in relational database systems to provide atomicity and durability. Before committing modifications, the system first writes them to log files managed by the WAL, and then performs the actual operations according to the records in the log files. The system therefore does not need to wait for data to be flushed to disk each time a transaction is committed, and the log can be used to recover the database at any time.
Memory index: memory index is a critical part of the database read path. For a memory database, the memory index is the entry for all queries, while for a disk database, the memory index can greatly accelerate hot data queries in a scenario where the memory size is sufficient to cache a large portion of the hot data.
Thread pool: the thread pool is a thread management technology realized by utilizing a pooling technical idea, and is mainly used for multiplexing threads, conveniently managing the threads and tasks and decoupling the creation of the threads and the execution of the tasks, and the thread pool can be created to multiplex the created threads so as to reduce the resource consumption caused by frequently creating and destroying the threads.
data file: the data file is a file containing table rows, and is actual table content data.
Delete file: a file that encodes the rows of a table that are deleted by position or by data value.
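The two delete-file definitions above can be illustrated with a minimal Python sketch. The class names, field names and the encoding of a partition as a tuple are hypothetical and are chosen only for illustration; they are not the Iceberg API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DataFile:
    path: str
    seq: int                     # sequence number of the data file
    partition: Optional[tuple]   # hypothetical encoding: (spec id, partition values)
    rows: list                   # rows held as dicts


@dataclass
class EqualityDelete:
    seq: int                     # sequence number of the delete file
    partition: Optional[tuple]   # None models an unpartitioned delete spec
    keys: dict                   # column name -> value identifying deleted rows


def equality_delete_applies(data: DataFile, delete: EqualityDelete) -> bool:
    """Apply only to older data files in the same (or any) partition."""
    same_partition = delete.partition is None or delete.partition == data.partition
    return data.seq < delete.seq and same_partition


def live_rows(data: DataFile, eq_deletes: list, pos_deletes: set) -> list:
    """pos_deletes is a set of (data file path, row position) pairs."""
    out = []
    for pos, row in enumerate(data.rows):
        if (data.path, pos) in pos_deletes:
            continue  # removed by a position delete entry
        if any(equality_delete_applies(data, d)
               and all(row.get(col) == val for col, val in d.keys.items())
               for d in eq_deletes):
            continue  # removed by an equality delete on column values
        out.append(row)
    return out
```

In this sketch a position delete is resolved by a direct lookup of (path, position), whereas an equality delete requires comparing column values of every candidate row, which is the cost the method later avoids by converting one form into the other.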
FIG. 1 is a schematic diagram of an implementation environment of a data processing method of a data lake according to an exemplary embodiment of the present application.
Referring to fig. 1, a terminal device 110 and a server 120 may be included in an implementation environment. The technical solution provided in the embodiment of the present application may be applied to a server 120, where the server 120 is configured to obtain data to be written transmitted by a terminal device 110, and implement writing and reading of the data according to a master node service and a slave node service in the server 120.
In one embodiment of the present application, the slave node service in the server 120 obtains the data to be written; records the data to be written in a pre-created pre-written log file, and records the data to be written in a pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relationship; generates an initial table file corresponding to the data to be written according to the information recorded in the pre-written log file and the memory partition, and transmits the initial table file to the master node service to generate an initial snapshot; converts the equality delete files in the initial snapshot into position delete files, and merges the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file; and transmits the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into the data lake.
In the embodiment, the master node service and the slave node service cooperate with each other to realize the processing of data to the data lake form, the client can ignore the time of generating the snapshot in the data lake form when writing the data, write the data into the slave node service in real time, and can achieve the second-level data writing through the pre-written log file and the memory partition with the mapping relation, thereby improving the data writing efficiency of the data lake; the buffer memory of the read data can be opened through the slave node service, so that the visibility of the read data in the form of the data lake is improved, and the efficiency of the read data of the data lake is further improved; the equivalent deleted files in the initial snapshot are converted into position deleted files, the data files in the data lake form and the position deleted files can be traversed together and processed in a key grouping mode to achieve the effect of deleting data, global memory indexes of the data in the data lake form can be eliminated by eliminating the equivalent deleted files, memory consumption is reduced, and the efficiency of reading and writing processing of the data lake is improved; and merging the small files in the snapshot according to the preset table format capacity, so that the number of the position deleted files is eliminated, and the reading and writing processing efficiency of the data lake is further improved.
It should be understood that the number of terminal devices 110 and servers 120 in fig. 1 is merely illustrative. There may be any number of terminal devices 110 and servers 120 as practical.
The terminal device 110 corresponds to the client and may be any electronic device having a user input interface, where the user input interface includes but is not limited to a touch screen, a keyboard, physical keys, an audio pickup device, and the like, and the electronic device includes but is not limited to a smartphone, a tablet, a notebook computer, a computer, a vehicle-mounted computer, and the like.
The server 120 corresponds to a server, which may be a server providing various services, may be an independent physical server, may be a server cluster or a distributed system formed by a plurality of physical servers, and may also be a cloud server providing cloud services, cloud databases, cloud computing, cloud functions, cloud storage, network services, cloud communication, middleware services, domain name services, security services, CDNs (Content Delivery Network, content distribution networks), and basic cloud computing services such as big data and artificial intelligent platforms, which are not limited herein.
The terminal device 110 may communicate with the server 120 through a wireless network such as 3G (third generation mobile communication technology), 4G (fourth generation mobile communication technology), 5G (fifth generation mobile communication technology), and the like, which is not limited herein.
The foregoing describes an exemplary implementation environment for applying the technical solutions of the present application, and the following describes the data processing method of the present application.
To solve the problem of how to improve the efficiency of data reading and writing in the prior art, embodiments of the present application respectively propose a data processing method, a data processing device, a data processing system, an electronic device, a computer readable storage medium and a computer program product, and these embodiments will be described in detail below.
Referring to fig. 2, fig. 2 is a flow chart illustrating a data processing method of a data lake according to an exemplary embodiment of the present application, and the method may be applied to the implementation environment shown in fig. 1. It should be understood that the method may be adapted to other exemplary implementation environments and be specifically executed by devices in other implementation environments, and the implementation environments to which the method is adapted are not limited by the present embodiment.
As shown in fig. 2, in an exemplary embodiment, the data processing method of the data lake is applied to the slave node service, and the data processing method of the data lake at least includes steps S210 to S250, which are described in detail as follows:
In step S210, data to be written is acquired.
Referring to FIG. 3, FIG. 3 is a schematic diagram of a data lake data processing system according to an exemplary embodiment of the present application; the system includes one client (Client), one Master, and multiple Slaves. The Master accepts the connection of the Client and receives the heartbeat information of the Slaves, thereby obtaining complete information on the available Slaves; the Master transmits the Slave information to the Client, so that the Client can communicate with a Slave according to the Slave information, send the data to be written to the Slave, and initiate requests to read and write data.
In step S220, the data to be written is recorded in the pre-created pre-written log file, and the data to be written is recorded in the pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relationship.
Illustratively, the Slave creates the WAL and the memory partition (which may also be called a mem segment) according to a preset file size. After receiving the data to be written, the Slave first records the data in the WAL and then records the data in the memory partition according to its data type.
In step S230, according to the information recorded in the pre-written log file and the memory partition, an initial table file corresponding to the data to be written is generated, and the initial table file is transmitted to the master node service to generate an initial snapshot.
Illustratively, the Slave runs a preset table format file generation thread, which periodically traverses the records of the WAL and writes them into Iceberg; in this process, the WAL and the memory partition cooperate to complete the writing of data into Iceberg. While running the table format file generation thread, the Slave writes the data in the WAL into the data file and the position delete file respectively, according to the Delete records and Update records of the memory partition, strictly following the data file structure of Iceberg, thereby completing the conversion from WAL to Iceberg. After the conversion is completed, the Iceberg files are transmitted to the Master for initial snapshot generation. After the snapshot is committed successfully, the Master sends a notification of the successful snapshot commit to the Slave, and the Slave deletes the WAL corresponding to the snapshot and the data in the memory partition according to the notification.
In step S240, the equivalent deleted file in the initial snapshot is converted into a location deleted file, and the files in the initial snapshot are merged according to the preset table format file capacity to obtain the target table format file.
Illustratively, the threads of the Slave's asynchronous optimization thread pool periodically execute Iceberg table optimization tasks: converting the existing Equality Delete Files into Position Delete Files; merging large numbers of small data files with their corresponding Position Delete Files to generate files of a suitable size; and cleaning up expired snapshots.
In step S250, the target table file is transferred to the master node service to generate a target snapshot, and writing the data to be written into the data lake is completed.
As can be seen from the foregoing steps S210 to S250, in the solution provided in this embodiment, the master node service and the slave node service cooperate with each other to implement processing from data to a data lake table format, when writing data, the client may ignore the time of generating a snapshot in the data lake table format, write data into the slave node service in real time, and write data in a second level through the pre-write log file and the memory partition with the mapping relationship, thereby improving the efficiency of writing data into the data lake; the buffer memory of the read data can be opened through the slave node service, so that the visibility of the read data in the form of the data lake is improved, and the efficiency of the read data of the data lake is further improved; the equivalent deleted files in the initial snapshot are converted into position deleted files, the data files in the data lake form and the position deleted files can be traversed together and processed in a key grouping mode to achieve the effect of deleting data, global memory indexes of the data in the data lake form can be eliminated by eliminating the equivalent deleted files, memory consumption is reduced, and the efficiency of reading and writing processing of the data lake is improved; and merging the small files in the snapshot according to the preset table format capacity, so that the number of the position deleted files is eliminated, and the reading and writing processing efficiency of the data lake is further improved.
In an embodiment of the present application, referring to fig. 4, fig. 4 is a schematic diagram of a client according to an exemplary embodiment of the present application. As shown in fig. 4, the client includes: a Master connector, used for connecting with the Master to obtain the Slave list; a timed discoverer, used for periodically communicating with the Master to obtain Slave information and update the Slave list; and a Slave connector, used for connecting and communicating with the Slaves according to the Slave list and sending data write instructions and data read instructions.
In an embodiment of the present application, referring to fig. 5 and 6, fig. 5 is a schematic diagram of a structure of a master node service according to an exemplary embodiment of the present application, and fig. 6 is a schematic diagram of internal interaction of the master node service according to an exemplary embodiment of the present application. As shown in fig. 5 and 6, the master node service includes: a Slave heartbeat receiver, a Slave list, an asynchronous processing thread pool, an Iceberg snapshot processor, a client connector, and a Slave broadcaster.
Wherein, as shown in fig. 6:
the Slave heartbeat receiver is used for receiving heartbeat information and an Iceberg file sent by the Slave.
The Slave list is used for recording relevant information of all the Slave, including basic information (heartbeat information) of the Slave, and a file list (file information of the WAL conversion Iceberg) of the Slave.
The threads of the asynchronous processing thread pool perform mainly two tasks: generating a table optimization task of Iceberg and sending the task to a broadcaster; a task of generating an Iceberg snapshot and performing this task.
When an Iceberg snapshot task starts to be executed, the Iceberg snapshot processor takes the information of the Data Files and Position Delete Files generated by the Slaves out of the Slave file list, generates a new snapshot ID, generates a new Iceberg snapshot, and commits the generated snapshot to Iceberg. After the snapshot is committed successfully, it transmits the result to the Slave broadcaster; the commit result includes the Slave's ID and the path lists of the Data Files and Position Delete Files.
The Slave broadcaster mainly broadcasts snapshot submission results and table optimization tasks.
The client connector mainly accepts the connection of the client and returns the information of the available Slaves to the client; after obtaining the Slave information, the client can communicate with the Slaves and initiate requests to read and write data.
In an embodiment of the present application, referring to fig. 7 and 8, fig. 7 is a schematic diagram of a structure of a slave node service according to an exemplary embodiment of the present application, and fig. 8 is a schematic diagram of internal interaction of a slave node service according to an exemplary embodiment of the present application. As shown in fig. 7, the slave node service includes: a heartbeat reporter, a pre-write log processor (WAL), a pre-write log queue, a memory cache partition processor (mem segment), an asynchronous consumption thread pool, an asynchronous optimization thread pool, an Iceberg file generator, an Iceberg reader and a cleaner.
Wherein, as shown in fig. 8:
the main tasks of the heartbeat reporter include: establishing a connection with the Master; periodically sending its own heartbeat information to the Master; receiving the result of the Iceberg file generator and promptly sending the result to the Master; and receiving the Iceberg optimization tasks in the response information and adding them to the optimization task queue.
The pre-write log processor is matched with the memory cache partition processor to record data to be written.
The threads of the asynchronous consumption thread pool periodically read the files in the pre-write log queue and convert the WAL into Iceberg files.
The threads of the asynchronous optimization thread pool periodically read optimization tasks from the optimization task list and execute them, mainly the following: merging small files; clearing expired snapshots; rewriting manifests; clearing orphan files; and converting Equality Deletes Files to Position Deletes Files.
The cleaner mainly receives the Iceberg snapshot commit result broadcast by the Master and deletes the data in the WAL and the memory partition according to the result.
The Iceberg reader receives a read request from the client, reads the requested data, and merges the read result with the current mem segment.
In an embodiment of the present application, before acquiring the data to be written, the method further includes:
Sending heartbeat information of a slave node service to the master node service so that the master node service generates an available slave node service list according to the heartbeat information;
and the master node service transmits the available slave node service list to a client so that the client can select the available slave node service according to the available slave node service list and issue a data processing instruction.
In one embodiment of the present application, selecting an available slave node service according to the available slave node service list and issuing a data processing instruction includes:
selecting a plurality of available slave node services from the available slave node service list, carrying out load balancing processing according to preset Internet protocol information and the plurality of available slave node services, and connecting with the plurality of available slave node services in a communication way;
and issuing a data writing instruction and/or a data reading instruction to each available slave node service of the communication connection so as to realize data processing of the data lake.
In an embodiment of the present application, the slave heartbeat information includes a slave node service name, a slave node service identity, a slave node service internet protocol address, and slave node service port information;
The target table file comprises data file path information, a data file serial number, position deletion file path information, position deletion file serial number, a pre-written log file and a slave node service identity.
Illustratively, the heartbeat information includes the following information:
name: the name of Slave;
ID: slave ID, a unique identifier in this service;
IP: the IP address of the Slave is used for communication;
PORT: slave's port for communication.
The Iceberg file includes:
Data File Path: the path of the Data File;
Data File Number: the sequence number of the Data File;
Position Delete File Path: the path of the position delete file;
Position Delete File Number: the sequence number of the position delete file;
WAL File: the pre-written log file;
Slave ID: the ID of the slave node service.
In an embodiment of the present application, recording the data to be written in a pre-created memory partition according to the data type of the data to be written includes:
when the data type is data to be deleted, recording the data to be written into a delete partition, and deleting records corresponding to the data to be written in an index partition and a List partition;
When the data type is update data and the record corresponding to the data to be written exists in the index partition, updating the record corresponding to the data to be written in the index partition into the List partition;
when the data type is updated data and the index partition does not have the record corresponding to the data to be written, recording the data to be written into a delete partition and a List partition, and recording the record position of the data to be written in the List partition into the index partition;
when the data type is insertion data, recording the data to be written into a List partition;
wherein the delete partitions, index partitions, and List partitions are included in the pre-created memory partition.
Referring to fig. 9, fig. 9 is a schematic diagram illustrating a pre-write log partition and a memory partition structure according to an exemplary embodiment of the present application. The pre-written log partition and the memory partition cooperate to execute the following tasks:
opening a pre-written log file, and pre-distributing a space with a certain size for the pre-written log file;
opening a read-write channel of the pre-written log file through a memory mapping technology;
opening a continuous memory space for buffer use, and dividing the space into a List (data List), indexes (Map [ key, index of data in List ]), delete (key to delete record);
When a write data request is received, the data is recorded in the WAL and flushed to disk.
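The tasks above (pre-allocating the log file, opening it through memory mapping, and flushing each record) can be sketched with Python's standard `mmap` module. The `MappedWal` class, the length-prefixed record layout and the default size are assumptions made for illustration; the source does not describe the on-disk format.

```python
import mmap
import os
import struct


class MappedWal:
    """Fixed-size, memory-mapped append-only log (illustrative only)."""

    def __init__(self, path: str, size: int = 1 << 20):
        self.fd = os.open(path, os.O_RDWR | os.O_CREAT)
        os.ftruncate(self.fd, size)            # pre-allocate space for the log
        self.buf = mmap.mmap(self.fd, size)    # read/write channel via memory mapping
        self.size = size
        self.offset = 0                        # next free byte in the log

    def append(self, payload: bytes) -> bool:
        """Append one length-prefixed record; return False if the log is full."""
        need = 4 + len(payload)
        if self.offset + need > self.size:
            return False
        self.buf[self.offset:self.offset + 4] = struct.pack("<I", len(payload))
        self.buf[self.offset + 4:self.offset + need] = payload
        self.offset += need
        self.buf.flush()                       # flush the mapped pages to disk
        return True

    def records(self):
        """Yield the payloads appended so far, in write order."""
        pos = 0
        while pos < self.offset:
            (length,) = struct.unpack("<I", self.buf[pos:pos + 4])
            yield bytes(self.buf[pos + 4:pos + 4 + length])
            pos += 4 + length

    def close(self) -> None:
        self.buf.close()
        os.close(self.fd)
```

A `False` return from `append` corresponds to the "log file is full" condition, at which point a caller would close this file and open a new one.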
Referring to fig. 10, fig. 10 is a schematic diagram illustrating recording data to be written to a pre-write log file and a memory partition according to an exemplary embodiment of the present application. As shown in fig. 10, recording data to be written to the pre-write log file and the memory partition includes the following steps:
if the data type is delete, the data is added to the delete partition and its record is removed from the index partition; if the record to be removed exists in the index partition, its index is obtained and the data at that index in the List partition is deleted;
if the data type is update, the index partition is searched first; if no record exists there, the record is added to the delete partition and appended to the end of the List partition, and an entry [data, index of the end of the List] is added to the index partition; if an existing record is found in the index partition, the data in the List partition is updated directly at the position indicated by that record;
if the data type is insert, the record is appended to the end of the List partition.
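The three cases above can be sketched as a small in-memory structure. The `MemSegment` class and the `(key, value)` record shape are hypothetical; in addition, this sketch marks a removed List slot with `None` instead of physically deleting it, which is an assumption made so that the positions stored in the index stay valid.

```python
class MemSegment:
    """Illustrative memory partition with List, index and delete partitions."""

    def __init__(self):
        self.list = []        # List partition: buffered (key, value) records
        self.index = {}       # index partition: key -> position in self.list
        self.delete = set()   # delete partition: keys whose old rows must go

    def record(self, op: str, key, value=None) -> None:
        if op == "delete":
            self.delete.add(key)
            pos = self.index.pop(key, None)
            if pos is not None:
                self.list[pos] = None          # drop the buffered row
        elif op == "update":
            pos = self.index.get(key)
            if pos is None:
                # First update seen for this key: the old persisted row
                # must be deleted and the new row buffered and indexed.
                self.delete.add(key)
                self.list.append((key, value))
                self.index[key] = len(self.list) - 1
            else:
                self.list[pos] = (key, value)  # overwrite in place
        elif op == "insert":
            self.list.append((key, value))
        else:
            raise ValueError(f"unknown operation: {op}")
```

Repeated updates to the same key therefore occupy a single List slot, which keeps the buffer bounded by the number of distinct updated keys plus the number of inserts.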
In one embodiment of the present application, when the pre-written log file is full of data, closing the pre-written log file and adding the pre-written log file to a pre-built pre-written log file queue;
When the space of the memory partition is full of data, closing the pre-written log file and adding the pre-written log file into a pre-built pre-written log file queue;
when the slave node service runs the preset table format file generation thread and the pre-written log file queue is empty, closing the pre-written log file that currently contains data, and adding that pre-written log file to the pre-written log file queue.
In an embodiment of the present application, generating an initial table file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition includes:
when the data type is insertion data, transmitting the data to be written into a data file in a target data lake table format;
when the data type is data to be deleted, transmitting the data to be written into an equivalent deleted file in the form of the target data lake;
when the data type is updated data and is recorded in the delete partition, transmitting the data to be written into the data file, recording the position information of the data to be written into the data file, and transmitting the data to be written into the equivalent deleted file;
When the data type is updated data and is recorded in the index partition, transmitting the data to be written into the data file, recording the position information of the data to be written into the data file, and recording the position information into a position deletion file in the target data lake table format.
Referring to fig. 11 for an exemplary illustration, fig. 11 is a tabular file generation diagram illustrating an exemplary embodiment of the present application. As shown in fig. 11, the table file generation includes the steps of:
S1101, traversing the data of the WAL file.
S1102, if the data type is insert, writing the data directly into the Data File.
S1103, if the data type is delete, recording the data in the Equality Deletes File.
S1104, if the data type is update, writing the data into the Data File and recording its position at that moment; if the data also exists in the cached delete partition, recording it in the Equality Deletes File; if the data also exists in the cached index partition, recording it in the Position Deletes File as [data, Data File path, position in the Data File].
S1105, converting the Equality Deletes File generated in step S1104 into a Position Deletes File.
S1106, sending the result to the heartbeat reporter.
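Steps S1101 to S1104 can be sketched as a single pass over the WAL records. The function name, the tuple-shaped records and the default file path are hypothetical. The text does not state which position is written to the Position Deletes File; this sketch assumes it is the position of the earlier row for the same key in the same Data File, which the later update supersedes.

```python
def generate_table_files(wal_records, cached_delete, cached_index,
                         data_path="data-0.parquet"):
    """wal_records: iterable of (op, key, value) tuples read from the WAL."""
    data_file = []          # rows written to the Data File, in order
    equality_deletes = []   # keys recorded in the Equality Deletes File
    position_deletes = []   # (key, Data File path, position) entries
    last_pos = {}           # key -> position of its latest row in data_file

    for op, key, value in wal_records:          # S1101
        if op == "insert":                      # S1102
            data_file.append((key, value))
        elif op == "delete":                    # S1103
            equality_deletes.append(key)
        elif op == "update":                    # S1104
            data_file.append((key, value))
            pos = len(data_file) - 1
            if key in cached_delete and key not in equality_deletes:
                equality_deletes.append(key)
            if key in cached_index and key in last_pos:
                # Assumption: the superseded row in this file is deleted by position.
                position_deletes.append((key, data_path, last_pos[key]))
            last_pos[key] = pos
    return data_file, equality_deletes, position_deletes
```

Under this reading, only the first update of a key needs an equality delete (to remove rows in older files), while later updates within the same file are resolved purely by position.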
In an embodiment of the present application, converting the equivalent deleted file in the initial snapshot into a location deleted file includes:
according to the sequence numbers of the equivalent deleted files in the initial snapshot, the equivalent deleted files are arranged in an ascending order;
traversing the equivalent deleted files after ascending arrangement, and storing records in the equivalent deleted files into a memory cache;
obtaining a file to be converted, wherein the serial number of the file is smaller than that of the equivalent deleted file;
when the file to be converted exists in the memory cache, generating a new position deletion file according to the file to be converted, and adding the serial number of the equivalent deletion file, the path of the data file and the position of the file to be converted in the data file into the new position deletion file.
Illustratively, the conversion includes the following steps:
a. The Equality Deletes Files under the current snapshot are taken and sorted in ascending order of sequence number.
b. The sorted Equality Deletes Files are traversed, and the records of the current Equality Deletes File are read into the memory cache; if the file is too large, it can be read in segments, and each segment is traversed in turn.
c. The records of each Data File whose sequence number is smaller than that of the Equality Deletes File in step b are traversed, each denoted as record. If record exists in the cache from step b, a Position Delete File is generated whose sequence number is the sequence number of the current Data File (it is not created again if the file already exists), and a delete record is added to the Position Delete File, consisting of: the equality ids, the path of the current Data File, and the position of record in the Data File.
d. After n Equality Deletes Files have been traversed, an Iceberg snapshot is generated.
e. Steps a through d are repeated to ensure that no Equality Deletes File remains in the Iceberg table.
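Steps a to c can be sketched as follows. The dictionary-based file representation and the function name are hypothetical, rows are reduced to their key for brevity, and each position delete entry keeps only the Data File path and the row position.

```python
def convert_equality_deletes(data_files, equality_deletes):
    """
    data_files: list of {"path": str, "seq": int, "rows": [key, ...]}
    equality_deletes: list of {"seq": int, "keys": [key, ...]}
    Returns {data file path: [(path, position), ...]}, one list per
    generated Position Delete File.
    """
    position_deletes = {}
    # a. sort the equality delete files by ascending sequence number
    for eq in sorted(equality_deletes, key=lambda d: d["seq"]):
        cache = set(eq["keys"])                  # b. load records into memory
        for df in data_files:
            if df["seq"] >= eq["seq"]:
                continue                         # c. only older data files qualify
            for pos, key in enumerate(df["rows"]):
                if key in cache:
                    entries = position_deletes.setdefault(df["path"], [])
                    if (df["path"], pos) not in entries:
                        entries.append((df["path"], pos))
    return position_deletes
```

Only one equality delete file is cached at a time, so the memory needed during conversion is bounded by the largest such file (or segment of it) rather than by a global index over the table.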
In an embodiment of the present application, merging files in the initial snapshot according to a preset table file capacity includes:
acquiring, from the initial snapshot, data files to be merged and position delete files to be merged, wherein the file capacity of each file to be merged is smaller than a preset first table format file capacity;
and merging a plurality of data files to be merged and a plurality of deletion files at the positions to be merged according to a preset second table format file capacity, wherein the second table format file capacity is larger than the first table format file capacity.
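The two capacity thresholds above can be sketched as a simple merge planner. The function name, the greedy grouping strategy and the example sizes are assumptions; the text only states that files smaller than the first capacity are merged according to a larger second capacity.

```python
def plan_merges(file_sizes, small_threshold, target_size):
    """
    file_sizes: {path: size in bytes}
    small_threshold: first capacity; files below it are merge candidates
    target_size: second capacity; upper bound for each merged group
    Returns a list of groups, each a list of paths to merge together.
    """
    if target_size <= small_threshold:
        raise ValueError("target_size must exceed small_threshold")
    small = sorted(p for p, s in file_sizes.items() if s < small_threshold)
    groups, current, current_size = [], [], 0
    for path in small:
        size = file_sizes[path]
        if current and current_size + size > target_size:
            groups.append(current)               # close the group that is full
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

For example, with a first capacity of 50 and a second capacity of 60, files of sizes 10, 20 and 30 fit into one merged group, while a further file of size 40 starts a new group.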
In an embodiment of the present application, after transmitting the target table format file to the master node service to generate the target snapshot, the method further includes:
when a target snapshot commit result transmitted by the master node service is received, acquiring the name of the pre-written log file to be deleted according to the target snapshot commit result;
and deleting the pre-written log file in the pre-written log file queue according to the pre-written log file name to be deleted, and deleting the data corresponding to the pre-written log file name to be deleted in the memory partition.
Illustratively, the cleaner deletes the log files whose WAL names are specified in the result, and marks the memory cache corresponding to each such WAL as expired and eligible for cleanup. The cleaner also periodically cleans up the cache data marked as expired, under the following rule: the data is marked as expired and its survival time exceeds a specified time.
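The two-stage cleanup described above (mark on commit, remove later) can be sketched as follows. The `Cleaner` class and its fields are hypothetical, and the survival time is measured from cache creation, which is an assumption since the text does not say from which moment it is counted.

```python
class Cleaner:
    """Illustrative cleaner for WAL files and their memory caches."""

    def __init__(self, ttl_s: float):
        self.ttl_s = ttl_s     # specified survival time
        self.wal_files = {}    # WAL name -> file path (placeholder)
        self.caches = {}       # WAL name -> {"data", "created", "expired"}

    def on_snapshot_committed(self, wal_names) -> None:
        """Handle a commit result naming the WAL files that were persisted."""
        for name in wal_names:
            self.wal_files.pop(name, None)            # delete the WAL log file
            if name in self.caches:
                self.caches[name]["expired"] = True   # mark, do not remove yet

    def sweep(self, now: float) -> list:
        """Timed cleanup: drop caches that are expired and old enough."""
        stale = [name for name, c in self.caches.items()
                 if c["expired"] and now - c["created"] > self.ttl_s]
        for name in stale:
            del self.caches[name]
        return stale
```

Keeping an expired cache for a while after the commit gives in-flight reads that still reference it time to finish before the memory is released.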
In an embodiment of the present application, after completing writing the data to be written into the data lake, the method further includes:
responding to a data reading instruction, and starting a read cache function of the target data lake table format to acquire target data in the target data lake table format;
and deleting the record corresponding to the target data in the delete partition, and updating the record corresponding to the target data in the List partition to finish the reading of the target data in the data lake.
The Iceberg reader receives a read request from a client, reads the requested data, and merges the read result with the current mem segment. The merging steps are as follows: a record is removed from the result if it exists in the delete partition; records in the List partition overwrite the corresponding records of the query result. It should be noted that the Iceberg reader enables the read cache, which speeds up data reading.
When data needs to be read, the read cache feature of Iceberg on the Slave side is enabled to cache hot data. After the data is obtained from Iceberg, the records matching the delete records in the Slave's memory partition are removed, and the remainder is merged with the data records in the memory partition to obtain the final data. Because both the memory partition and the read cache of Iceberg reside in memory, and Iceberg data is read starting from the latest snapshot, second-level read responses can be achieved.
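The merge of a query result with the memory partition can be sketched in a few lines. The function name and the key-to-value representation of rows are hypothetical; `None` entries in the List partition model slots whose record has been removed.

```python
def merge_read(iceberg_rows, delete_keys, list_records):
    """
    iceberg_rows: {key: value} read from the latest Iceberg snapshot
    delete_keys: keys recorded in the delete partition
    list_records: (key, value) tuples (or None) from the List partition
    """
    # Remove rows whose keys appear in the delete partition.
    result = {k: v for k, v in iceberg_rows.items() if k not in delete_keys}
    # Records buffered in the List partition overwrite the query result.
    for rec in list_records:
        if rec is not None:
            key, value = rec
            result[key] = value
    return result
```

Because the removal is applied before the overlay, a key that was updated in memory (and therefore appears in both the delete partition and the List partition) ends up with its newest buffered value.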
In an embodiment of the present application, when the broadcaster receives a snapshot commit result, it traverses the Slave list and sends the result to the Slaves one by one; when sending, it filters by Slave ID so that each Slave receives only its own commit result.
When the broadcaster receives optimization tasks, it starts the task distributor, traverses the Iceberg tables, binds optimization tasks to tables in a round-robin manner, and sends them to the Slaves. This ensures that each Slave is assigned the optimization tasks of a certain number of tables. Before the round-robin distribution, the Slave list needs to be randomly ordered, so that the optimization task of any table may be executed by any Slave and no single Slave is under constant high pressure.
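The shuffled round-robin distribution described above can be sketched as follows. The function name and the use of a `random.Random` instance (to make the shuffle reproducible in tests) are assumptions made for illustration.

```python
import random


def assign_tasks(tables, slave_ids, rng=None):
    """Bind each table's optimization task to a Slave in round-robin order."""
    if not slave_ids:
        raise ValueError("no Slave available")
    rng = rng or random.Random()
    order = list(slave_ids)
    rng.shuffle(order)               # random order so no Slave is always first
    assignment = {slave: [] for slave in order}
    for i, table in enumerate(tables):
        assignment[order[i % len(order)]].append(table)
    return assignment
```

With five tables and two Slaves, one Slave receives three tables and the other two; which Slave receives the larger share changes from round to round because of the shuffle.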
In the above embodiment, through the cooperation of the Master and the Slaves, the client does not need to care about when Iceberg generates a snapshot: it writes data into a Slave in real time, and the write is then considered successful. The WAL in the Slave accelerates the write operation through memory mapping, so that second-level writes can be achieved. Because Iceberg on the Slave side enables the read cache, the visibility of data read from Iceberg is greatly improved; newly added data that has not yet been persisted to Iceberg resides in the Slave's cache, so it can be fetched quickly and merged with the data read from Iceberg, achieving fast data reading. From the implementation of the table optimization task and the WAL-to-Iceberg conversion task on the Slave side, it can be seen that any Equality Deletes Files that may exist are converted into Position Deletes Files; combined with the logic by which Iceberg reads data, the Data Files and the Position Deletes Files can then be traversed together and processed by key grouping to achieve the effect of deleting data. Once the Equality Deletes Files are eliminated, the global memory index of the table data is no longer needed, so the index no longer consumes a large amount of memory and OOM is avoided. The periodic file merging further reduces the number of Position Deletes Files and further speeds up data reading.
In an embodiment of the present application, the actual verification is performed by the data processing method of the data lake in the embodiment of the present application, and the verification result indicates: the data lake data processing method can accurately control the memory overhead required by service, optimize the memory of a client by more than 90%, improve the reading efficiency of the Iceberg table by 400 times and improve the writing efficiency of the Iceberg table by 60 times.
FIG. 12 is a block diagram of a data processing apparatus of a data lake according to an exemplary embodiment of the present application. The device may be applied to the implementation environment shown in fig. 1. The apparatus may also be adapted to other exemplary implementation environments and may be specifically configured in other devices, and the present embodiment is not limited to the implementation environments to which the apparatus is adapted.
As shown in fig. 12, the exemplary data lake data processing apparatus includes:
a data acquisition module 1201, configured to acquire data to be written;
a data recording module 1202, configured to record the data to be written in a pre-created pre-written log file, and record the data to be written in a pre-created memory partition according to a data type of the data to be written, where a mapping relationship exists between the pre-written log file and the memory partition;
an initial snapshot generating module 1203, configured to generate an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmit the initial table format file to a master node service to generate an initial snapshot;
an optimizing module 1204, configured to convert the equivalent deleted file in the initial snapshot into a position deleted file, and merge the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file;
and a target snapshot generating module 1205, configured to transmit the target table format file to the master node service to generate a target snapshot, so as to complete writing the data to be written into a data lake.
In the data lake data processing device, the master node service and the slave node service cooperate to process data in a data lake table format. When data is written, the client does not need to wait for the data lake table format to generate a snapshot; it writes the data to the slave node service in real time, and the pre-written log file and the memory partition, which have a mapping relation, allow second-level data writes, thereby improving the data writing efficiency of the data lake. The slave node service can enable a cache for read data, which improves the visibility of data read from the data lake table format and further improves the data reading efficiency of the data lake. The equivalent deleted files in the initial snapshot are converted into position deleted files, so that the data files and the position deleted files of the data lake table format can be traversed together and processed grouped by key to achieve the effect of deleting data; eliminating the equivalent deleted files removes the need for a global memory index over the data in the data lake table format, which reduces memory consumption and improves the read and write efficiency of the data lake. Small files in the snapshot are merged according to the preset table format file capacity, which reduces the number of position deleted files and further improves the read and write efficiency of the data lake.
It should be noted that, the data processing apparatus provided in the foregoing embodiment and the data processing method provided in the foregoing embodiment belong to the same concept, and the specific manner in which each module and unit perform the operation has been described in detail in the method embodiment, which is not described herein again. In practical application, the data processing device provided in the above embodiment may distribute the functions to different functional modules according to needs, that is, the internal structure of the device is divided into different functional modules to complete all or part of the functions described above, which is not limited herein.
In one embodiment of the present application, referring to FIG. 3, the data lake data processing system comprises:
the slave node service, used for acquiring data to be written; recording the data to be written in a pre-created pre-written log file, and recording the data to be written in a pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relation; generating an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmitting the initial table format file to the master node service to generate an initial snapshot; converting the equivalent deleted file in the initial snapshot into a position deleted file, and merging the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file; and transmitting the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into a data lake;
the master node service, used for receiving the initial table format file to generate the initial snapshot and receiving the target table format file to generate the target snapshot;
and the client, used for transmitting the data to be written to the slave node service.
It should be noted that, the data processing system provided by the foregoing embodiment and the data processing method provided by the foregoing embodiment belong to the same concept, and the working modes of the slave node service, the master node service and the client have been described in detail in the method embodiment, which is not repeated herein.
The embodiment of the application also provides electronic equipment, which comprises: one or more processors; and a storage device for storing one or more programs which, when executed by the one or more processors, cause the electronic device to implement the data lake data processing method provided in the above embodiments.
Fig. 13 shows a schematic diagram of a computer system suitable for an electronic device according to an embodiment of the application. It should be noted that, the computer system 1300 of the electronic device shown in fig. 13 is only an example, and should not impose any limitation on the functions and the application scope of the embodiments of the present application.
As shown in fig. 13, the computer system 1300 includes a central processing unit (Central Processing Unit, CPU) 1301 that can perform various appropriate actions and processes, for example the method described in the above embodiment, according to a program stored in a Read-Only Memory (ROM) 1302 or a program loaded from a storage portion 1308 into a random access Memory (Random Access Memory, RAM) 1303. The RAM 1303 also stores various programs and data required for system operation. The CPU 1301, ROM 1302, and RAM 1303 are connected to each other through a bus 1304. An Input/Output (I/O) interface 1305 is also connected to bus 1304.
The following components are connected to the I/O interface 1305: an input portion 1306 including a keyboard, a mouse, and the like; an output portion 1307 including a display such as a Cathode Ray Tube (CRT) or a liquid crystal display (Liquid Crystal Display, LCD), a speaker, and the like; a storage portion 1308 including a hard disk or the like; and a communication portion 1309 including a network interface card such as a LAN (Local Area Network) card, a modem, or the like. The communication portion 1309 performs communication processing via a network such as the Internet. A drive 1310 is also connected to the I/O interface 1305 as needed. Removable media 1311, such as magnetic disks, optical disks, magneto-optical disks, semiconductor memory, and the like, are mounted on the drive 1310 as needed, so that a computer program read therefrom is installed into the storage portion 1308 as needed.
In particular, according to embodiments of the present application, the processes described above with reference to flowcharts may be implemented as computer software programs. For example, embodiments of the present application include a computer program product comprising a computer program embodied on a computer readable medium, the computer program comprising program code for performing the method shown in the flowchart. In such embodiments, the computer program may be downloaded and installed from a network via the communication portion 1309 and/or installed from the removable medium 1311. When executed by the Central Processing Unit (CPU) 1301, the computer program performs the various functions defined in the system of the present application.
It should be noted that, the computer readable medium shown in the embodiments of the present application may be a computer readable signal medium or a computer readable storage medium, or any combination of the two. The computer readable storage medium may be, for example, an electronic, magnetic, optical, electromagnetic, infrared, or semiconductor system, apparatus, or device, or any combination thereof. More specific examples of the computer-readable storage medium may include, but are not limited to: an electrical connection having one or more wires, a portable computer diskette, a hard disk, a Random Access Memory (RAM), a read-Only Memory (ROM), an erasable programmable read-Only Memory (Erasable Programmable Read Only Memory, EPROM), flash Memory, an optical fiber, a portable compact disc read-Only Memory (CD-ROM), an optical storage device, a magnetic storage device, or any suitable combination of the foregoing. In the present application, a computer-readable signal medium may comprise a data signal propagated in baseband or as part of a carrier wave, with a computer-readable computer program embodied therein. Such a propagated data signal may take any of a variety of forms, including, but not limited to, electro-magnetic, optical, or any suitable combination of the foregoing. A computer readable signal medium may also be any computer readable medium that is not a computer readable storage medium and that can communicate, propagate, or transport a program for use by or in connection with an instruction execution system, apparatus, or device. A computer program embodied on a computer readable medium may be transmitted using any appropriate medium, including but not limited to: wireless, wired, etc., or any suitable combination of the foregoing.
The flowcharts and block diagrams in the figures illustrate the architecture, functionality, and operation of possible implementations of systems, methods and computer program products according to various embodiments of the present application. Each block in the flowcharts or block diagrams may represent a module, segment, or portion of code, which comprises one or more executable instructions for implementing the specified logical function(s). It should also be noted that, in some alternative implementations, the functions noted in the blocks may occur out of the order noted in the figures. For example, two blocks shown in succession may, in fact, be executed substantially concurrently, or the blocks may sometimes be executed in the reverse order, depending upon the functionality involved. It should also be noted that each block of the block diagrams or flowchart illustrations, and combinations of blocks in the block diagrams or flowchart illustrations, can be implemented by special purpose hardware-based systems which perform the specified functions or acts, or by combinations of special purpose hardware and computer instructions.
The units involved in the embodiments of the present application may be implemented by software, or may be implemented by hardware, and the described units may also be provided in a processor. Wherein the names of the units do not constitute a limitation of the units themselves in some cases.
Another aspect of the application also provides a computer readable storage medium having stored thereon a computer program which, when executed by a processor of a computer, causes the computer to perform a data processing method as described above. The computer-readable storage medium may be included in the electronic device described in the above embodiment or may exist alone without being incorporated in the electronic device.
Another aspect of the application also provides a computer program product or computer program comprising computer instructions stored in a computer readable storage medium. The processor of the computer device reads the computer instructions from the computer-readable storage medium, and the processor executes the computer instructions, so that the computer device performs the data processing method provided in the above-described respective embodiments.
The above embodiments are merely illustrative of the principles of the present application and its effectiveness, and are not intended to limit the application. Modifications and variations may be made to the above-described embodiments by those skilled in the art without departing from the spirit and scope of the application. It is therefore intended that all equivalent modifications and changes made by those skilled in the art without departing from the spirit and technical spirit of the present application shall be covered by the appended claims.
Claims (15)
1. A data processing method for a data lake, the method being applied to a slave node service, the method comprising:
acquiring data to be written;
recording the data to be written in a pre-created pre-written log file, and recording the data to be written in a pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relation;
generating an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmitting the initial table format file to a master node service to generate an initial snapshot;
converting the equivalent deleted file in the initial snapshot into a position deleted file, and merging the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file;
and transmitting the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into a data lake.
2. The data processing method according to claim 1, further comprising, before acquiring the data to be written:
sending heartbeat information of a slave node service to the master node service so that the master node service generates an available slave node service list according to the heartbeat information;
and the master node service transmits the available slave node service list to a client so that the client can select the available slave node service according to the available slave node service list and issue a data processing instruction.
3. The data processing method according to claim 2, wherein selecting available slave node services and issuing data processing instructions according to the list of available slave node services comprises:
selecting a plurality of available slave node services from the available slave node service list, carrying out load balancing processing according to preset Internet protocol information and the plurality of available slave node services, and connecting with the plurality of available slave node services in a communication way;
and issuing a data writing instruction and/or a data reading instruction to each available slave node service of the communication connection so as to realize data processing of the data lake.
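For illustration only, the heartbeat-based list of available slave node services and the client-side selection recited in claims 2 and 3 might be sketched as follows in Python. The names `Heartbeat`, `MasterRegistry` and `pick_slave`, the timeout value, and the round-robin selection are assumptions made for this sketch; the claims do not specify a particular load balancing policy.

```python
from __future__ import annotations

import time
from dataclasses import dataclass


@dataclass(frozen=True)
class Heartbeat:
    """Fields follow claim 4: name, identity, IP address and port of a slave."""
    name: str
    node_id: str
    ip: str
    port: int


class MasterRegistry:
    """Hypothetical master-side bookkeeping of slave heartbeats."""

    def __init__(self, timeout_s: float = 10.0) -> None:
        self._timeout_s = timeout_s  # assumed liveness window
        self._last_seen: dict[str, tuple[Heartbeat, float]] = {}

    def report(self, hb: Heartbeat, now: float | None = None) -> None:
        """Record a heartbeat; `now` can be injected for testing."""
        ts = time.monotonic() if now is None else now
        self._last_seen[hb.node_id] = (hb, ts)

    def available(self, now: float | None = None) -> list[Heartbeat]:
        """Return slaves whose last heartbeat is within the timeout."""
        ts = time.monotonic() if now is None else now
        alive = [hb for hb, seen in self._last_seen.values()
                 if ts - seen <= self._timeout_s]
        return sorted(alive, key=lambda hb: hb.node_id)


def pick_slave(available: list[Heartbeat], request_no: int) -> Heartbeat:
    """Round-robin choice over the available list (an assumed policy)."""
    if not available:
        raise RuntimeError("no slave node service is available")
    return available[request_no % len(available)]
```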
4. The data processing method of claim 2, wherein:
the heartbeat information of the slave node service comprises a slave node service name, a slave node service identity, a slave node service Internet protocol address and slave node service port information;
the target table format file comprises data file path information, a data file serial number, position deletion file path information, a position deletion file serial number, a pre-written log file and a slave node service identity.
5. The data processing method according to claim 4, wherein recording the data to be written in a memory partition created in advance according to the data type of the data to be written, comprises:
when the data type is data to be deleted, recording the data to be written into a delete partition, and deleting records corresponding to the data to be written in an index partition and a List partition;
when the data type is update data and the record corresponding to the data to be written exists in the index partition, updating the record corresponding to the data to be written in the index partition into the List partition;
when the data type is updated data and the index partition does not have the record corresponding to the data to be written, recording the data to be written into a delete partition and a List partition, and recording the record position of the data to be written in the List partition into the index partition;
when the data type is insertion data, recording the data to be written into a List partition;
wherein the delete partitions, index partitions, and List partitions are included in the pre-created memory partition.
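For illustration only, the routing of records into the delete, index and List partitions recited in claim 5 might be sketched as follows in Python. The class name `MemoryPartitions`, the use of a key per record, the tombstone (`None`) for removed List entries, and the interpretation that an indexed update overwrites the List entry at the indexed position are assumptions made for this sketch.

```python
class MemoryPartitions:
    """Hypothetical in-memory partitions of one slave node service."""

    def __init__(self) -> None:
        self.delete_part: set = set()   # keys whose persisted rows must be deleted
        self.index_part: dict = {}      # key -> position of the row in list_part
        self.list_part: list = []       # (key, row) entries; None marks a removed entry

    def record(self, op: str, key, row=None) -> None:
        if op == "delete":
            # Remember the delete, then drop any buffered copies of the key.
            self.delete_part.add(key)
            self.index_part.pop(key, None)
            for i, entry in enumerate(self.list_part):
                if entry is not None and entry[0] == key:
                    self.list_part[i] = None
        elif op == "update":
            pos = self.index_part.get(key)
            if pos is not None:
                # Already indexed: overwrite the buffered row in place.
                self.list_part[pos] = (key, row)
            else:
                # First update of this key: mark older persisted rows for
                # deletion, buffer the new row and index its position.
                self.delete_part.add(key)
                self.list_part.append((key, row))
                self.index_part[key] = len(self.list_part) - 1
        elif op == "insert":
            self.list_part.append((key, row))
        else:
            raise ValueError(f"unknown data type: {op!r}")
```

The linear scan on delete keeps the sketch short; a real implementation would more likely locate buffered rows through an index.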
6. The data processing method of claim 5, wherein:
closing the pre-written log file and adding the pre-written log file to a pre-built pre-written log file queue when the pre-written log file is full of data;
when the space of the memory partition is full of data, closing the pre-written log file and adding the pre-written log file into a pre-built pre-written log file queue;
when a preset table format file generation thread of the slave node service consumes the pre-written log file queue and the pre-written log file queue is empty, closing the pre-written log file that currently holds data, and adding the pre-written log file into the pre-written log file queue.
7. The method for processing data in a data lake according to claim 5, wherein generating an initial table file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition includes:
when the data type is insertion data, transmitting the data to be written into a data file in a target data lake table format;
when the data type is data to be deleted, transmitting the data to be written into an equivalent deleted file in the form of the target data lake;
when the data type is updated data and is recorded in the delete partition, transmitting the data to be written into the data file, recording the position information of the data to be written into the data file, and transmitting the data to be written into the equivalent deleted file;
when the data type is updated data and is recorded in the index partition, transmitting the data to be written into the data file, recording the position information of the data to be written into the data file, and recording the position information into a position deletion file in the target data lake table format.
8. The data processing method of claim 7, wherein converting the equivalent deleted file in the initial snapshot to a location deleted file comprises:
according to the sequence numbers of the equivalent deleted files in the initial snapshot, the equivalent deleted files are arranged in an ascending order;
traversing the equivalent deleted files after ascending arrangement, and storing records in the equivalent deleted files into a memory cache;
obtaining a file to be converted, wherein the serial number of the file is smaller than that of the equivalent deleted file;
when the file to be converted exists in the memory cache, generating a new position deletion file according to the file to be converted, and adding the serial number of the equivalent deletion file, the path of the data file and the position of the file to be converted in the data file into the new position deletion file.
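For illustration only, the conversion recited in claim 8 might be sketched as follows in Python. The types `DataFile`, `EqualityDeleteFile` and `PositionDelete`, and the representation of a file as a tuple of row keys, are assumptions made for this sketch and do not reflect the actual Iceberg file formats or API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    seq: int
    keys: tuple  # key of each row, indexed by row position


@dataclass(frozen=True)
class EqualityDeleteFile:
    seq: int
    keys: frozenset  # keys deleted by this file


@dataclass(frozen=True)
class PositionDelete:
    seq: int        # sequence number inherited from the equality delete file
    data_path: str  # path of the data file holding the deleted row
    position: int   # row position inside that data file


def to_position_deletes(eq_files, data_files):
    """Rewrite equality deletes as position deletes."""
    result = []
    seen = set()  # (path, position) pairs already emitted
    # Step 1: process equality delete files in ascending sequence order.
    for eq in sorted(eq_files, key=lambda f: f.seq):
        cache = set(eq.keys)  # step 2: load the deleted keys into memory
        for df in data_files:
            # Step 3: only data files with a smaller sequence number apply.
            if df.seq >= eq.seq:
                continue
            # Step 4: every matching row becomes one position delete.
            for pos, key in enumerate(df.keys):
                if key in cache and (df.path, pos) not in seen:
                    seen.add((df.path, pos))
                    result.append(PositionDelete(eq.seq, df.path, pos))
    return result
```

Because each equality delete is matched only against data files with a smaller sequence number, rows written after the delete are left untouched.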
9. The data processing method of claim 1, wherein merging the files in the initial snapshot according to a predetermined tabular file capacity comprises:
acquiring, from the initial snapshot, data files to be merged and position deletion files to be merged whose file capacity is smaller than a preset first table format file capacity;
and merging a plurality of the data files to be merged and a plurality of the position deletion files to be merged according to a preset second table format file capacity, wherein the second table format file capacity is larger than the first table format file capacity.
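For illustration only, the two-threshold merge planning recited in claim 9 might be sketched as follows in Python. The function name `plan_merges`, the greedy grouping order, and the choice to skip groups containing a single file are assumptions made for this sketch.

```python
def plan_merges(sizes: dict, small_limit: int, target_size: int) -> list:
    """Group small files so that each merged output stays within target_size.

    sizes maps a file path to its size in bytes. Files smaller than
    small_limit (the first capacity) are merge candidates; candidates are
    packed greedily into groups of at most target_size (the second capacity).
    """
    if target_size <= small_limit:
        raise ValueError("target_size must be larger than small_limit")
    candidates = sorted(path for path, size in sizes.items() if size < small_limit)
    groups, current, total = [], [], 0
    for path in candidates:
        size = sizes[path]
        if current and total + size > target_size:
            groups.append(current)
            current, total = [], 0
        current.append(path)
        total += size
    if current:
        groups.append(current)
    # A group with a single file has nothing to merge with.
    return [group for group in groups if len(group) > 1]
```

The same planning would be applied separately to data files and to position deletion files, since the two kinds of file are not merged with each other.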
10. The data processing method of claim 6, further comprising, after transmitting the target table format file to the master node service to generate a target snapshot:
when receiving a target snapshot submitting result transmitted by the master node service, acquiring a to-be-deleted pre-written log file name according to the target snapshot submitting result;
and deleting the pre-written log file in the pre-written log file queue according to the pre-written log file name to be deleted, and deleting the data corresponding to the pre-written log file name to be deleted in the memory partition.
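For illustration only, the log rotation of claim 6 and the cleanup after a committed snapshot of claim 10 might be sketched together as follows in Python. The class name `WalQueue`, the record-count limit standing in for "full", and the `wal-N` naming scheme are assumptions made for this sketch.

```python
from collections import deque


class WalQueue:
    """Hypothetical bookkeeping of open and closed pre-written log files."""

    def __init__(self, max_records: int) -> None:
        self._max_records = max_records  # assumed stand-in for file capacity
        self._next_id = 0
        self.closed = deque()  # names of closed logs awaiting conversion
        self.buffered = {}     # log name -> records still held in memory
        self._open_new()

    def _open_new(self) -> None:
        self.current = f"wal-{self._next_id}"
        self._next_id += 1
        self.buffered[self.current] = []

    def append(self, record) -> None:
        self.buffered[self.current].append(record)
        if len(self.buffered[self.current]) >= self._max_records:
            self.rotate()  # the log is full

    def rotate(self) -> None:
        """Close the current log if it holds data and queue it."""
        if self.buffered[self.current]:
            self.closed.append(self.current)
            self._open_new()

    def on_commit(self, committed_names) -> None:
        """Drop logs, and their buffered data, named in a commit result."""
        for name in committed_names:
            if name in self.closed:
                self.closed.remove(name)
                del self.buffered[name]
```

Calling `rotate()` explicitly corresponds to the case in which the generation thread finds the queue empty while the open log already holds data.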
11. The data processing method of claim 7, further comprising, after completing writing the data to be written to the data lake:
responding to a data reading instruction, and starting a read cache function of the target data lake table format to acquire target data in the target data lake table format;
and deleting the record corresponding to the target data in the delete partition, and updating the record corresponding to the target data in the List partition to finish the reading of the target data in the data lake.
12. A data processing apparatus for a data lake, the apparatus comprising:
the data acquisition module is used for acquiring data to be written;
the data recording module is used for recording the data to be written in a pre-written log file which is created in advance, and recording the data to be written in a pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relation;
the initial snapshot generation module is used for generating an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmitting the initial table format file to a master node service to generate an initial snapshot;
the optimizing module is used for converting the equivalent deleted files in the initial snapshot into position deleted files, and merging the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file;
and the target snapshot generating module is used for transmitting the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into a data lake.
13. A data processing system for a data lake, the system comprising:
the slave node service, used for acquiring data to be written; recording the data to be written in a pre-created pre-written log file, and recording the data to be written in a pre-created memory partition according to the data type of the data to be written, wherein the pre-written log file and the memory partition have a mapping relation; generating an initial table format file corresponding to the data to be written according to the pre-written log file and the information recorded in the memory partition, and transmitting the initial table format file to the master node service to generate an initial snapshot; converting the equivalent deleted file in the initial snapshot into a position deleted file, and merging the files in the initial snapshot according to a preset table format file capacity to obtain a target table format file; and transmitting the target table format file to the master node service to generate a target snapshot, thereby completing the writing of the data to be written into a data lake;
the master node service, used for receiving the initial table format file to generate the initial snapshot and receiving the target table format file to generate the target snapshot;
and the client, used for transmitting the data to be written to the slave node service.
14. An electronic device, the electronic device comprising:
one or more processors;
storage means for storing one or more programs which when executed by the one or more processors cause the electronic device to implement the data processing method of any of claims 1 to 11.
15. A computer-readable storage medium, having stored thereon a computer program which, when executed by a processor of a computer, causes the computer to perform the data processing method of any one of claims 1 to 11.
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202311180449.6A CN117111856A (en) | 2023-09-13 | 2023-09-13 | Data lake data processing method, device, system, equipment and medium |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202311180449.6A CN117111856A (en) | 2023-09-13 | 2023-09-13 | Data lake data processing method, device, system, equipment and medium |
Publications (1)
| Publication Number | Publication Date |
|---|---|
| CN117111856A (en) | 2023-11-24 |
Family
ID=88803878
Family Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202311180449.6A Pending CN117111856A (en) | 2023-09-13 | 2023-09-13 | Data lake data processing method, device, system, equipment and medium |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN117111856A (en) |
Cited By (4)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN118642665A (en) * | 2024-08-15 | 2024-09-13 | 苏州元脑智能科技有限公司 | Data lake lock-free writing method, device, equipment, medium and product |
| CN119473129A (en) * | 2024-09-26 | 2025-02-18 | 上海森亿医疗科技有限公司 | Medical big data file export method, system and terminal based on memory mapping file |
| CN119806781A (en) * | 2024-12-30 | 2025-04-11 | 北京领雁科技股份有限公司 | Data writing method and device for table file, electronic equipment and storage medium |
| CN119883132A (en) * | 2025-01-14 | 2025-04-25 | 重庆长安汽车股份有限公司 | Data lake-oriented data reading and writing method, device, equipment, medium and vehicle |
Cited By (6)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN118642665A (en) * | 2024-08-15 | 2024-09-13 | 苏州元脑智能科技有限公司 | Data lake lock-free writing method, device, equipment, medium and product |
| CN118642665B (en) * | 2024-08-15 | 2024-11-01 | 苏州元脑智能科技有限公司 | Data lake lock-free writing method, device, equipment, medium and product |
| CN119473129A (en) * | 2024-09-26 | 2025-02-18 | 上海森亿医疗科技有限公司 | Medical big data file export method, system and terminal based on memory mapping file |
| CN119806781A (en) * | 2024-12-30 | 2025-04-11 | 北京领雁科技股份有限公司 | Data writing method and device for table file, electronic equipment and storage medium |
| CN119883132A (en) * | 2025-01-14 | 2025-04-25 | 重庆长安汽车股份有限公司 | Data lake-oriented data reading and writing method, device, equipment, medium and vehicle |
| CN119883132B (en) * | 2025-01-14 | 2025-11-14 | 重庆长安汽车股份有限公司 | Data reading and writing methods, devices, equipment, media, and vehicles for data lakes |
Similar Documents
| Publication | Title | Publication Date |
|---|---|---|
| US11816126B2 (en) | Large scale unstructured database systems | |
| CN117111856A (en) | Data lake data processing method, device, system, equipment and medium | |
| CN107169083B (en) | Mass vehicle data storage and retrieval method and device for public security card port and electronic equipment | |
| CN102169507B (en) | Implementation method of distributed real-time search engine | |
| US12314251B2 (en) | Transaction processing method and apparatus, computing device, and storage medium | |
| US7783607B2 (en) | Decentralized record expiry | |
| US11818012B2 (en) | Online restore to different topologies with custom data distribution | |
| CN101576915A (en) | Distributed B+ tree index system and building method | |
| CN117321583A (en) | Storage engine for hybrid data processing | |
| CN115114294A (en) | Adaptive method, device and computer equipment for database storage mode | |
| CN117677943A (en) | Data consistency mechanism for mixed data processing | |
| CN104885054A (en) | System and method for performing a transaction in a massively parallel processing database | |
| US11341163B1 (en) | Multi-level replication filtering for a distributed database | |
| CN112162846A (en) | Transaction processing method, device and computer readable storage medium | |
| CN116467275A (en) | Shared remote storage method, device, system, electronic equipment and storage medium | |
| CN110505495A (en) | Multimedia resource takes out frame method, device, server and storage medium | |
| JP2023546818A (en) | Transaction processing method, device, electronic device, and computer program for database system | |
| CN118708609A (en) | Data processing method, device, medium and electronic equipment | |
| CN116975053A (en) | Data processing method, device, equipment, medium and program product | |
| CN118981506B (en) | Data storage method, reading method, device and electronic device | |
| CN120600196A (en) | Disease detection data processing method, system, device, equipment and medium | |
| CN111061719B (en) | Data collection method, device, equipment and storage medium | |
| CN116860742A (en) | Data writing method, electronic device and storage medium | |
| CN114205368B (en) | Data storage system, control method, control device, electronic equipment and storage medium | |
| CN117056427A (en) | Data processing method and device in hybrid transaction analysis system and electronic equipment |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| | PB01 | Publication | |
| | SE01 | Entry into force of request for substantive examination | |