[go: up one dir, main page]

WO2025161762A1 - Data processing method and apparatus, and computing cluster - Google Patents

Data processing method and apparatus, and computing cluster

Info

Publication number
WO2025161762A1
WO2025161762A1 PCT/CN2024/141155 CN2024141155W WO2025161762A1 WO 2025161762 A1 WO2025161762 A1 WO 2025161762A1 CN 2024141155 W CN2024141155 W CN 2024141155W WO 2025161762 A1 WO2025161762 A1 WO 2025161762A1
Authority
WO
WIPO (PCT)
Prior art keywords
data processing
tasks
processing unit
computing
operator
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Pending
Application number
PCT/CN2024/141155
Other languages
French (fr)
Chinese (zh)
Inventor
刘春�
包小明
陈强
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Huawei Technologies Co Ltd
Original Assignee
Huawei Technologies Co Ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Huawei Technologies Co Ltd filed Critical Huawei Technologies Co Ltd
Publication of WO2025161762A1 publication Critical patent/WO2025161762A1/en
Pending legal-status Critical Current
Anticipated expiration legal-status Critical

Links

Classifications

    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/27Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor

Definitions

  • the present application relates to the field of computer technology, and in particular to a data processing method, device, and computing cluster.
  • Map-Reduce a commonly used big data programming model
  • Map-Reduce a commonly used big data programming model
  • Map-Reduce a commonly used big data programming model
  • the data processing process using this model consists of two phases: the Map and Reduce phases, each of which executes a number of operators.
  • Map phase complex tasks are broken down into several simple subtasks, reducing the scale of data processing, localizing data processing, and enabling parallel processing.
  • Reduce phase the results generated in the Map phase are aggregated.
  • the subtasks divided in the Map phase are executed by multiple compute nodes, and a single compute node typically performs multiple subtasks.
  • the Reduce phase further processes the data generated in the Map phase, it needs to obtain (or pull) the data processing results of each subtask. Because there are many subtasks in the Map phase, multiple data pulls are required during the Reduce phase, resulting in high data transmission latency during the data processing process.
  • the present application provides a data processing method, device, and computing cluster, which can reduce data transmission delay during data processing and improve data processing efficiency.
  • the present application provides a computing cluster, comprising: a plurality of first computing nodes, a plurality of first data processing units, each first computing node being connected to a first data processing unit, and a plurality of second computing nodes.
  • the plurality of first computing nodes are configured to execute N first tasks, the N first tasks being configured to process N data shards of to-be-processed data, and generating M first results after executing each first task, each of which is identified as A0 to AM-1; each first data processing unit being configured to execute M second tasks, each of the M second tasks being configured to process a first result having the same identifier in the first computing node to which the first data processing unit is connected, generating a second result after processing, and generating M second results after executing the M second tasks, the M second results being identified as B0 to BM-1; and a plurality of second computing nodes being configured to execute M third tasks, each of the third tasks being configured to process a second result having the same identifier in the plurality of first data processing units, generating a third result, and generating M third results after executing the M third tasks.
  • the first data processing unit connected to each first computing node can first execute the second task on the first result generated after the first computing node executes multiple first tasks to generate the second result, and then the second computing node executes the third task to process the second results generated by multiple computing nodes, it can avoid the data generated after the same computing node executes multiple first tasks being directly pulled by the second computing node multiple times and in large quantities, thereby solving the problem of large data transmission delay caused by multiple and massive data pulling, and can reduce the data transmission delay in the data processing process and improve the efficiency of data processing.
  • the first task is a map task
  • the second and third tasks are reduce tasks.
  • a third computing node among the multiple first computing nodes is further configured to send a first request to the connected first data processing unit to instruct the first data processing unit to execute M second tasks.
  • the first request carries the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results.
  • the computing cluster provided in this application further includes: a plurality of second data processing units, each second computing node being connected to a second data processing unit.
  • a fourth computing node among the plurality of second computing nodes is further configured to send a second request to the connected second data processing unit to instruct the second data processing unit to execute M third tasks.
  • offloading the M third tasks executed by the second computing node to the second data processing unit for execution can reduce the load on the second computing node, thereby saving resources of the second computing node.
  • the second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location information of the M second results.
  • the third data processing unit among the multiple first data processing units is also used to send a notification to the fifth computing node among the multiple second computing nodes to indicate the completion of execution of M second tasks, thereby triggering the second computing node to execute M third tasks.
  • the fourth data processing unit among the multiple first data processing units is also used to receive a third request to instruct the update of the operator to be called by the fourth data processing unit, and load the operator logic microcode of the operator to be updated to the fourth data processing unit. Due to the limited hardware physical resources of the data processing unit (such as DPU), it is often only possible to solidify a small number of high-frequency used hardware operators. In an embodiment of the present application, the logic circuit of the data processing unit supports hardware operator updates. Therefore, the operators in the data processing unit can be flexibly updated, expanding the range of operators that can be executed by the data processing unit.
  • the third request carries the operator logic microcode of the operator to be updated.
  • the operator to be updated is a custom operator. Based on actual data processing requirements, the user designs and generates the operator logic microcode, then updates the custom operator to the data processing unit, thereby persisting the operator logic microcode within the data processing unit.
  • supporting user-defined operators can meet the needs of diverse users and expand the application scope of computing clusters.
  • the present application provides a data processing method, which is applied to a computing cluster, and is applied to a computing cluster, wherein the computing cluster includes multiple first computing nodes, multiple first data processing units, and multiple second computing nodes, wherein each first computing node is connected to a first data processing unit; the method includes: multiple first computing nodes execute N first tasks, and generate M first results after executing each first task, and are respectively identified as A0 ⁇ AM-1; wherein the N first tasks are used to process N data shards of the data to be processed; and each first data processing unit executes M second tasks, and generates M second results after executing the M second tasks, and the M second results are identified as B0 ⁇ BM-1; wherein each second task in the M second tasks is used to process the first result with the same identifier in the first computing node connected to the first data processing unit, and generate the second result after processing; then the multiple second computing nodes execute M third tasks, and generate M third results after executing the M third tasks; each third task is used to process the second
  • the first task is a map task
  • the second and third tasks are reduce tasks.
  • the data processing method provided in the present application further includes: a third computing node among the multiple first computing nodes sends a first request to the connected first data processing unit to instruct the first data processing unit to execute M second tasks.
  • the first request carries the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results.
  • the computing cluster also includes multiple second data processing units, and each second computing node is connected to a second data processing unit; based on this, the data processing method provided by the present application also includes: a fourth computing node among the multiple second computing nodes sends a second request to the connected second data processing unit to instruct the second data processing unit to execute M third tasks; wherein the second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location information of the M second results.
  • the data processing method provided in the present application further includes: a third data processing unit among the multiple first data processing units sends a notification to a fifth computing node among the multiple second computing nodes to indicate that the execution of the M second tasks is completed.
  • the data processing method provided by the present application also includes: a fourth data processing unit among multiple first data processing units receives a third request to instruct the update of the operator to be called by the fourth data processing unit, and the third request carries the operator logic microcode of the operator to be updated; and the fourth data processing unit loads the operator logic microcode of the operator to be updated.
  • the operator to be updated is a custom operator.
  • the present application provides a computing node comprising a memory and at least one processor connected to the memory, the memory being used to store computer program code, the computer program code comprising computer instructions, which, when executed by at least one processor, causes the computing node to perform the actions performed by the first computing node in the first aspect or the second aspect; or, to perform the actions performed by the second computing node in the first aspect or the second aspect.
  • the present application provides a data processing unit comprising a memory and at least one processor connected to the memory, the memory being used to store computer program code, the computer program code comprising computer instructions, which, when executed by at least one processor, causes the data processing unit to perform the actions performed by the first data processing unit in the first aspect or the second aspect; or, to perform the actions performed by the second data processing unit in the first aspect or the second aspect.
  • the present application provides a computer-readable storage medium storing computer instructions.
  • the computer instructions When the computer instructions are run on a computer, the method of the second aspect and any one of its possible implementations is executed.
  • the present application provides a computer program product, which includes computer instructions.
  • the computer instructions When the computer instructions are run on a computer, the method of the second aspect and any one of its possible implementations is executed.
  • the present application provides a chip system, comprising: a processor for calling and running a computer program from a memory, so that a device equipped with the chip system executes the method of the second aspect and any one of its possible implementation methods.
  • FIG1 is a diagram showing a data processing framework based on a Map-Reduce model according to an embodiment of the present application
  • FIG2 is a second framework diagram of data processing based on the Map-Reduce model provided in an embodiment of the present application.
  • FIG3 is one of the schematic diagrams of the architecture of a computing cluster provided in an embodiment of the present application.
  • FIG4 is a second schematic diagram of the architecture of a computing cluster provided in an embodiment of the present application.
  • FIG5 is a third schematic diagram of the architecture of a computing cluster provided in an embodiment of the present application.
  • FIG6 is a fourth schematic diagram of the architecture of a computing cluster provided in an embodiment of the present application.
  • FIG7 is a flowchart of a data processing method according to an embodiment of the present application.
  • FIG8 is a second flow chart of a data processing method provided in an embodiment of the present application.
  • FIG9 is a schematic diagram of an operator updating method provided in an embodiment of the present application.
  • a and/or B in this article is merely a description of the association relationship between associated objects, indicating that three relationships may exist.
  • a and/or B can mean: A exists alone, A and B exist at the same time, and B exists alone.
  • first and second are used to distinguish different objects, rather than to describe a specific order of objects.
  • first task and second task are used to distinguish different tasks, rather than to describe a specific order of tasks;
  • first request and second request are used to distinguish different requests, rather than to describe a specific order of requests.
  • words such as “exemplary” or “for example” are used to indicate examples, illustrations, or descriptions. Any embodiment or design described as “exemplary” or “for example” in the embodiments of this application should not be interpreted as being preferred or advantageous over other embodiments or designs. Rather, the use of words such as “exemplary” or “for example” is intended to present the relevant concepts in a concrete manner.
  • multiple means two or more, and “multiple” can also be described as “at least two”.
  • the data processing method, apparatus, and computing cluster provided in the embodiments of the present application are primarily used in distributed big data processing scenarios, and the programming model for big data processing should meet at least one of the following conditions:
  • the programming model includes different processing stages. Different processing stages have data dependencies (for example, the subsequent processing stage depends on the processing results of the previous processing stage). Different processing stages include multiple tasks distributed on different computing nodes (that is, tasks executed by different computers), and multiple tasks are executed on one computing node.
  • the programming model applicable to the embodiments of the present application can be any programming model that meets the above conditions, including but not limited to the Map-Reduce model for data processing.
  • the relevant content of the data processing method is mainly described using the Map-Reduce model as an example.
  • the Map-Reduce model is a big data programming paradigm, which divides a data processing task (hereinafter referred to as a task) into multiple Map-Reduces.
  • a Map-Reduce includes a Map phase and a Reduce phase.
  • the Map phase and the Reduce phase in each Map-Reduce execute the operators corresponding to each phase.
  • the types of operators executed in the Map phase and the Reduce phase are related to actual business needs and are not limited in the embodiments of this application.
  • the Map-Reduce model is used for big data processing in a computing cluster.
  • the computing cluster includes computing nodes that execute tasks, and may also include management nodes for task scheduling and resource management.
  • the resource manager pools the computing resources (processing cores) and memory resources of all computing nodes in the computing cluster.
  • the task scheduler divides the input data (also called raw data) into multiple data shards according to a certain granularity (such as 64MB). Then, it requests sufficient resources from the resource pool for the data processing task according to a certain container specification (4 cores + 12GB) to execute the task.
  • N map tasks are generated for the data processing task.
  • the N map tasks can be homogeneous (that is, the N map tasks execute the same operator or function).
  • Each of the N map tasks performs operator calculations, and each map task generates M data shards.
  • M reduce tasks are generated for the data processing task.
  • reduce task 1 needs to pull data shard 1_1 from the M data shards generated by map task 1, and data shard 2_1 from the M data shards generated by map task 2, and so on, until data shard M_1 from the M data shards generated by map task N is pulled. It can be seen that for a reduce task, N data shards need to be pulled.
  • the data generated by each reduce task can be further divided into K data shards, for example, and then these K fine-grained data shards are used as input data to execute the next Map-Reduce.
  • the N map tasks generated in the Map phase are executed by different first computing nodes, and multiple map tasks are executed on one first computing node.
  • the M reduce tasks generated in the Reduce phase can also be executed by different second computing nodes, and multiple reduce tasks can also be executed on one second computing node.
  • the map task and the reduce task can be executed by the same computing node (i.e., the first computing node and the second computing node are the same computing node), or can be executed by different computing nodes (i.e., the first computing node and the second computing node are different computing nodes), which is not limited in the embodiments of the present application.
  • a framework diagram for data processing based on the Map-Reduce model in a computing cluster the data to be processed is divided into N data shards.
  • M reduce tasks need to be executed. These M reduce tasks can be executed by one or more compute nodes.
  • Figure 2 illustrates that the M reduce tasks are executed by M compute nodes respectively.
  • compute node 4 executes reduce task 1
  • compute node 5 executes reduce task 2
  • compute node 6 executes reduce task M.
  • reduce task 1 needs to pull the first data shard from all the data shards generated by map tasks.
  • each of the N1 map tasks on compute node 1 generates M data shards. Therefore, reduce task 1 needs to pull N1 data shards from compute node 1.
  • Each of the N2 map tasks on compute node 2 generates M data shards.
  • reduce task 1 needs to pull N2 data shards from compute node 2.
  • the N3 map tasks on compute node 3 generate M data shards.
  • the reduce task pulls the corresponding data shards from the data shards generated by multiple map tasks through shuffle (data pulling operation).
  • Shuffle is a data distribution mechanism across nodes and processes within a computing cluster, and can also be understood as data exchange between the Reduce stage and the Map stage.
  • the shuffle process requires cross-node data fetching over the network. Due to limited network bandwidth in the compute cluster, shuffling large numbers of data shards generated by map tasks consumes significant bandwidth. Furthermore, when there are a large number of map tasks, fetching these massive data shards repeatedly results in a high number of input/output operations per second (IOPS) on the network, which can easily cause network congestion and significant end-to-end data transmission latency (i.e., from the compute nodes executing the map tasks to the compute nodes executing the reduce tasks). Furthermore, when network congestion is caused by excessive network load, network performance deteriorates dramatically, impacting the normal operation of other jobs.
  • IOPS input/output operations per second
  • the types of operators executed by the reduce task may include but are not limited to Join (data splicing), Aggregate (aggregation), Filter (filtering), Sort (sorting), distinct (deduplication), subtract (set difference), sample (sampling) and other operators.
  • each type of operator can also include different types of operators.
  • the Join operator can include cross join, inner join, outer join, self join and other types of operators.
  • Some operators executed during the Reduce phase feature a significant reduction in the amount of data before and after the operation. This means that the input data for these operators is large, but the output data is very small. This means that for these operators, shuffling across the network actually pulls in a massive amount of raw data (i.e., input data), resulting in more significant data transmission delays.
  • the embodiment of the present application provides a data processing method, device and computing cluster
  • the computing cluster includes multiple first computing nodes, multiple first data processing units and multiple second computing nodes, wherein each first computing node is connected to a first data processing unit, the multiple first computing nodes can execute N first tasks (such as map tasks), the N first tasks are used to process N data shards of the data to be processed, and after executing each first task, M first results (M second results) are generated.
  • a result is M data fragments), and they are respectively identified as A0 ⁇ AM-1; each first data processing unit executes M second tasks, each of the M second tasks is used to process the first result with the same identification in the first computing node connected to the first data processing unit, and generate a second result after processing. After executing M second tasks, M second results are generated, and the M second results are identified as B0 ⁇ BM-1; then multiple second computing nodes execute M third tasks, each third task is used to process the second results with the same identification in multiple first data processing units, and generate a third result. After executing M third tasks, M third results are generated.
  • the method has the following steps: during the data processing process, the first data processing unit connected to each first computing node executes a second task on the first result generated after the first computing node executes multiple first tasks to generate a second result, and then the second computing node executes a third task to process the second results generated by the multiple computing nodes.
  • This can avoid the data generated after the same computing node executes multiple first tasks being directly pulled multiple times and in large quantities by the second computing node, thereby solving the problem of large data transmission delay caused by multiple and massive data pulling, that is, it can reduce the data transmission delay in the data processing process and improve the efficiency of data processing.
  • the data processing method, device and computing cluster provided in the embodiments of the present application can be applied to scenarios of big data distributed computing based on the Map-Reduce model.
  • the computing cluster and data processing method provided in the embodiments of the present application are described in detail below.
  • FIG3 is a schematic diagram of the architecture of a computing cluster provided in an embodiment of the present application.
  • the computing cluster includes multiple first computing nodes 301 and multiple second computing nodes 302 .
  • Each first computing node 301 is connected to a first data processing unit 303 .
  • the computing nodes (including the first computing node and the second computing node) in the computing cluster can be servers, such as central servers, edge servers, or local servers in a local data center.
  • the computing nodes can also be terminal devices such as desktop computers, laptop computers, or smart phones.
  • the data processing unit provided in the embodiments of the present application may be a DPU (data processing unit).
  • a DPU is a dedicated data processing unit (or data processor) with strong data processing capabilities.
  • the DPU connects to a host (such as a computing node) via a network card interface and performs data processing functions under the control of the computing node.
  • the data processing unit connected to each computing node may also be other data processing devices, and the embodiments of the present application do not limit this.
  • the data processing task is used to process the data to be processed, and the data to be processed is divided into N data slices for processing.
  • executing the data processing task includes executing N first tasks, M second tasks, and M third tasks, where M and N are both integers greater than or equal to 2.
  • the first task mentioned above is a map task (hereinafter referred to as map task)
  • the second and third tasks mentioned above are reduce tasks (hereinafter referred to as reduce task).
  • the N first tasks (map tasks) are executed by multiple first computing nodes 301
  • the M second tasks are executed by the first data processing unit 303
  • the M third tasks are executed by multiple second computing nodes 302.
  • the Map-Reduce model is used to process the N data shards into which the data to be processed is divided.
  • multiple first computing nodes 301 are configured to execute N first tasks (map tasks), each of which is configured to process N data shards of the to-be-processed data. Specifically, each first computing node 301 executes multiple first tasks to process multiple data shards of the N data shards. After executing each first task, M first results are generated, each of which is labeled A0 through AM-1.
  • Each of the multiple first data processing units 303 is configured to execute M second tasks (reduce tasks).
  • Each of the M second tasks is configured to process first results with the same identifier in the first computing node to which the first data processing unit is connected, generating a second result after processing.
  • M second results are generated, and the M second results are identified as B0 to BM-1.
  • FIG4 taking a second task executed by a first data processing unit as an example, when executing the second task, multiple first results with the identifier A0 are pulled from the multiple first results obtained after the connected first computing node executes multiple first tasks, and then the second task is executed to generate a second result B0.
  • Multiple second computing nodes 302 are configured to execute M third tasks (reduce tasks), each of which is configured to process second results with the same identifier from multiple first data processing units to generate a third result. After executing M third tasks, M third results are generated. Each second computing node 302 executes one or more third tasks to process second results generated by multiple first data processing units that are communicatively connected to the second computing node 302. For example, in FIG4 , the first second computing node 302 executes n1+1 reduce tasks, and the generated third results are identified as C0 to Cn1.
  • the input data of each of the multiple first tasks i.e., one data shard among the N data shards
  • the first results generated by each first task are different.
  • the input data of each of the multiple second tasks is different, and after the multiple first data processing units execute the multiple second tasks, the second results generated are different. Therefore, the identification of the first results (A0 ⁇ AM-1) and the second results (B0 ⁇ BM-1) in Figure 4 is only used to identify the data shards in each processing result, and does not limit the content of the data shards in the processing results.
  • the first result (i.e., the first data split) among M first results generated by different map tasks is labeled A0, but this does not mean that the contents of the multiple first results labeled A0 are identical.
  • the first result among M second results generated by different first data processing units after executing the first reduce task is labeled B0, but this does not mean that the contents of the multiple second results labeled B0 are identical.
  • the computing cluster provided in the embodiment of the present application may further include multiple second data processing units 304, with each second computing node 302 connected to a second data processing unit 304.
  • the above-mentioned process of executing M third tasks (reduce tasks) by the multiple second computing nodes 302 may be replaced by execution by the second data processing units 304 connected to the second computing nodes 302.
  • each second computing node may instruct the second data processing unit 304 connected thereto to process one or more third tasks to process the second results with the same identifier in the multiple first data processing units.
  • M third results are generated.
  • the M third tasks executed by the second computing node are offloaded to the second data processing unit for execution, which can reduce the load of the second computing node and thus save resources of the second computing node.
  • the computing cluster may also include other nodes, such as a management node, which is used to manage the computing nodes in the computing cluster.
  • the management node may have task scheduling functions and resource management functions.
  • a task scheduler and/or a resource manager may be deployed on the management node to schedule data processing tasks and allocate computing resources and memory resources to data processing tasks.
  • the computing cluster provided by the embodiment of the present application can avoid the data generated after the same computing node executes multiple first tasks directly by the second computing node and pulls massive data multiple times, thereby solving the problem of large data transmission delay caused by multiple and massive data pulling, reducing the data transmission delay in the data processing process and improving the efficiency of data processing.
  • the multiple first computing nodes 301, the multiple second computing nodes 302, the multiple first data processing units 303, and the multiple second data processing units 304 in the computing cluster interact with each other to complete the processing of the data to be processed.
  • a third computing node among the plurality of first computing nodes 301 is further configured to send a first request to the connected first data processing unit 303 to instruct the first data processing unit 303 to execute M second tasks, wherein the third computing node may be any computing node among the plurality of first computing nodes 301. It is understandable that after each first computing node 301 executes the plurality of first tasks (map tasks), it sends the first request to the connected first data processing unit 303, so that the first data processing unit 303 begins to execute the second task on the plurality of first results generated in the first computing node.
  • the first request includes the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results.
  • the operator identifier indicates the operator to be called included in the second task
  • the storage location information indicates the storage location of the M first results, such as the physical address of the M first results in memory.
  • the first data processing unit 303 reads the M first results based on the storage location information and calls the operator indicated by the operator identifier to execute the second task.
  • a third data processing unit among the plurality of first data processing units 303 is further configured to send a notification to a fifth computing node among the plurality of second computing nodes to indicate the completion of execution of the M second tasks, wherein the third data processing unit is any first data processing unit among the plurality of first data processing units 303, and the fifth computing node is the second computing node corresponding to the third data processing unit (i.e., the second computing node for processing the results generated by the third data processing unit).
  • the third data processing unit After the third data processing unit completes executing the M second tasks, it sends a notification indicating the completion of execution of the M second tasks to the first computing node 301 connected thereto, and the first computing node 301 then forwards the notification to the second computing node 302.
  • the notification indicating the completion of the execution of the M second tasks carries the storage location information of the M second results.
  • the second computing node reads the M second results according to the storage location information to execute the third task.
  • a fourth computing node among the multiple second computing nodes 302 is further configured to send a second request to the connected second data processing unit 304 to instruct the second data processing unit 304 to execute M third tasks, wherein the fourth computing node can be any computing node among the multiple second computing nodes 301. It is understandable that after each first data processing unit 303 executes multiple second tasks (reduce tasks), it sends a first request to the first data processing unit 303 connected thereto, so that the first data processing unit 303 begins to execute the second task on the multiple first results generated in the first computing node.
  • the second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location of the M second results.
  • the second data processing unit 304 reads the M second results according to the storage location information and calls the operator indicated by the operator identifier to execute the third task.
  • each first data processing unit 303 executes M second tasks, and the process of executing the second task is actually calling the to-be-called operator included in the second task.
  • a fourth data processing unit among the plurality of first data processing units 303 is further configured to receive a third request instructing the fourth data processing unit to update an operator to be called, and to load the operator logic microcode of the operator to be updated into the fourth data processing unit, thereby persisting the operator to be updated in the fourth data processing unit.
  • the fourth data processing unit may be any one of the plurality of first data processing units 303.
  • the third request carries the operator logic microcode of the operator to be updated and may further include an operator identifier, so that the operator logic microcode is stored according to the correspondence between the operator identifier and the operator logic microcode.
  • a data processing unit e.g., a DPU
  • the logic circuit of the data processing unit supports hardware operator updates. Therefore, operators in the data processing unit can be flexibly updated, expanding the range of operators that can be executed by the data processing unit.
  • the operator to be updated can be a user-defined operator.
  • the user can design and generate the operator logic microcode and update the customized operator to the data processing unit, thereby persisting the operator logic microcode within the data processing unit.
  • Supporting user-defined operators can meet the needs of different users and expand the application range of computing clusters.
  • the following embodiments describe the data processing process from the perspective of the internal module interactions between the various computing nodes and data processing units in a computing cluster. Taking the computing cluster shown in Figure 5 as an example, for ease of description, the data processing process of the computing cluster is explained using two first computing nodes and one second computing node in the computing cluster as examples.
  • the first data processing unit 303a connected to the first computing node 301a, the first computing node 301b, the first data processing unit 303b connected to the first computing node 301b, the second computing node 302 and the second data processing unit 304 connected to the second computing node 302.
  • the first computing node 301a, the first computing node 301b and the second computing node 302 all include an agent module, which is responsible for converting the received request from business semantics into a hardware computing request.
  • the first data processing unit 303a, the first data processing unit 303b and the second data processing unit 304 all include a request handler module and a data offloading engine (DOE), which is a hardware module for operator calculation.
  • DOE data offloading engine
  • each map task After the first computing node 301a executes multiple map tasks (first tasks), each map task generates M first results (identified as A0 ⁇ AM-1), and the agent module in the first computing node 301a sends a first request to the request processing module in the first data processing unit 303a to instruct the first data processing unit 303a to execute the reduce task (second task) on the multiple first results generated by the first computing node 301b.
  • the request processing module in the first data processing unit 303a forwards the first request to the DOE of the first data processing unit 303a, causing the DOE to execute the reduce task.
  • the first data processing unit 303a pulls the first result A0 generated by each map task and executes the reduce task (the reduce task execution process may require the invocation of operators such as Join and Aggregate), resulting in a second result B0.
  • the process of data processing performed by the first computing node 301b and the first data processing unit 303b can refer to the process of data processing performed by the first computing node 301a and the first data processing unit 303a, which will not be repeated here.
  • the first data processing unit 303a and the first data processing unit 303b have executed multiple reduce tasks, they notify the proxy module of the second computing node 302 through the proxy module, and then the proxy module of the second computing node 302 sends a second request to the request processing module of the second data processing unit 304 to instruct the second data processing unit 304 to execute M reduce tasks (the third task).
  • the request processing module of the second data processing unit 304 forwards the second request to the DOE of the second data processing unit 304, causing the DOE to execute the reduce task.
  • the second data processing unit 304 pulls the second result B0 generated by the first data processing unit 303a and the second result B0 generated by the first data processing unit 303b, and executes the reduce task, obtaining a third result C0 after the execution is completed.
  • the embodiments of the present application also provide a data processing method.
  • the data processing method provided in the embodiments of the present application is described in more detail below from the perspective of interaction between various modules within a first computing node, a first data processing unit (first DPU), a second computing node, and a second data processing unit (second DPU).
  • the data processing method provided in an embodiment of the present application includes the following steps.
  • S701 After executing a plurality of first tasks, the processing module of the first computing node sends a notification message to the agent module of the first computing node to indicate completion of execution of the plurality of first tasks.
  • M first results are generated (respectively labeled A0 to AM-1).
  • the first results generated are cached in the memory of the first computing node.
  • the agent module of the first computing node sends a first request to the request processing module of the first DPU.
  • the first request is used to instruct the first DPU to execute M second tasks.
  • the first request carries the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results.
  • the description of the first request please refer to the description of the above embodiment and will not be repeated here.
  • the requested cache is used to store the result generated after the first DPU executes the task.
  • the request processing module of the first DPU sends a first request to the DOE of the first DPU to trigger the DOE of the first DPU to execute M second tasks.
  • the request processing module of the first DPU fills the operator identifier of the operator to be called and the storage location information of the M first results into the register of the DOE of the first DPU to trigger the DOE to execute the M second tasks.
  • S705 The DOE of the first DPU executes M second tasks and generates M second results (respectively labeled as B0 to BM-1).
  • the DOE of the first DPU After the DOE of the first DPU obtains the operator identifier of the operator to be called and the storage location information of the M first results, it obtains the M first results and executes the M second tasks to obtain the M second results.
  • the operators called during the execution of the second task may include operators with the following characteristics: operators that have a large input data volume and a significantly reduced output data volume after execution. Examples include the Join operator and Aggregate operator mentioned in the above embodiments.
  • the operators called during the second task may also include other operators, such as the Sort operator.
  • the first DPU sends a notification message to the agent module of the first computing node through the request processing module of the first DPU.
  • the notification message is used to notify the completion of execution of the M second tasks.
  • the notification message may carry the storage location information of the M second results and the data length of the M second results.
  • the proxy module of the first computing node forwards the notification message in S707 to the proxy module of the second computing node.
  • the second computing node sends a second request to the request processing module of the second DPU through its processing module and proxy module.
  • the second request is used to instruct the second data processing unit to execute M third tasks.
  • the second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location information of the M second results.
  • the proxy module of the second computing node fills the storage location information of the M second results generated by the first DPU and the operator identifier of the operator to be called during the execution of the third task into the second request according to the interface format and sends it to the request processing module of the second DPU.
  • the request processing module of the second DPU obtains M second results generated by the first DPU from the cache.
  • the request processing module of the second DPU obtains M second results from the cache corresponding to each first DPU, that is, the request processing module of the second DPU obtains M second results generated by connecting multiple first computing nodes to multiple first DPUs.
  • the request processing module of the second DPU sends a second request to the DOE of the second DPU.
  • the second request also carries M second results obtained from the cache (generated by each first DPU executing the M second tasks).
  • S713 The DOE of the second DPU executes M third tasks and generates M third results.
  • the DOE of the second DPU sends a notification message indicating completion of execution of the M third tasks to the second computing node through the request processing module of the second DPU.
  • the second computing node obtains M third results from the cache.
  • the data processing method provided in an embodiment of the present application includes the following steps.
  • S801 After executing a plurality of first tasks, the processing module of the first computing node sends a notification message to the agent module of the second computing node to indicate completion of execution of the plurality of first tasks.
  • the proxy module of the second computing node sends a fourth request to the proxy module of the first computing node.
  • the fourth request is used to instruct the first DPU connected to the first computing node to execute M second tasks.
  • the second computing node can obtain a list of computing nodes where the data required to perform the second task (M first results) are located from the metadata server node (which can be implemented by the management node), and send a fourth request to the agent module of the first computing node.
  • the fourth request carries the index information of the data and the operator identifier of the operator to be called during the execution of the second task, wherein the index information of the data is used to indicate the characteristics of the data required for the second task, and the index information of the data may include the identification information of the application (such as APP id), shuffle id, and stage id, wherein APP id is used to indicate which application the input data comes from, shuffle id is used to indicate which shuffle operation of the application the input data corresponds to, and stage id is used to indicate the task (indicating the second task).
  • the index information of the data is used to indicate the characteristics of the data required for the second task
  • the index information of the data may include the identification information of the application (such as APP id), shuffle id, and stage id, wherein APP id is used to indicate which application the input data comes from, shuffle id is used to indicate which shuffle operation of the application the input data corresponds
  • the agent module of the first computing node sends the first request to the request processing module of the first DPU according to the fourth request.
  • the first computing node determines the storage locations (such as physical addresses) in the memory of M first results generated after the first computing node executes multiple first tasks through the index information in a mapping table (a mapping table of index information and addresses).
  • the request processing module of the first DPU sends a first request to the DOE of the first DPU to trigger the DOE of the first DPU to execute M second tasks.
  • S806 The DOE of the first DPU executes M second tasks and generates M second results.
  • the first DPU sends a notification message to the agent module of the first computing node through the request processing module of the first DPU.
  • the notification message is used to notify the completion of execution of the M second tasks.
  • the proxy module of the first computing node forwards the notification message in S808 to the proxy module of the second computing node.
  • the second computing node sends a second request to the request processing module of the second DPU through its processing module and proxy module.
  • the request processing module of the second DPU obtains M second results generated by the first DPU from the cache.
  • the request processing module of the second DPU sends a second request to the DOE of the second DPU.
  • the second request also carries M second results obtained from the cache (generated by each first DPU executing the M second tasks).
  • the DOE of the second DPU executes M third tasks and generates M third results.
  • the DOE of the second DPU writes the M third results into the cache.
  • the DOE of the second DPU sends a notification message indicating the completion of execution of the M third tasks to the second computing node through the request processing module of the second DPU.
  • the second computing node obtains M third results from the cache.
  • the data processing method described in S701-S716 and S801-S817 since the first data processing unit connected to each first computing node can first execute the second task on the first result generated after the first computing node executes multiple first tasks to generate the second result, and then the second computing node executes the third task to process the second results generated by multiple computing nodes, it can avoid the data generated after the same computing node executes multiple first tasks being directly pulled by the second computing node multiple times and massive data, thereby solving the problem of large data transmission delay caused by multiple and massive data pulling, and can reduce the data transmission delay in the data processing process and improve the efficiency of data processing.
  • the DPU may include a programmable logic circuit (such as an FPGA), which can support users to update custom operators to the DPU, that is, persist them to the DPU, so that operators can be updated flexibly during the DPU execution of tasks.
  • a programmable logic circuit such as an FPGA
  • operator update can be achieved by interacting with the computing node and the DPU connected thereto.
  • the operator update process includes the following steps.
  • the proxy module of the computing node receives a third request to instruct the DPU connected to the computing node to update an operator to be called.
  • the third request carries the operator identifier (such as operator ID) and the operator logic microcode of the operator to be updated.
  • the third request may be triggered by a user, and the user may send a request for online update of the hardware operator to the proxy module of the computing node through the configuration interface.
  • the proxy module of the computing node sends a third request to the request processing module of the DPU.
  • S903 The request processing module of the DPU forwards the third request to the DOE of the DPU.
  • the DOE of the DPU stores the operator logic microcode corresponding to the operator identifier.
  • the DPU's DOE After the DPU's DOE receives the third request, it first determines whether the operator indicated by the operator identifier in the third request is a new operator, that is, determines whether the operator logic microcode is a new operator logic microcode. If it is a new operator logic microcode, the DOE will persist the operator logic microcode in the DPU's flash memory.
  • the DPU's DOE loads the operator logic microcode from the flash into the DOE's hardware engine pool.
  • DOE when DOE receives the first request or the second request in the above embodiment, it loads the operator logic microcode of the operator from the flash into the hardware engine pool of DOE according to the operator identifier carried in the request, thereby executing the corresponding task.
  • the logic circuitry of the data processing unit supports hardware operator updates. Therefore, operators within the data processing unit can be flexibly updated, expanding the range of operators that can be executed by the data processing unit. Furthermore, these operators to be updated can be user-defined operators. Supporting user-defined operators can meet the needs of diverse users, broadening the application scope of the computing cluster.
  • An embodiment of the present application also provides a computing node, including a memory and at least one processor connected to the memory, the memory being used to store computer program code, the computer program code including computer instructions, which, when executed by at least one processor, causes the computing node to perform an action performed by a first computing node in a computing cluster; or, to perform an action performed by a second computing node in the computing cluster.
  • a computing node including a memory and at least one processor connected to the memory, the memory being used to store computer program code, the computer program code including computer instructions, which, when executed by at least one processor, causes the computing node to perform an action performed by a first computing node in a computing cluster; or, to perform an action performed by a second computing node in the computing cluster.
  • An embodiment of the present application provides a data processing unit, including a memory and at least one processor connected to the memory, wherein the memory is used to store computer program code, and the computer program code includes computer instructions.
  • the data processing unit When the computer instructions are executed by the at least one processor, the data processing unit performs the action performed by the first data processing unit in the computing cluster; or, performs the action performed by the second data processing unit in the computing cluster.
  • An embodiment of the present application further provides a computer-readable storage medium storing computer instructions.
  • the computer instructions When the computer instructions are run on a computer, the data processing method described in the above embodiment is executed.
  • An embodiment of the present application further provides a computer program product, which includes computer instructions.
  • the computer instructions When the computer instructions are run on a computer, the data processing method described in the above embodiment is executed.
  • An embodiment of the present application also provides a chip system, including: a processor, used to call and run a computer program from a memory, so that a device equipped with the chip system executes the data processing method described in the above embodiment.
  • all or part of the embodiments may be implemented by software, hardware, firmware or any combination thereof.
  • all or part of the embodiments may be implemented in the form of a computer program product.
  • the computer program product includes one or more computer instructions.
  • the computer may be a general-purpose computer, a special-purpose computer, a computer network or other programmable device.
  • the computer instructions may be stored in a computer-readable storage medium or transmitted from one computer-readable storage medium to another computer-readable storage medium.
  • the computer instructions may be transmitted from one website, computer, server or data center to another website, computer, server or data center via a wired (e.g., coaxial cable, optical fiber, digital subscriber line (DSL)) or wireless (e.g., infrared, wireless, microwave, etc.) method.
  • the computer-readable storage medium may be any available medium that a computer can access or a data storage device such as a server or data center that includes one or more available media integrated therein.
  • the available medium can be a magnetic medium (e.g., floppy disk, magnetic disk, tape), an optical medium (e.g., a digital video disc (DVD)), or a semiconductor medium (e.g., a solid state drive (SSD)), etc.
  • the disclosed systems, devices and methods can be implemented in other ways.
  • the device embodiments described above are merely schematic.
  • the division of the modules or units is only a logical function division.
  • Another point is that the mutual coupling or direct coupling or communication connection shown or discussed can be an indirect coupling or communication connection through some interfaces, devices or units, which can be electrical, mechanical or other forms.
  • the units described as separate components may or may not be physically separate, and the components shown as units may or may not be physical units, that is, they may be located in one place or distributed across multiple network units. Some or all of these units may be selected to achieve the purpose of this embodiment according to actual needs.
  • the functional units in the various embodiments of the present application may be integrated into a single processing unit, or each unit may exist physically separately, or two or more units may be integrated into a single unit.
  • the aforementioned integrated units may be implemented in the form of hardware or software functional units.
  • the integrated unit is implemented in the form of a software functional unit and sold or used as an independent product, it can be stored in a computer-readable storage medium.
  • the technical solution of the present application is essentially or the part that contributes to the prior art or all or part of the technical solution can be embodied in the form of a software product, and the computer software product is stored in a storage medium, including a number of instructions for enabling a computer device (which can be a personal computer, server, or network device, etc.) or a processor to execute all or part of the steps of the method described in each embodiment of the present application.
  • the aforementioned storage medium includes: various media that can store program codes, such as flash memory, mobile hard disk, read-only memory, random access memory, magnetic disk or optical disk.

Landscapes

  • Engineering & Computer Science (AREA)
  • Databases & Information Systems (AREA)
  • Theoretical Computer Science (AREA)
  • Computing Systems (AREA)
  • Data Mining & Analysis (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract

The present application relates to the technical field of computers, and provides a data processing method and apparatus, and a computing cluster, capable of reducing data transmission time delay and improving data processing efficiency. In the computing cluster, a plurality of first computing nodes are used for executing N first tasks for processing data to be processed, wherein M first results are generated upon execution of each first task and are respectively identified as A0 to AM-1; each data processing unit among a plurality of first data processing units is used for executing M second tasks, wherein each second task among the M second tasks is used for processing a first result having the same identifier to generate a second result, and M second results are generated upon execution of the M second tasks and are respectively identified as B0 to BM-1; a plurality of second computing nodes are used for executing M third tasks, wherein each third task is used for processing a plurality of second results having the same identifier to generate a third result, and M third results are generated upon execution of the M third tasks.

Description

一种数据处理方法、装置及计算集群Data processing method, device and computing cluster

本申请要求于2024年2月1日提交国家知识产权局、申请号为202410147754.3、申请名称为“一种数据处理方法、装置及计算集群”的中国专利申请的优先权,其全部内容通过引用结合在本申请中。This application claims priority to the Chinese patent application filed with the State Intellectual Property Office on February 1, 2024, with application number 202410147754.3 and application name “A Data Processing Method, Device and Computing Cluster”, the entire contents of which are incorporated by reference into this application.

技术领域Technical Field

本申请涉及计算机技术领域,尤其涉及一种数据处理方法、装置及计算集群。The present application relates to the field of computer technology, and in particular to a data processing method, device, and computing cluster.

背景技术Background Art

随着数据分析技术的快速发展和普及,“大数据”已经成为家喻户晓的时代热词,如何处理大数据备受关注。通常,可以基于编程模型进行大规模数据的并行处理。With the rapid development and widespread adoption of data analysis technology, "big data" has become a household buzzword, and how to process big data has attracted considerable attention. Typically, large-scale data can be processed in parallel using programming models.

例如,一种常用的大数据编程模型是Map-Reduce模型,使用该模型进行数据处理的过程包括Map阶段和Reduce阶段,Map阶段和Reduce阶段各自执行一些算子。在Map阶段,将复杂的任务分解为若干个简单的子任务,使得数据处理规模缩小、数据本地化处理以及并行处理;在Reduce阶段,对Map阶段产生的处理结果进行汇总处理。For example, a commonly used big data programming model is the Map-Reduce model. The data processing process using this model consists of two phases: the Map and Reduce phases, each of which executes a number of operators. In the Map phase, complex tasks are broken down into several simple subtasks, reducing the scale of data processing, localizing data processing, and enabling parallel processing. In the Reduce phase, the results generated in the Map phase are aggregated.

上述Map阶段划分的若干个子任务由多个计算节点执行,并且一个计算节点通常执行多个子任务,Reduce阶段基于Map阶段产生的数据做进一步处理时,需要获取(或拉取)每一个子任务的数据处理结果。由于Map阶段中子任务的数量很多,这样在Reduce阶段需要多次数据拉取,使得数据处理过程中数据传输时延较高。The subtasks divided in the Map phase are executed by multiple compute nodes, and a single compute node typically performs multiple subtasks. When the Reduce phase further processes the data generated in the Map phase, it needs to obtain (or pull) the data processing results of each subtask. Because there are many subtasks in the Map phase, multiple data pulls are required during the Reduce phase, resulting in high data transmission latency during the data processing process.

发明内容Summary of the Invention

本申请提供一种数据处理方法、装置及计算集群,能够降低数据处理过程中的数据传输时延,提高数据处理的效率。The present application provides a data processing method, device, and computing cluster, which can reduce data transmission delay during data processing and improve data processing efficiency.

本申请采用如下技术方案:This application adopts the following technical solutions:

第一方面,本申请提供一种计算集群,该计算集群包括:多个第一计算节点,多个第一数据处理单元,每个第一计算节点连接一个第一数据处理单元,以及多个第二计算节点。多个第一计算节点,用于执行N个第一任务,N个第一任务用于处理待处理数据的N个数据分片,执行每个第一任务后生成M个第一结果,且分别被标识为A0~AM-1;每个第一数据处理单元,用于执行M个第二任务,M个第二任务中的每个第二任务用于对第一数据处理单元所连接的第一计算节点中的具有相同标识的第一结果进行处理,处理后生成第二结果,执行M个第二任务后生成M个第二结果,M个第二结果被标识为B0~BM-1;多个第二计算节点,用于执行M个第三任务,每个第三任务用于对多个第一数据处理单元中的具有相同标识的第二结果进行处理,生成第三结果,执行M个第三任务后生成M个第三结果。In a first aspect, the present application provides a computing cluster, comprising: a plurality of first computing nodes, a plurality of first data processing units, each first computing node being connected to a first data processing unit, and a plurality of second computing nodes. The plurality of first computing nodes are configured to execute N first tasks, the N first tasks being configured to process N data shards of to-be-processed data, and generating M first results after executing each first task, each of which is identified as A0 to AM-1; each first data processing unit being configured to execute M second tasks, each of the M second tasks being configured to process a first result having the same identifier in the first computing node to which the first data processing unit is connected, generating a second result after processing, and generating M second results after executing the M second tasks, the M second results being identified as B0 to BM-1; and a plurality of second computing nodes being configured to execute M third tasks, each of the third tasks being configured to process a second result having the same identifier in the plurality of first data processing units, generating a third result, and generating M third results after executing the M third tasks.

本申请的计算集群中,由于可以由每一个第一计算节点连接的第一数据处理单元对第一计算节点执行的多个第一任务后生成的第一结果先执行第二任务,生成第二结果,再由第二计算节点执行第三任务对多个计算节点生成的第二结果进行处理,可以避免同一计算节点执行多个第一任务后产生的数据直接被第二计算节点进行多次以及海量数据的拉取,从而解决因多次及海量数据拉取导致的数据传输延迟较大的问题,能够降低数据处理过程中的数据传输时延,提高数据处理的效率。In the computing cluster of the present application, since the first data processing unit connected to each first computing node can first execute the second task on the first result generated after the first computing node executes multiple first tasks to generate the second result, and then the second computing node executes the third task to process the second results generated by multiple computing nodes, it can avoid the data generated after the same computing node executes multiple first tasks being directly pulled by the second computing node multiple times and in large quantities, thereby solving the problem of large data transmission delay caused by multiple and massive data pulling, and can reduce the data transmission delay in the data processing process and improve the efficiency of data processing.

一种可能的实现方式中,第一任务为map任务,第二任务及第三任务为reduce任务。In a possible implementation, the first task is a map task, and the second and third tasks are reduce tasks.

一种可能的实现方式中,多个第一计算节点中的第三计算节点,还用于发送第一请求至所连接的第一数据处理单元,以指示第一数据处理单元执行M个第二任务。In a possible implementation, a third computing node among the multiple first computing nodes is further configured to send a first request to the connected first data processing unit to instruct the first data processing unit to execute M second tasks.

一种可能的实现方式中,第一请求中携带第二任务执行过程中的待调用算子的算子标识以及M个第一结果的存储位置信息。In a possible implementation, the first request carries the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results.

一种可能的实现方式中,本申请提供的计算集群还包括:多个第二数据处理单元,每个第二计算节点连接一个第二数据处理单元。多个第二计算节点中的第四计算节点,还用于发送第二请求至所连接的第二数据处理单元,以指示第二数据处理单元执行M个第三任务。本申请中,将第二计算节点执行的M个第三任务卸载至第二数据处理单元上执行,能够减轻第二计算节点的负载,从而节省第二计算节点的资源。In one possible implementation, the computing cluster provided in this application further includes: a plurality of second data processing units, each second computing node being connected to a second data processing unit. A fourth computing node among the plurality of second computing nodes is further configured to send a second request to the connected second data processing unit to instruct the second data processing unit to execute M third tasks. In this application, offloading the M third tasks executed by the second computing node to the second data processing unit for execution can reduce the load on the second computing node, thereby saving resources of the second computing node.

一种可能的实现方式中,第二请求中携带第三任务执行过程中的待调用算子的算子标识以及M个第二结果的存储位置信息。In a possible implementation, the second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location information of the M second results.

一种可能的实现方式中,多个第一数据处理单元中的第三数据处理单元,还用于发送通知至多个第二计算节点中的第五计算节点,以指示M个第二任务执行结束,从而触发第二计算节点执行M个第三任务。In one possible implementation, the third data processing unit among the multiple first data processing units is also used to send a notification to the fifth computing node among the multiple second computing nodes to indicate the completion of execution of M second tasks, thereby triggering the second computing node to execute M third tasks.

一种可能的实现方式中,多个第一数据处理单元中的第四数据处理单元,还用于接收第三请求,以指示更新第四数据处理单元待调用的算子,并加载待更新算子的算子逻辑微码至第四数据处理单元。由于数据处理单元(例如DPU)的硬件物理资源受限,往往只能固化少量高频使用的硬件算子。本申请实施例中,数据处理单元的逻辑电路支持硬件算子更新,因此,可以灵活地对数据处理单元中的算子进行更新,扩充了数据处理单元可执行的算子的范围。In one possible implementation, the fourth data processing unit among the multiple first data processing units is also used to receive a third request to instruct the update of the operator to be called by the fourth data processing unit, and load the operator logic microcode of the operator to be updated to the fourth data processing unit. Due to the limited hardware physical resources of the data processing unit (such as DPU), it is often only possible to solidify a small number of high-frequency used hardware operators. In an embodiment of the present application, the logic circuit of the data processing unit supports hardware operator updates. Therefore, the operators in the data processing unit can be flexibly updated, expanding the range of operators that can be executed by the data processing unit.

一种可能的实现方式中,第三请求中携带待更新算子的算子逻辑微码。In a possible implementation, the third request carries the operator logic microcode of the operator to be updated.

一种可能的实现方式中,待更新算子是自定义的算子。用户根据实际数据处理需求,自行设计生成算子逻辑微码,将自定义的算子更新至数据处理单元,即将算子逻辑微码持久化至数据处理单元内。本申请中,支持用户自定义算子能够满足不同用户的需求,使得计算集群的应用范围更加广泛。In one possible implementation, the operator to be updated is a custom operator. Based on actual data processing requirements, the user designs and generates the operator logic microcode, then updates the custom operator to the data processing unit, thereby persisting the operator logic microcode within the data processing unit. In this application, supporting user-defined operators can meet the needs of diverse users and expand the application scope of computing clusters.

第二方面,本申请提供一种数据处理方法,该方法应用于计算集群,应用于计算集群,该计算集群包括多个第一计算节点、多个第一数据处理单元以及多个第二计算节点,其中,每个第一计算节点连接一个第一数据处理单元;该方法包括:多个第一计算节点执行N个第一任务,执行每个第一任务后生成M个第一结果,且分别被标识为A0~AM-1;其中,N个第一任务用于处理待处理数据的N个数据分片;并且每个第一数据处理单元执行M个第二任务,执行M个第二任务后生成M个第二结果,M个第二结果被标识为B0~BM-1;其中,M个第二任务中的每个第二任务用于对第一数据处理单元所连接的第一计算节点中的具有相同标识的第一结果进行处理,处理后生成第二结果;进而多个第二计算节点执行M个第三任务,执行M个第三任务后生成M个第三结果;每个第三任务用于对多个第一数据处理单元中的具有相同标识的第二结果进行处理,生成第三结果。In a second aspect, the present application provides a data processing method, which is applied to a computing cluster, and is applied to a computing cluster, wherein the computing cluster includes multiple first computing nodes, multiple first data processing units, and multiple second computing nodes, wherein each first computing node is connected to a first data processing unit; the method includes: multiple first computing nodes execute N first tasks, and generate M first results after executing each first task, and are respectively identified as A0~AM-1; wherein the N first tasks are used to process N data shards of the data to be processed; and each first data processing unit executes M second tasks, and generates M second results after executing the M second tasks, and the M second results are identified as B0~BM-1; wherein each second task in the M second tasks is used to process the first result with the same identifier in the first computing node connected to the first data processing unit, and generate the second result after processing; then the multiple second computing nodes execute M third tasks, and generate M third results after executing the M third tasks; each third task is used to process the second results with the same identifier in the multiple first data processing units to generate the third result.

一种可能的实现方式中,上述第一任务为map任务,上述第二任务及第三任务为reduce任务。In a possible implementation, the first task is a map task, and the second and third tasks are reduce tasks.

一种可能的实现方式中,本申请提供的数据处理方法还包括:多个第一计算节点中的第三计算节点发送第一请求至所连接的第一数据处理单元,以指示第一数据处理单元执行M个第二任务。In one possible implementation, the data processing method provided in the present application further includes: a third computing node among the multiple first computing nodes sends a first request to the connected first data processing unit to instruct the first data processing unit to execute M second tasks.

一种可能的实现方式中,第一请求中携带第二任务执行过程中的待调用算子的算子标识以及M个第一结果的存储位置信息。In a possible implementation, the first request carries the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results.

一种可能的实现方式中,计算集群还包括多个第二数据处理单元,每个第二计算节点连接一个第二数据处理单元;基于此,本申请提供的数据处理方法还包括:多个第二计算节点中的第四计算节点发送第二请求至所连接的第二数据处理单元,以指示第二数据处理单元执行M个第三任务;其中,第二请求中携带第三任务执行过程中的待调用算子的算子标识以及M个第二结果的存储位置信息。In one possible implementation, the computing cluster also includes multiple second data processing units, and each second computing node is connected to a second data processing unit; based on this, the data processing method provided by the present application also includes: a fourth computing node among the multiple second computing nodes sends a second request to the connected second data processing unit to instruct the second data processing unit to execute M third tasks; wherein the second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location information of the M second results.

一种可能的实现方式中,本申请提供的数据处理方法还包括:多个第一数据处理单元中的第三数据处理单元发送通知至多个第二计算节点中的第五计算节点,以指示M个第二任务执行结束。In one possible implementation, the data processing method provided in the present application further includes: a third data processing unit among the multiple first data processing units sends a notification to a fifth computing node among the multiple second computing nodes to indicate that the execution of the M second tasks is completed.

一种可能的实现方式中,本申请提供的数据处理方法还包括:多个第一数据处理单元中的第四数据处理单元接收第三请求,以指示更新第四数据处理单元待调用的算子,第三请求中携带待更新算子的算子逻辑微码;并且第四数据处理单元加载待更新算子的算子逻辑微码。In one possible implementation, the data processing method provided by the present application also includes: a fourth data processing unit among multiple first data processing units receives a third request to instruct the update of the operator to be called by the fourth data processing unit, and the third request carries the operator logic microcode of the operator to be updated; and the fourth data processing unit loads the operator logic microcode of the operator to be updated.

一种可能的实现方式中,上述待更新算子是自定义的算子。In a possible implementation, the operator to be updated is a custom operator.

第三方面,本申请提供一种计算节点,包括存储器和与存储器连接的至少一个处理器,该存储器用于存储计算机程序代码,计算机程序代码包括计算机指令,当计算机指令被至少一个处理器执行时,使得计算节点执行第一方面或第二方面的第一计算节点所执行的动作;或者,执行第一方面或第二方面中第二计算节点所执行的动作。In a third aspect, the present application provides a computing node comprising a memory and at least one processor connected to the memory, the memory being used to store computer program code, the computer program code comprising computer instructions, which, when executed by at least one processor, causes the computing node to perform the actions performed by the first computing node in the first aspect or the second aspect; or, to perform the actions performed by the second computing node in the first aspect or the second aspect.

第四方面,本申请提供一种数据处理单元,包括存储器和与存储器连接的至少一个处理器,该存储器用于存储计算机程序代码,计算机程序代码包括计算机指令,当计算机指令被至少一个处理器执行时,使得数据处理单元执行第一方面或第二方面的第一数据处理单元所执行的动作;或者,执行第一方面或第二方面中第二数据处理单元所执行的动作。In a fourth aspect, the present application provides a data processing unit comprising a memory and at least one processor connected to the memory, the memory being used to store computer program code, the computer program code comprising computer instructions, which, when executed by at least one processor, causes the data processing unit to perform the actions performed by the first data processing unit in the first aspect or the second aspect; or, to perform the actions performed by the second data processing unit in the first aspect or the second aspect.

第五方面,本申请提供一种计算机可读存储介质,存储有计算机指令,计算机指令在计算机上运行时,执行第二方面及其可能的实现方式中任意之一的方法。In a fifth aspect, the present application provides a computer-readable storage medium storing computer instructions. When the computer instructions are run on a computer, the method of the second aspect and any one of its possible implementations is executed.

第六方面,本申请提供一种计算机程序产品,该计算机程序产品包含计算机指令,当计算机指令在计算机上运行时,执行第二方面及其可能的实现方式中任意之一的方法。In a sixth aspect, the present application provides a computer program product, which includes computer instructions. When the computer instructions are run on a computer, the method of the second aspect and any one of its possible implementations is executed.

第七方面,本申请提供一种芯片系统,包括:处理器,用于从存储器中调用并运行计算机程序,使得安装有芯片系统的设备执行第二方面及其可能的实现方式中任意之一的方法。In the seventh aspect, the present application provides a chip system, comprising: a processor for calling and running a computer program from a memory, so that a device equipped with the chip system executes the method of the second aspect and any one of its possible implementation methods.

应当理解的是,本申请的第二方面至第七方面技术方案及对应的可能的实施方式所取得的有益效果可以参见上述对第一方面及其对应的可能的实施方式的技术效果,此处不再赘述。It should be understood that the beneficial effects achieved by the technical solutions of the second to seventh aspects of this application and the corresponding possible implementation methods can be referred to the technical effects of the first aspect and its corresponding possible implementation methods mentioned above, and will not be repeated here.

附图说明BRIEF DESCRIPTION OF THE DRAWINGS

图1为本申请实施例提供的一种基于Map-Reduce模型进行数据处理的框架图之一;FIG1 is a diagram showing a data processing framework based on a Map-Reduce model according to an embodiment of the present application;

图2为本申请实施例提供的一种基于Map-Reduce模型进行数据处理的框架图之二;FIG2 is a second framework diagram of data processing based on the Map-Reduce model provided in an embodiment of the present application;

图3为本申请实施例提供的一种计算集群的架构示意图之一;FIG3 is one of the schematic diagrams of the architecture of a computing cluster provided in an embodiment of the present application;

图4为本申请实施例提供的一种计算集群的架构示意图之二;FIG4 is a second schematic diagram of the architecture of a computing cluster provided in an embodiment of the present application;

图5为本申请实施例提供的一种计算集群的架构示意图之三;FIG5 is a third schematic diagram of the architecture of a computing cluster provided in an embodiment of the present application;

图6为本申请实施例提供的一种计算集群的架构示意图之四;FIG6 is a fourth schematic diagram of the architecture of a computing cluster provided in an embodiment of the present application;

图7为本申请实施例提供的一种数据处理方法的流程示意图之一;FIG7 is a flowchart of a data processing method according to an embodiment of the present application;

图8为本申请实施例提供的一种数据处理方法的流程示意图之二;FIG8 is a second flow chart of a data processing method provided in an embodiment of the present application;

图9为本申请实施例提供的一种算子更新方法示意图。FIG9 is a schematic diagram of an operator updating method provided in an embodiment of the present application.

具体实施方式DETAILED DESCRIPTION

本文中术语“和/或”,仅仅是一种描述关联对象的关联关系,表示可以存在三种关系,例如,A和/或B,可以表示:单独存在A,同时存在A和B,单独存在B这三种情况。The term "and/or" in this article is merely a description of the association relationship between associated objects, indicating that three relationships may exist. For example, A and/or B can mean: A exists alone, A and B exist at the same time, and B exists alone.

本申请实施例的说明书和权利要求书中的术语“第一”和“第二”等是用于区别不同的对象,而不是用于描述对象的特定顺序。例如,第一任务和第二任务等是用于区别不同的任务,而不是用于描述任务的特定顺序;第一请求和第二请求等是用于区别不同的请求,而不是用于描述请求的特定顺序。In the description and claims of the embodiments of this application, the terms "first" and "second" are used to distinguish different objects, rather than to describe a specific order of objects. For example, "first task" and "second task" are used to distinguish different tasks, rather than to describe a specific order of tasks; "first request" and "second request" are used to distinguish different requests, rather than to describe a specific order of requests.

在本申请实施例中,“示例性的”或者“例如”等词用于表示作例子、例证或说明。本申请实施例中被描述为“示例性的”或者“例如”的任何实施例或设计方案不应被解释为比其它实施例或设计方案更优选或更具优势。确切而言,使用“示例性的”或者“例如”等词旨在以具体方式呈现相关概念。In the embodiments of this application, words such as "exemplary" or "for example" are used to indicate examples, illustrations, or descriptions. Any embodiment or design described as "exemplary" or "for example" in the embodiments of this application should not be interpreted as being preferred or advantageous over other embodiments or designs. Rather, the use of words such as "exemplary" or "for example" is intended to present the relevant concepts in a concrete manner.

在本申请实施例的描述中,除非另有说明,“多个”的含义是指两个或两个以上,“多个”也可以描述为“至少两个”。In the description of the embodiments of the present application, unless otherwise specified, “multiple” means two or more, and “multiple” can also be described as “at least two”.

本申请实施例提供的数据处理方法、装置及计算集群主要应用在分布式大数据处理的场景中,并且大数据处理的编程模型应满足以下条件中的至少一个:The data processing method, apparatus, and computing cluster provided in the embodiments of the present application are primarily used in distributed big data processing scenarios, and the programming model for big data processing should meet at least one of the following conditions:

1)编程模型包括不同的处理阶段,不同的处理阶段存在数据依赖(如后一处理阶段依赖于前一处理阶段的处理结果),不同的处理阶段分别包括多个分布在不同的计算节点上的任务(即由不同计算机执行的任务),且一个计算节点上执行多个任务。1) The programming model includes different processing stages. Different processing stages have data dependencies (for example, the subsequent processing stage depends on the processing results of the previous processing stage). Different processing stages include multiple tasks distributed on different computing nodes (that is, tasks executed by different computers), and multiple tasks are executed on one computing node.

2)后一处理阶段需要获取前一处理阶段生成的处理结果中的海量数据分片。2) The subsequent processing stage needs to obtain massive data fragments from the processing results generated by the previous processing stage.

需要说明的是,本申请实施例适用的编程模型可以为任意满足上述条件的编程模型,例如包括但不限于用于数据处理Map-Reduce模型。在本申请实施例中,主要以Map-Reduce模型为例对数据处理方法的相关内容进行描述。It should be noted that the programming model applicable to the embodiments of the present application can be any programming model that meets the above conditions, including but not limited to the Map-Reduce model for data processing. In the embodiments of the present application, the relevant content of the data processing method is mainly described using the Map-Reduce model as an example.

首先对Map-Reduce模型的相关知识进行简要说明。First, a brief introduction to the Map-Reduce model is given.

Map-Reduce模型是一种大数据编程范式,即将一个数据处理任务(以下也可以简称为任务)划分为多次Map-Reduce,一次Map-Reduce包括Map阶段和Reduce阶段,每次Map-Reduce中Map阶段和Reduce阶段执行各阶段对应的算子,Map阶段和Reduce阶段执行的算子的类型与实际业务需求有关,本申请实施例不做限定。The Map-Reduce model is a big data programming paradigm, which divides a data processing task (hereinafter referred to as a task) into multiple Map-Reduces. A Map-Reduce includes a Map phase and a Reduce phase. The Map phase and the Reduce phase in each Map-Reduce execute the operators corresponding to each phase. The types of operators executed in the Map phase and the Reduce phase are related to actual business needs and are not limited in the embodiments of this application.

Map-Reduce模型用于在计算集群中进行大数据处理,计算集群包括执行任务的计算节点,也可以包括用于实现任务调度和资源管理的管理节点。以计算集群为分布式计算集群为例,资源管理器将计算集群内的所有计算节点的计算资源(处理核)和内存资源池化,对于一个待执行的数据处理任务,任务调度器将输入数据(也可以称为原始数据)按照某个粒度(例如64M等)进行切分,将输入数据切分为多个数据分片,之后,按照某个容器规格(4核+12G)从资源池中为该数据处理任务申请足够的资源以执行该任务。The Map-Reduce model is used for big data processing in a computing cluster. The computing cluster includes computing nodes that execute tasks, and may also include management nodes for task scheduling and resource management. Taking a distributed computing cluster as an example, the resource manager pools the computing resources (processing cores) and memory resources of all computing nodes in the computing cluster. For a data processing task to be executed, the task scheduler divides the input data (also called raw data) into multiple data shards according to a certain granularity (such as 64MB). Then, it requests sufficient resources from the resource pool for the data processing task according to a certain container specification (4 cores + 12GB) to execute the task.

具体的,参考图1所示的基于Map-Reduce模型进行数据处理的框架图,在Map阶段,根据上述输入数据划分后的数据分片的数量(例如N个数据分片,N为大于1的整数),生成数据处理任务的N个map task(也可以称为map任务),N个map task可以是同质化的(即N个map task执行的是相同的算子或函数)。N个map task中的每一个map task执行算子计算,每一个map task生成M个数据分片。在Reduce阶段,生成该数据处理任务的M个reduce task(也可以称为reduce任务)。其中,对于每个reduce task,需要拉取所有map task生成的M个数据分片中与reduce task对应的某个下标的数据分片,之后再执行Reduce阶段的算子。示例性的,如图1所示,reduce task1需要拉取map task 1生成的M个数据分片中的数据分片1_1,map task 2生成的M个数据分片中的数据分片2_1,依次拉取,直至拉取map task N生成的M个数据分片中的数据分片M_1,可见,对于一个reduce task需要拉取N次数据分片。Specifically, referring to the framework diagram for data processing based on the Map-Reduce model shown in Figure 1, in the Map phase, based on the number of data shards after the input data is divided (for example, N data shards, where N is an integer greater than 1), N map tasks (also called map tasks) are generated for the data processing task. The N map tasks can be homogeneous (that is, the N map tasks execute the same operator or function). Each of the N map tasks performs operator calculations, and each map task generates M data shards. In the Reduce phase, M reduce tasks (also called reduce tasks) are generated for the data processing task. Among them, for each reduce task, it is necessary to pull the data shard with a certain index corresponding to the reduce task from the M data shards generated by all map tasks, and then execute the operator of the Reduce phase. For example, as shown in Figure 1, reduce task 1 needs to pull data shard 1_1 from the M data shards generated by map task 1, and data shard 2_1 from the M data shards generated by map task 2, and so on, until data shard M_1 from the M data shards generated by map task N is pulled. It can be seen that for a reduce task, N data shards need to be pulled.

可选地,还可以继续对每一个reduce task生成的数据进行更细粒度的划分,例如分为K个数据分片,进而将这K个细粒度的数据分片作为输入数据,执行下一次的Map-Reduce。Optionally, the data generated by each reduce task can be further divided into K data shards, for example, and then these K fine-grained data shards are used as input data to execute the next Map-Reduce.

应理解,通常,上述在Map阶段生成的N个map task由不同的第一计算节点执行,一个第一计算节点上执行多个map task,上述在Reduce阶段生成的M个reduce task也可以由不同的第二计算节点执行,一个第二计算节点上也可以执行多个reduce task。It should be understood that, generally, the N map tasks generated in the Map phase are executed by different first computing nodes, and multiple map tasks are executed on one first computing node. The M reduce tasks generated in the Reduce phase can also be executed by different second computing nodes, and multiple reduce tasks can also be executed on one second computing node.

可选地,map task和reduce task可以由同一计算节点(即第一计算节点和第二计算节点是同一计算节点)执行,也可以由不同的计算节点执行(即第一计算节点和第二计算节点是不同的计算节点),本申请实施例不做限定。Optionally, the map task and the reduce task can be executed by the same computing node (i.e., the first computing node and the second computing node are the same computing node), or can be executed by different computing nodes (i.e., the first computing node and the second computing node are different computing nodes), which is not limited in the embodiments of the present application.

参考图2所示的计算集群中基于Map-Reduce模型进行数据处理的框架图,对于待处理数据,将待处理数据划分为N个数据分片,则Map阶段需要执行N个map task,N个map task由计算集群中的多个计算节点执行。例如,如图2所示,由3个计算节点处理N个数据分片,其中,计算节点1执行N1个map task,计算节点2执行N2个map task,计算节点3执行N3个map task,N1+N2+N3=N,每一个map task执行结束后生成M个数据分片。Referring to Figure 2, a framework diagram for data processing based on the Map-Reduce model in a computing cluster, the data to be processed is divided into N data shards. The Map phase requires executing N map tasks, each of which is executed by multiple compute nodes in the computing cluster. For example, as shown in Figure 2, three compute nodes process N data shards. Compute node 1 executes N1 map tasks, compute node 2 executes N2 map tasks, and compute node 3 executes N3 map tasks. N1 + N2 + N3 = N. Each map task generates M data shards.

继续参考图2,在Reduce阶段需执行M个reduce task,M个reduce task可以由一个或多个计算节点执行,图2中示意的是M个reduce task分别由M个计算节点执行,例如,计算节点4执行reduce task 1,计算节点5执行reduce task 2,计算节点6执行reduce task M。如图2所示,reduce task 1需要拉取所有map task生成数据分片中的第1个数据分片,具体的,计算节点1上的N1个map task中的每个map task均生成M个数据分片,则reduce task 1需从计算节点1中拉取N1次数据分片;计算节点2上的N2个map task中的每个map task均生成M个数据分片,则reduce task 1需从计算节点2中拉取N2次数据分片,计算节点3上的N3个map task中的每个map task均生成M个数据分片,则reduce task 1需从计算节点3中拉取N3次数据分片,reduce task 1总共需要拉取N(N1+N2+N3=N)次;同理,reduce task 2需要拉取所有map task生成数据分片中的第2个数据分片,例如,reduce task 2从计算节点1中拉取N1次数据分片,从计算节点2中拉取N2次数据分片,从计算节点3中拉取N3次数据分片,总共需要拉取N次。Continuing with Figure 2, in the Reduce phase, M reduce tasks need to be executed. These M reduce tasks can be executed by one or more compute nodes. Figure 2 illustrates that the M reduce tasks are executed by M compute nodes respectively. For example, compute node 4 executes reduce task 1, compute node 5 executes reduce task 2, and compute node 6 executes reduce task M. As shown in Figure 2, reduce task 1 needs to pull the first data shard from all the data shards generated by map tasks. Specifically, each of the N1 map tasks on compute node 1 generates M data shards. Therefore, reduce task 1 needs to pull N1 data shards from compute node 1. Each of the N2 map tasks on compute node 2 generates M data shards. Therefore, reduce task 1 needs to pull N2 data shards from compute node 2. The N3 map tasks on compute node 3 generate M data shards. Each map task in ask generates M data shards, then reduce task 1 needs to pull N3 data shards from computing node 3, and reduce task 1 needs to pull N (N1+N2+N3=N) data shards a total of N times; similarly, reduce task 2 needs to pull the second data shard among the data shards generated by all map tasks. For example, reduce task 2 pulls N1 data shards from computing node 1, N2 data shards from computing node 2, and N3 data shards from computing node 3, and needs to pull N times in total.

结合图2,目前,在一种实现方式中,map task和reduce task由不同的计算节点执行时,上述数据拉取的过程中,需要跨计算节点进行数据拉取,例如,reduce task通过shuffle(数据拉取的操作)拉取多个map task生成的数据分片中相应的数据分片,shuffle是计算集群内跨节点、跨进程的数据分发机制,也可以理解为Reduce阶段与Map阶段之间进行数据交换。With reference to Figure 2, currently, in one implementation, when the map task and the reduce task are executed by different computing nodes, the data pulling process mentioned above needs to be performed across computing nodes. For example, the reduce task pulls the corresponding data shards from the data shards generated by multiple map tasks through shuffle (data pulling operation). Shuffle is a data distribution mechanism across nodes and processes within a computing cluster, and can also be understood as data exchange between the Reduce stage and the Map stage.

关于shuffle的更多内容以及通过shuffle拉取数据的详细过程可以参考现有技术资料,本申请不做详述。For more information about shuffle and the detailed process of pulling data through shuffle, please refer to the existing technical information, which will not be described in detail in this application.

上述shuffle过程中需要通过网络进行跨节点数据拉取,由于计算集群中的网络带宽资源有限,通过shuffle拉取大量map task生成的数据分片会占用较多的网络带宽,并且当存在大量的map task时,多次拉取海量数据分片使得网络每秒进行读写(I/O)操作(input/output operations per second,IOPS)的次数较多,容易引起网络拥塞,从而导致数据端到端(即从执行map task的计算节点到执行reduce task的计算节点)的传输延迟较大。另外,由于网络负担过重引发拥塞时,网络性能急剧恶化,影响其他作业的正常运行。The shuffle process requires cross-node data fetching over the network. Due to limited network bandwidth in the compute cluster, shuffling large numbers of data shards generated by map tasks consumes significant bandwidth. Furthermore, when there are a large number of map tasks, fetching these massive data shards repeatedly results in a high number of input/output operations per second (IOPS) on the network, which can easily cause network congestion and significant end-to-end data transmission latency (i.e., from the compute nodes executing the map tasks to the compute nodes executing the reduce tasks). Furthermore, when network congestion is caused by excessive network load, network performance deteriorates dramatically, impacting the normal operation of other jobs.

应理解,在Reduce阶段,reduce task通过shuffle拉取到的所有map task产生的某个数据分片之后,reduce task执行的算子的种类可以包括但不限于Join(数据拼接),Aggregate(聚合)、Filter(过滤)、Sort(排序)、distinct(去重)、subtract(集合求差)、sample(采样)等算子,其中,每一类的算子还可以包括不同类型的算子,例如Join算子可以包括cross join、inner join、outer join、self join等类型的算子。It should be understood that in the Reduce stage, after the reduce task pulls a data partition generated by all map tasks through shuffle, the types of operators executed by the reduce task may include but are not limited to Join (data splicing), Aggregate (aggregation), Filter (filtering), Sort (sorting), distinct (deduplication), subtract (set difference), sample (sampling) and other operators. Among them, each type of operator can also include different types of operators. For example, the Join operator can include cross join, inner join, outer join, self join and other types of operators.

Reduce阶段执行的算子中,有些算子的特点是运算前后数据量发生显著减少,即算子的输入数据量很大,经过算子运算之后,输出的数据量很小。也就说是,相对于这些算子,通过网络进行shuffle时,实际上拉取了海量的原始数据(即输入数据),数据传输的延迟将更加明显。Some operators executed during the Reduce phase feature a significant reduction in the amount of data before and after the operation. This means that the input data for these operators is large, but the output data is very small. This means that for these operators, shuffling across the network actually pulls in a massive amount of raw data (i.e., input data), resulting in more significant data transmission delays.

针对上述数据处理过程中,reduce task通过shuffle拉取所有map task产生的数据分片导致数据传输延迟较大的问题,本申请实施例提供一种数据处理方法、装置及计算集群,计算集群包括多个第一计算节点、多个第一数据处理单元以及多个第二计算节点,其中,每个第一计算节点连接一个第一数据处理单元,多个第一计算节点可以执行N个第一任务(如map task),N个第一任务用于处理待处理数据的N个数据分片,执行每个第一任务后,生成M个第一结果(M个第一结果是M个数据分片),且分别被标识为A0~AM-1;每个第一数据处理单元执行M个第二任务,M个第二任务中的每个第二任务用于对第一数据处理单元所连接的第一计算节点中的具有相同标识的第一结果进行处理,处理后生成第二结果,执行M个第二任务后生成M个第二结果,M个第二结果被标识为B0~BM-1;进而多个第二计算节点执行M个第三任务,每个第三任务用于对多个第一数据处理单元中的具有相同标识的第二结果进行处理,生成第三结果,执行M个第三任务后生成M个第三结果。In view of the problem that in the above data processing process, the reduce task pulls all the data shards generated by the map task through shuffle, resulting in a large data transmission delay, the embodiment of the present application provides a data processing method, device and computing cluster, the computing cluster includes multiple first computing nodes, multiple first data processing units and multiple second computing nodes, wherein each first computing node is connected to a first data processing unit, the multiple first computing nodes can execute N first tasks (such as map tasks), the N first tasks are used to process N data shards of the data to be processed, and after executing each first task, M first results (M second results) are generated. A result is M data fragments), and they are respectively identified as A0~AM-1; each first data processing unit executes M second tasks, each of the M second tasks is used to process the first result with the same identification in the first computing node connected to the first data processing unit, and generate a second result after processing. After executing M second tasks, M second results are generated, and the M second results are identified as B0~BM-1; then multiple second computing nodes execute M third tasks, each third task is used to process the second results with the same identification in multiple first data processing units, and generate a third result. After executing M third tasks, M third results are generated.

该方法将数据处理过程中,由每一个第一计算节点连接的第一数据处理单元对第一计算节点执行的多个第一任务后生成的第一结果先执行第二任务,生成第二结果,再由第二计算节点执行第三任务对多个计算节点生成的第二结果进行处理,可以避免同一计算节点执行多个第一任务后产生的数据直接被第二计算节点进行多次以及海量数据的拉取,从而解决因多次及海量数据拉取导致的数据传输延迟较大的问题,即能够降低数据处理过程中的数据传输时延,提高数据处理的效率。The method has the following steps: during the data processing process, the first data processing unit connected to each first computing node executes a second task on the first result generated after the first computing node executes multiple first tasks to generate a second result, and then the second computing node executes a third task to process the second results generated by the multiple computing nodes. This can avoid the data generated after the same computing node executes multiple first tasks being directly pulled multiple times and in large quantities by the second computing node, thereby solving the problem of large data transmission delay caused by multiple and massive data pulling, that is, it can reduce the data transmission delay in the data processing process and improve the efficiency of data processing.

本申请实施例提供的数据处理方法、装置及计算集群可以应用在基于Map-Reduce模型的大数据分布式计算的场景,下面对本申请实施例提供的计算集群、数据处理方法进行详细描述。The data processing method, device and computing cluster provided in the embodiments of the present application can be applied to scenarios of big data distributed computing based on the Map-Reduce model. The computing cluster and data processing method provided in the embodiments of the present application are described in detail below.

本申请实施例提供一种计算集群,图3为本申请实施例提供的一种计算集群的架构示意图,该计算集群包括多个第一计算节点301和多个第二计算节点302,每一个第一计算节点301连接有一个第一数据处理单元303。An embodiment of the present application provides a computing cluster. FIG3 is a schematic diagram of the architecture of a computing cluster provided in an embodiment of the present application. The computing cluster includes multiple first computing nodes 301 and multiple second computing nodes 302 . Each first computing node 301 is connected to a first data processing unit 303 .

计算集群中的计算节点(包括第一计算节点和第二计算节点)可以是服务器,例如是中心服务器、边缘服务器,或者是本地数据中心中的本地服务器。在一些实施例中,计算节点也可以是台式机、笔记本电脑或者智能手机等终端设备。The computing nodes (including the first computing node and the second computing node) in the computing cluster can be servers, such as central servers, edge servers, or local servers in a local data center. In some embodiments, the computing nodes can also be terminal devices such as desktop computers, laptop computers, or smart phones.

可选地,本申请实施例提供的数据处理单元可以是DPU(data processing unit,DPU),可以理解的,DPU是一种专用的数据处理单元(或称为数据处理器),具有较强的数据处理能力,DPU作为一种智能网卡,通过网卡接口与主机(如计算节点)连接,并在计算节点的控制下实现数据处理功能。当然,与每个计算节点连接的数据处理单元也可以是其他的数据处理设备,本申请实施例不做限定。Optionally, the data processing unit provided in the embodiments of the present application may be a DPU (data processing unit). It is understood that a DPU is a dedicated data processing unit (or data processor) with strong data processing capabilities. As a smart network card, the DPU connects to a host (such as a computing node) via a network card interface and performs data processing functions under the control of the computing node. Of course, the data processing unit connected to each computing node may also be other data processing devices, and the embodiments of the present application do not limit this.

对于一个数据处理任务,该数据处理任务用于处理待处理数据,将待处理数据划分为N个数据分片进行处理,本申请实施例中,执行该数据处理任务包括执行N个第一任务、M个第二任务以及M个第三任务,M,N均为大于或等于2的整数。以数据处理模型为Map-Reduce模型为例,上述的第一任务是map任务(以下简称为map task),上述的第二任务和第三任务是reduce任务(以下简称为reduce task)。N个第一任务(map task)由多个第一计算节点301执行,M个第二任务由第一数据处理单元303执行,M个第三任务由多个第二计算节点302执行。For a data processing task, the data processing task is used to process the data to be processed, and the data to be processed is divided into N data slices for processing. In the embodiment of the present application, executing the data processing task includes executing N first tasks, M second tasks, and M third tasks, where M and N are both integers greater than or equal to 2. Taking the Map-Reduce model as an example, the first task mentioned above is a map task (hereinafter referred to as map task), and the second and third tasks mentioned above are reduce tasks (hereinafter referred to as reduce task). The N first tasks (map tasks) are executed by multiple first computing nodes 301, the M second tasks are executed by the first data processing unit 303, and the M third tasks are executed by multiple second computing nodes 302.

本申请实施例中,使用Map-Reduce模型对待处理数据划分成的N个数据分片进行处理。In the embodiment of the present application, the Map-Reduce model is used to process the N data shards into which the data to be processed is divided.

结合图3,如图4所示,多个第一计算节点301,用于执行N个第一任务(map task),N个第一任务用于处理待处理数据的N个数据分片,即一个第一任务处理一个数据分片(N个第一任务与N个数据分片一一对应)。其中,每个第一计算节点301执行多个第一任务以处理N个数据分片中的多个数据分片,执行每个第一任务后生成M个第一结果,M个第一结果分别被标识为A0~AM-1。As shown in FIG4 , in conjunction with FIG3 , multiple first computing nodes 301 are configured to execute N first tasks (map tasks), each of which is configured to process N data shards of the to-be-processed data. Specifically, each first computing node 301 executes multiple first tasks to process multiple data shards of the N data shards. After executing each first task, M first results are generated, each of which is labeled A0 through AM-1.

多个第一数据处理单元303中的每个第一数据处理单元303,用于执行M个第二任务(reduce task),M个第二任务中的每个第二任务用于对第一数据处理单元所连接的第一计算节点中的具有相同标识的第一结果进行处理,处理后生成第二结果,执行M个第二任务后生成M个第二结果,M个第二结果被标识为B0~BM-1。参考图4,以一个第一数据处理单元执行的一个第二任务为例,执行第二任务时,从所连接的第一计算节点执行多个第一任务后得到的多个第一结果中,拉取标识均为A0的多个第一结果,然后执行第二任务,生成一个第二结果B0。Each of the multiple first data processing units 303 is configured to execute M second tasks (reduce tasks). Each of the M second tasks is configured to process first results with the same identifier in the first computing node to which the first data processing unit is connected, generating a second result after processing. After executing the M second tasks, M second results are generated, and the M second results are identified as B0 to BM-1. Referring to FIG4 , taking a second task executed by a first data processing unit as an example, when executing the second task, multiple first results with the identifier A0 are pulled from the multiple first results obtained after the connected first computing node executes multiple first tasks, and then the second task is executed to generate a second result B0.

多个第二计算节点302,用于执行M个第三任务(reduce task),每个第三任务用于对多个第一数据处理单元中的具有相同标识的第二结果进行处理,生成第三结果,执行M个第三任务后生成M个第三结果。其中,每一个第二计算节点302执行一个或多个第三任务,以对与该第二计算节点302通信连接的多个第一数据处理单元生成的第二结果进行处理,例如图4中,第一个第二计算节点302执行n1+1个reduce task,生成的第三结果被标识为C0~Cn1。参考图4,以一个第二计算节点302执行的第三任务为例,执行第三任务时,从多个第一数据处理单元生成的多个第二结果中,拉取标识均为B0的多个第二结果,然后执行第三任务,生成一个第三结果C0。Multiple second computing nodes 302 are configured to execute M third tasks (reduce tasks), each of which is configured to process second results with the same identifier from multiple first data processing units to generate a third result. After executing M third tasks, M third results are generated. Each second computing node 302 executes one or more third tasks to process second results generated by multiple first data processing units that are communicatively connected to the second computing node 302. For example, in FIG4 , the first second computing node 302 executes n1+1 reduce tasks, and the generated third results are identified as C0 to Cn1. Referring to FIG4 , taking the third task executed by a second computing node 302 as an example, when executing the third task, multiple second results with the identifier B0 are pulled from the multiple second results generated by the multiple first data processing units, and then the third task is executed to generate a single third result C0.

需要说明的是,多个第一任务中的每个第一任务的输入数据(即N个数据分片中的一个数据分片)不同,执行多个第一任务后,每个第一任务后生成第一结果不同。同理,多个第二任务中的每个第二任务的输入数据不同,多个第一数据处理单元执行多个第二任务后,生成的第二结果不同。因此,图4中对第一结果(A0~AM-1)、第二结果(B0~BM-1)的标识仅用于标识各个处理结果中的数据分片,并不是对处理结果中的数据分片的内容的限定。It should be noted that the input data of each of the multiple first tasks (i.e., one data shard among the N data shards) is different, and after executing the multiple first tasks, the first results generated by each first task are different. Similarly, the input data of each of the multiple second tasks is different, and after the multiple first data processing units execute the multiple second tasks, the second results generated are different. Therefore, the identification of the first results (A0~AM-1) and the second results (B0~BM-1) in Figure 4 is only used to identify the data shards in each processing result, and does not limit the content of the data shards in the processing results.

参考图4,例如,不同map task生成的M个第一结果中的第一个结果(即第一数据分片)均被标识为A0,但并不表示多个被标识为A0的第一结果的内容相同。又例如,不同第一数据处理单元执行第一个reduce task后生成的M个第二结果中的第一个结果均被表示为B0,但并不表示多个被标识为B0的第二结果的内容相同。Referring to Figure 4 , for example, the first result (i.e., the first data split) among M first results generated by different map tasks is labeled A0, but this does not mean that the contents of the multiple first results labeled A0 are identical. For another example, the first result among M second results generated by different first data processing units after executing the first reduce task is labeled B0, but this does not mean that the contents of the multiple second results labeled B0 are identical.

结合图4,如图5所示,本申请实施例提供的计算集群还可以包括多个第二数据处理单元304,每一个第二计算节点302连接一个第二数据处理单元304。计算集群包括多个第二数据处理单元304的情况下,上述由多个第二计算节点302执行M个第三任务(reduce task)的过程可以替换为由第二计算节点302所连接的第二数据处理单元304执行。例如,每一个第二计算节点可以指示与其连接的第二数据处理单元304处理一个或多个第三任务,以对多个第一数据处理单元中的具有相同标识的第二结果进行处理。多个第二计算节点执行M个第三任务后生成M个第三结果。In conjunction with FIG4 , as shown in FIG5 , the computing cluster provided in the embodiment of the present application may further include multiple second data processing units 304, with each second computing node 302 connected to a second data processing unit 304. When the computing cluster includes multiple second data processing units 304, the above-mentioned process of executing M third tasks (reduce tasks) by the multiple second computing nodes 302 may be replaced by execution by the second data processing units 304 connected to the second computing nodes 302. For example, each second computing node may instruct the second data processing unit 304 connected thereto to process one or more third tasks to process the second results with the same identifier in the multiple first data processing units. After the multiple second computing nodes execute the M third tasks, M third results are generated.

本申请实施例中,将第二计算节点执行的M个第三任务卸载至第二数据处理单元上执行,能够减轻第二计算节点的负载,从而节省第二计算节点的资源。In the embodiment of the present application, the M third tasks executed by the second computing node are offloaded to the second data processing unit for execution, which can reduce the load of the second computing node and thus save resources of the second computing node.

可选地,计算集群中还可以包括其他节点,例如管理节点,管理节点用于计算集群中的计算节点进行管理,管理节点可以具有任务调度的功能以及资源管理的功能,例如管理节点上可以部署任务调度器和/或资源管理器,以对数据处理任务进行调度,并为数据处理任务分配计算资源和内存资源。Optionally, the computing cluster may also include other nodes, such as a management node, which is used to manage the computing nodes in the computing cluster. The management node may have task scheduling functions and resource management functions. For example, a task scheduler and/or a resource manager may be deployed on the management node to schedule data processing tasks and allocate computing resources and memory resources to data processing tasks.

综上,本申请实施例提供的计算集群,由于可以由每一个第一计算节点连接的第一数据处理单元对第一计算节点执行的多个第一任务后生成的第一结果先执行第二任务,生成第二结果,再由第二计算节点执行第三任务对多个计算节点生成的第二结果进行处理,可以避免同一计算节点执行多个第一任务后产生的数据直接被第二计算节点进行多次以及海量数据的拉取,从而解决因多次及海量数据拉取导致的数据传输延迟较大的问题,能够降低数据处理过程中的数据传输时延,提高数据处理的效率。In summary, the computing cluster provided by the embodiment of the present application can avoid the data generated after the same computing node executes multiple first tasks directly by the second computing node and pulls massive data multiple times, thereby solving the problem of large data transmission delay caused by multiple and massive data pulling, reducing the data transmission delay in the data processing process and improving the efficiency of data processing.

可以理解的,计算集群中的多个第一计算节点301、多个第二计算节点302、多个第一数据处理单元303、多个第二数据处理单元304之间互相交互以完成待处理数据的处理过程。It can be understood that the multiple first computing nodes 301, the multiple second computing nodes 302, the multiple first data processing units 303, and the multiple second data processing units 304 in the computing cluster interact with each other to complete the processing of the data to be processed.

一种实现方式中,多个第一计算节点301中的第三计算节点,还用于发送第一请求至所连接的第一数据处理单元303,以指示第一数据处理单元303执行M个第二任务,其中,第三计算节点可以是多个第一计算节点301中的任意一个计算节点。可以理解的,每个第一计算节点301执行完多个第一任务(map task)之后,均向与其连接的第一数据处理单元303发送第一请求,从而第一数据处理单元303开始对第一计算节点内生成的多个第一结果执行第二任务。In one implementation, a third computing node among the plurality of first computing nodes 301 is further configured to send a first request to the connected first data processing unit 303 to instruct the first data processing unit 303 to execute M second tasks, wherein the third computing node may be any computing node among the plurality of first computing nodes 301. It is understandable that after each first computing node 301 executes the plurality of first tasks (map tasks), it sends the first request to the connected first data processing unit 303, so that the first data processing unit 303 begins to execute the second task on the plurality of first results generated in the first computing node.

可选地,上述第一请求中携带第二任务执行过程中的待调用算子的算子标识以及M个第一结果的存储位置信息,算子标识用于指示第二任务包含的待调用算子,存储位置信息用于指示执行M个第一结果的存储位置,例如M个第一结果在内存中的物理地址。如此,第一数据处理单元303根据存储位置信息读取M个第一结果,并调用算子标识指示的算子,以执行第二任务。Optionally, the first request includes the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results. The operator identifier indicates the operator to be called included in the second task, and the storage location information indicates the storage location of the M first results, such as the physical address of the M first results in memory. In this manner, the first data processing unit 303 reads the M first results based on the storage location information and calls the operator indicated by the operator identifier to execute the second task.

一种实现方式中,多个第一数据处理单元303中的第三数据处理单元,还用于发送通知至多个第二计算节点中的第五计算节点,以指示M个第二任务执行结束,其中,第三数据处理单元是多个第一数据处理单元303中的任意一个第一数据处理单元,第五计算节点是与第三数据处理单元对应的第二计算节点(即用于对第三数据处理单元生成的结果进行处理的第二计算节点)。可选地,第三数据处理单元执行完M个第二任务之后,向与其连接的第一计算节点301发送指示M个第二任务执行结束的通知,进而,第一计算节点301向第二计算节点302转发该通知。In one implementation, a third data processing unit among the plurality of first data processing units 303 is further configured to send a notification to a fifth computing node among the plurality of second computing nodes to indicate the completion of execution of the M second tasks, wherein the third data processing unit is any first data processing unit among the plurality of first data processing units 303, and the fifth computing node is the second computing node corresponding to the third data processing unit (i.e., the second computing node for processing the results generated by the third data processing unit). Optionally, after the third data processing unit completes executing the M second tasks, it sends a notification indicating the completion of execution of the M second tasks to the first computing node 301 connected thereto, and the first computing node 301 then forwards the notification to the second computing node 302.

可选地,上述指示M个第二任务执行结束的通知中携带M个第二结果的存储位置信息。如此,第二计算节点接收到该通知之后,根据存储位置信息读取M个第二结果以执行第三任务。Optionally, the notification indicating the completion of the execution of the M second tasks carries the storage location information of the M second results. Thus, after receiving the notification, the second computing node reads the M second results according to the storage location information to execute the third task.

一种实现方式中,计算集群包括多个第二数据处理单元304的情况下,上述多个第二计算节点302中的第四计算节点,还用于发送第二请求至所连接的第二数据处理单元304,以指示第二数据处理单元304执行M个第三任务,其中,第四计算节点可以是多个第二计算节点301中的任意一个计算节点。可以理解的,每个第一数据处理单元303执行完多个第二任务(reduce task)之后,均向与其连接的第一数据处理单元303发送第一请求,从而第一数据处理单元303开始对第一计算节点内生成的多个第一结果执行第二任务。In one implementation, when the computing cluster includes multiple second data processing units 304, a fourth computing node among the multiple second computing nodes 302 is further configured to send a second request to the connected second data processing unit 304 to instruct the second data processing unit 304 to execute M third tasks, wherein the fourth computing node can be any computing node among the multiple second computing nodes 301. It is understandable that after each first data processing unit 303 executes multiple second tasks (reduce tasks), it sends a first request to the first data processing unit 303 connected thereto, so that the first data processing unit 303 begins to execute the second task on the multiple first results generated in the first computing node.

可选地,上述第二请求中携带第三任务执行过程中的待调用算子的算子标识以及M个第二结果的存储位置。如此,第二数据处理单元304根据存储位置信息读取M个第二结果,并调用算子标识指示的算子,以执行第三任务。Optionally, the second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location of the M second results. In this way, the second data processing unit 304 reads the M second results according to the storage location information and calls the operator indicated by the operator identifier to execute the third task.

可以理解的,每一个第一数据处理单元303执行均执行M个第二任务,执行第二任务过程实际是调用第二任务包含的待调用算子。It can be understood that each first data processing unit 303 executes M second tasks, and the process of executing the second task is actually calling the to-be-called operator included in the second task.

基于此,一种实现方式中,多个第一数据处理单元303中的第四数据处理单元,还用于接收第三请求,以指示更新第四数据处理单元待调用的算子,并加载待更新算子的算子逻辑微码至第四数据处理单元,以将待更新算子持久化至第四数据处理单元。其中,第四数据处理单元可以是多个第一数据处理单元303中的任意一个数据处理单元。Based on this, in one implementation, a fourth data processing unit among the plurality of first data processing units 303 is further configured to receive a third request instructing the fourth data processing unit to update an operator to be called, and to load the operator logic microcode of the operator to be updated into the fourth data processing unit, thereby persisting the operator to be updated in the fourth data processing unit. The fourth data processing unit may be any one of the plurality of first data processing units 303.

可选地,上述第三请求中携带待更新算子的算子逻辑微码,还可以包括算子标识,从而按照算子标识与算子逻辑微码的对应关系存储算子逻辑微码。Optionally, the third request carries the operator logic microcode of the operator to be updated and may further include an operator identifier, so that the operator logic microcode is stored according to the correspondence between the operator identifier and the operator logic microcode.

应理解,通常,由于数据处理单元(例如DPU)的硬件物理资源受限,往往只能固化少量高频使用的硬件算子。本申请实施例中,数据处理单元的逻辑电路支持硬件算子更新,因此,可以灵活地对数据处理单元中的算子进行更新,扩充了数据处理单元可执行的算子的范围。It should be understood that, generally, due to the limited hardware physical resources of a data processing unit (e.g., a DPU), only a small number of frequently used hardware operators can be solidified. In the embodiments of the present application, the logic circuit of the data processing unit supports hardware operator updates. Therefore, operators in the data processing unit can be flexibly updated, expanding the range of operators that can be executed by the data processing unit.

可选地,本申请实施例中,上述待更新算子可以是用户自定义的算子,用户根据实际数据处理需求,自行设计生成算子逻辑微码,将自定义的算子更新至数据处理单元,即将算子逻辑微码持久化至数据处理单元内。支持用户自定义算子能够满足不同用户的需求,使得计算集群的应用范围更加广泛。Optionally, in this embodiment of the present application, the operator to be updated can be a user-defined operator. Based on actual data processing requirements, the user can design and generate the operator logic microcode and update the customized operator to the data processing unit, thereby persisting the operator logic microcode within the data processing unit. Supporting user-defined operators can meet the needs of different users and expand the application range of computing clusters.

以下实施例从计算集群中的各个计算节点、数据处理单元之间的内部模块交互的角度对数据处理的过程进行描述。以图5所示的计算集群为例,为了便于描述,以该计算集群中的2个第一计算点和1个第二计算节点为例说明计算集群处理数据的过程。The following embodiments describe the data processing process from the perspective of the internal module interactions between the various computing nodes and data processing units in a computing cluster. Taking the computing cluster shown in Figure 5 as an example, for ease of description, the data processing process of the computing cluster is explained using two first computing nodes and one second computing node in the computing cluster as examples.

如图6所示,对于计算集群中的第一计算节点301a、与第一计算节点301a连接的第一数据处理单元303a、第一计算节点301b、与第一计算节点301b连接的第一数据处理单元303b、第二计算节点302以及与第二计算节点302连接的第二数据处理单元304。As shown in Figure 6, for the first computing node 301a in the computing cluster, the first data processing unit 303a connected to the first computing node 301a, the first computing node 301b, the first data processing unit 303b connected to the first computing node 301b, the second computing node 302 and the second data processing unit 304 connected to the second computing node 302.

其中,第一计算节点301a、第一计算节点301b以及第二计算节点302中均包括代理(agent)模块,代理模块负责将接收到的请求从业务语义转换成硬件计算的请求,第一数据处理单元303a、第一数据处理单元303b以及第二数据处理单元304中均包括请求处理(request handler)模块和数据卸载引擎(data offloading engine,DOE),该DOE是用于算子运算的硬件模块。Among them, the first computing node 301a, the first computing node 301b and the second computing node 302 all include an agent module, which is responsible for converting the received request from business semantics into a hardware computing request. The first data processing unit 303a, the first data processing unit 303b and the second data processing unit 304 all include a request handler module and a data offloading engine (DOE), which is a hardware module for operator calculation.

其中,第一计算节点301a执行完多个map task(第一任务)之后,每一个map task生成M个第一结果(标识为A0~AM-1),第一计算节点301a中的代理模块向第一数据处理单元303a中的请求处理模块发送第一请求,以指示第一数据处理单元303a对第一计算节点301b生成的多个第一结果执行reduce task(第二任务)。Among them, after the first computing node 301a executes multiple map tasks (first tasks), each map task generates M first results (identified as A0~AM-1), and the agent module in the first computing node 301a sends a first request to the request processing module in the first data processing unit 303a to instruct the first data processing unit 303a to execute the reduce task (second task) on the multiple first results generated by the first computing node 301b.

第一数据处理单元303a中的请求处理模块将第一请求转发至第一数据处理单元303a的DOE,以使得DOE执行reduce task。例如,参考图6,对于每一个第一计算节点301a,由第一数据处理单元303a拉取每一个map task生成的第一结果A0,并执行reduce task(reduce task执行过程中需要调用的包括如Join算子、Aggregate算子等),执行结束后得到一个第二结果B0。The request processing module in the first data processing unit 303a forwards the first request to the DOE of the first data processing unit 303a, causing the DOE to execute the reduce task. For example, referring to Figure 6, for each first computing node 301a, the first data processing unit 303a pulls the first result A0 generated by each map task and executes the reduce task (the reduce task execution process may require the invocation of operators such as Join and Aggregate), resulting in a second result B0.

应理解,第一计算节点301b以及第一数据处理单元303b进行数据处理的过程可以参考第一计算节点301a以及第一数据处理单元303a执行数据处理的过程,此处不再赘述。It should be understood that the process of data processing performed by the first computing node 301b and the first data processing unit 303b can refer to the process of data processing performed by the first computing node 301a and the first data processing unit 303a, which will not be repeated here.

进一步的,第一数据处理单元303a和第一数据处理单元303b均执行完多个reduce task之后,通过代理模块通知第二计算节点302的代理模块,进而第二计算节点302的代理模块向第二数据处理单元304的请求处理模块发送第二请求,以指示第二数据处理单元304执行M个reduce task(第三任务)。Furthermore, after the first data processing unit 303a and the first data processing unit 303b have executed multiple reduce tasks, they notify the proxy module of the second computing node 302 through the proxy module, and then the proxy module of the second computing node 302 sends a second request to the request processing module of the second data processing unit 304 to instruct the second data processing unit 304 to execute M reduce tasks (the third task).

第二数据处理单元304的请求处理模块将第二请求转发至第二数据处理单元304的DOE,以使得DOE执行reduce task。例如,参考图6,第二数据处理单元304拉取第一数据处理单元303a生成的第二结果B0和第一数据处理单元303b生成的第二结果B0,并执行reduce task,执行结束后得到一个第三结果C0。The request processing module of the second data processing unit 304 forwards the second request to the DOE of the second data processing unit 304, causing the DOE to execute the reduce task. For example, referring to Figure 6, the second data processing unit 304 pulls the second result B0 generated by the first data processing unit 303a and the second result B0 generated by the first data processing unit 303b, and executes the reduce task, obtaining a third result C0 after the execution is completed.

基于以上内容可知,本申请实施例中,将原本由第二计算节点302完成的Reduce阶段的全部算子运算按照计算节点的粒度,先在第一计算节点内部执行了节点级的reduce task,得到M个第二结果,如此,减少了第二计算节点进行数据拉取的次数以及数据量。Based on the above content, it can be seen that in the embodiment of the present application, all operator operations in the Reduce stage originally completed by the second computing node 302 are first executed as node-level reduce tasks within the first computing node according to the granularity of the computing node to obtain M second results. In this way, the number of times and the amount of data pulled by the second computing node are reduced.

基于上述实施例描述的计算集群的架构,本申请实施例还提供一种数据处理方法,下面从一个第一计算节点、第一数据处理单元(第一DPU)、第二计算节点、以及第二数据处理单元(第二DPU)的内部的各个模块进行交互的角度对本申请实施例提供的数据处理方法进行更加详细的描述。Based on the architecture of the computing cluster described in the above embodiments, the embodiments of the present application also provide a data processing method. The data processing method provided in the embodiments of the present application is described in more detail below from the perspective of interaction between various modules within a first computing node, a first data processing unit (first DPU), a second computing node, and a second data processing unit (second DPU).

如图7所示,一种实现方式中,本申请实施例提供的数据处理方法包括如下步骤。As shown in FIG7 , in one implementation, the data processing method provided in an embodiment of the present application includes the following steps.

S701、第一计算节点的处理模块执行多个第一任务之后,向第一计算节点的代理模块发送通知消息以指示多个第一任务执行结束。S701: After executing a plurality of first tasks, the processing module of the first computing node sends a notification message to the agent module of the first computing node to indicate completion of execution of the plurality of first tasks.

每一个第一任务执行后生成M个第一结果(分别被标识为A0~AM-1)。可选地,第一计算节点执行每个第一任务后生成的第一结果缓存在第一计算节点的内存中。After each first task is executed, M first results are generated (respectively labeled A0 to AM-1). Optionally, after the first computing node executes each first task, the first results generated are cached in the memory of the first computing node.

S702、第一计算节点的代理模块向第一DPU的请求处理模块发送第一请求。S702. The agent module of the first computing node sends a first request to the request processing module of the first DPU.

第一请求用于指示第一DPU执行M个第二任务,第一请求中携带第二任务执行过程中的待调用算子的算子标识以及M个第一结果的存储位置信息,关于第一请求的描述可参考上述实施例的描述,此处不再赘述。The first request is used to instruct the first DPU to execute M second tasks. The first request carries the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results. For the description of the first request, please refer to the description of the above embodiment and will not be repeated here.

S703、第一DPU的请求处理模块接收到第一请求之后,申请满足需求大小的缓存。S703: After receiving the first request, the request processing module of the first DPU applies for a cache that meets the required size.

申请的缓存用于存储第一DPU执行任务之后生成的结果。The requested cache is used to store the result generated after the first DPU executes the task.

S704、第一DPU的请求处理模块向第一DPU的DOE发送第一请求,以触发第一DPU的DOE执行M个第二任务。S704: The request processing module of the first DPU sends a first request to the DOE of the first DPU to trigger the DOE of the first DPU to execute M second tasks.

可选地,第一DPU的请求处理模块将上述待调用算子的算子标识以及M个第一结果的存储位置信息填充到第一DPU的DOE的寄存器中,以触发DOE执行M个第二任务。Optionally, the request processing module of the first DPU fills the operator identifier of the operator to be called and the storage location information of the M first results into the register of the DOE of the first DPU to trigger the DOE to execute the M second tasks.

S705、第一DPU的DOE执行M个第二任务,生成M个第二结果(分别被标识为B0~BM-1)。S705 : The DOE of the first DPU executes M second tasks and generates M second results (respectively labeled as B0 to BM-1).

第一DPU的DOE获取到待调用算子的算子标识以及M个第一结果的存储位置信息之后,获取M个第一结果,并执行M个第二任务,得到M个第二结果。After the DOE of the first DPU obtains the operator identifier of the operator to be called and the storage location information of the M first results, it obtains the M first results and executes the M second tasks to obtain the M second results.

执行第二任务中调用的算子中可以包括具有下述特性的算子:输入数据量较大,执行算子后输出的数据量明显减少的算子。例如上述实施例中提及到的Join算子、Aggregate算子等等。第二任务调用的算子也可以包括其他的算子,例如Sort算子等。The operators called during the execution of the second task may include operators with the following characteristics: operators that have a large input data volume and a significantly reduced output data volume after execution. Examples include the Join operator and Aggregate operator mentioned in the above embodiments. The operators called during the second task may also include other operators, such as the Sort operator.

S706、第一DPU的DOE将M个第二结果写入申请的缓存中。S706 : The DOE of the first DPU writes the M second results into the requested cache.

S707、第一DPU通过第一DPU的请求处理模块向第一计算节点的代理模块发送通知消息,该通知消息用于通知M个第二任务执行结束。S707: The first DPU sends a notification message to the agent module of the first computing node through the request processing module of the first DPU. The notification message is used to notify the completion of execution of the M second tasks.

该通知消息中可以携带M个第二结果的存储位置信息,以及M个第二结果的数据长度。The notification message may carry the storage location information of the M second results and the data length of the M second results.

S708、第一计算节点的代理模块向第二计算节点的代理模块转发S707中的通知消息。S708. The proxy module of the first computing node forwards the notification message in S707 to the proxy module of the second computing node.

S709、第二计算节点通过其处理模块、代理模块向第二DPU的请求处理模块发送第二请求。S709. The second computing node sends a second request to the request processing module of the second DPU through its processing module and proxy module.

该第二请求用于指示第二数据处理单元执行M个第三任务,第二请求中携带第三任务执行过程中的待调用算子的算子标识以及M个第二结果的存储位置信息。The second request is used to instruct the second data processing unit to execute M third tasks. The second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location information of the M second results.

可选地,第二计算节点的代理模块将第一DPU生成的M个第二结果的存储位置信息以及执行第三任务过程中待调用算子的算子标识,按照接口格式填充到第二请求中发送至第二DPU的请求处理模块。Optionally, the proxy module of the second computing node fills the storage location information of the M second results generated by the first DPU and the operator identifier of the operator to be called during the execution of the third task into the second request according to the interface format and sends it to the request processing module of the second DPU.

S710、第二DPU的请求处理模块从缓存中获取第一DPU生成的M个第二结果。S710: The request processing module of the second DPU obtains M second results generated by the first DPU from the cache.

具体的,第二DPU的请求处理模块从每一个第一DPU对应的缓存中获取M个第二结果,即第二DPU的请求处理模块获取多个第一计算节点连接多个第一DPU生成的M个第二结果。Specifically, the request processing module of the second DPU obtains M second results from the cache corresponding to each first DPU, that is, the request processing module of the second DPU obtains M second results generated by connecting multiple first computing nodes to multiple first DPUs.

S711、第二DPU的请求处理模块申请缓存。S711: The request processing module of the second DPU applies for cache.

S712、第二DPU的请求处理模块向第二DPU的DOE发送第二请求。S712: The request processing module of the second DPU sends a second request to the DOE of the second DPU.

该第二请求还携带从缓存中获取的M个第二结果(由每一个第一DPU执行M个第二任务生成)。The second request also carries M second results obtained from the cache (generated by each first DPU executing the M second tasks).

S713、第二DPU的DOE执行M个第三任务,生成M个第三结果。S713: The DOE of the second DPU executes M third tasks and generates M third results.

S714、第二DPU的DOE将M个第三结果写入缓存中。S714 : The DOE of the second DPU writes the M third results into the cache.

S715、第二DPU的DOE通过第二DPU的请求处理模块向第二计算节点发送指示M个第三任务执行结束的通知消息。S715. The DOE of the second DPU sends a notification message indicating completion of execution of the M third tasks to the second computing node through the request processing module of the second DPU.

S716、第二计算节点从缓存中获取M个第三结果。S716. The second computing node obtains M third results from the cache.

如图8所示,另一种实现方式中,本申请实施例提供的数据处理方法包括如下步骤。As shown in FIG8 , in another implementation, the data processing method provided in an embodiment of the present application includes the following steps.

S801、第一计算节点的处理模块执行多个第一任务之后,向第二计算节点的代理模块发送通知消息以指示多个第一任务执行结束。S801: After executing a plurality of first tasks, the processing module of the first computing node sends a notification message to the agent module of the second computing node to indicate completion of execution of the plurality of first tasks.

每一个第一任务执行后生成M个第一结果。After each first task is executed, M first results are generated.

S802、第二计算节点的代理模块向第一计算节点的代理模块发送第四请求。S802: The proxy module of the second computing node sends a fourth request to the proxy module of the first computing node.

第四请求用于指示由与第一计算节点连接的第一DPU执行M个第二任务。The fourth request is used to instruct the first DPU connected to the first computing node to execute M second tasks.

一种实现方式中,第二计算节点可以从元数据服务器节点(可以由管理节点实现)获取执行第二任务所需的数据(M个第一结果)所在的计算节点清单,并且向第一计算节点的代理(agent)模块发送第四请求。In one implementation, the second computing node can obtain a list of computing nodes where the data required to perform the second task (M first results) are located from the metadata server node (which can be implemented by the management node), and send a fourth request to the agent module of the first computing node.

该第四请求中中携带数据的索引信息以及第二任务执行过程中待调用的算子的算子标识,其中,数据的索引信息用于指示第二任务所需的数据的特征,数据的索引信息可以包括应用的标识信息(例如APP id)、shuffle id,以及stage id,其中,APP id用于指示输入数据来自哪个应用,shuffle id用于指示输入数据对应应用的哪个shuffle操作,stage id用于指示任务(指示第二任务)。The fourth request carries the index information of the data and the operator identifier of the operator to be called during the execution of the second task, wherein the index information of the data is used to indicate the characteristics of the data required for the second task, and the index information of the data may include the identification information of the application (such as APP id), shuffle id, and stage id, wherein APP id is used to indicate which application the input data comes from, shuffle id is used to indicate which shuffle operation of the application the input data corresponds to, and stage id is used to indicate the task (indicating the second task).

S803、第一计算节点的代理模块根据第四请求,向第一DPU的请求处理模块发送第一请求。S803. The agent module of the first computing node sends the first request to the request processing module of the first DPU according to the fourth request.

关于第一请求的描述可以参考上述实施例中的相关描述。For the description of the first request, reference may be made to the relevant description in the above embodiment.

第一计算节点通过索引信息在映射表(索引信息与地址的映射表)确定出第一计算节点执行多个第一任务后生成的M个第一结果在内存中的存储位置(如物理地址)。The first computing node determines the storage locations (such as physical addresses) in the memory of M first results generated after the first computing node executes multiple first tasks through the index information in a mapping table (a mapping table of index information and addresses).

S804、第一DPU的请求处理模块接收到第一请求之后,申请满足需求大小的缓存。S804: After receiving the first request, the request processing module of the first DPU applies for a cache that meets the required size.

S805、第一DPU的请求处理模块向第一DPU的DOE发送第一请求,以触发第一DPU的DOE执行M个第二任务。S805: The request processing module of the first DPU sends a first request to the DOE of the first DPU to trigger the DOE of the first DPU to execute M second tasks.

S806、第一DPU的DOE执行M个第二任务,生成M个第二结果。S806: The DOE of the first DPU executes M second tasks and generates M second results.

S807、第一DPU的DOE将M个第二结果写入申请的缓存中。S807 : The DOE of the first DPU writes the M second results into the requested cache.

S808、第一DPU通过第一DPU的请求处理模块向第一计算节点的代理模块发送通知消息,该通知消息用于通知M个第二任务执行结束。S808. The first DPU sends a notification message to the agent module of the first computing node through the request processing module of the first DPU. The notification message is used to notify the completion of execution of the M second tasks.

S809、第一计算节点的代理模块向第二计算节点的代理模块转发S808中的通知消息。S809. The proxy module of the first computing node forwards the notification message in S808 to the proxy module of the second computing node.

S810、第二计算节点通过其处理模块、代理模块向第二DPU的请求处理模块发送第二请求。S810. The second computing node sends a second request to the request processing module of the second DPU through its processing module and proxy module.

关于第二请求的描述可以参考上述实施例中的相关描述。For the description of the second request, reference may be made to the relevant description in the above embodiment.

S811、第二DPU的请求处理模块从缓存中获取第一DPU生成的M个第二结果。S811. The request processing module of the second DPU obtains M second results generated by the first DPU from the cache.

S812、第二DPU的请求处理模块申请缓存。S812: The request processing module of the second DPU applies for cache.

S813、第二DPU的请求处理模块向第二DPU的DOE发送第二请求。S813: The request processing module of the second DPU sends a second request to the DOE of the second DPU.

该第二请求还携带从缓存中获取的M个第二结果(由每一个第一DPU执行M个第二任务生成)。The second request also carries M second results obtained from the cache (generated by each first DPU executing the M second tasks).

S814、第二DPU的DOE执行M个第三任务,生成M个第三结果。S814. The DOE of the second DPU executes M third tasks and generates M third results.

S815、第二DPU的DOE将M个第三结果写入缓存中。S815. The DOE of the second DPU writes the M third results into the cache.

S816、第二DPU的DOE通过第二DPU的请求处理模块向第二计算节点发送指示M个第三任务执行结束的通知消息。S816. The DOE of the second DPU sends a notification message indicating the completion of execution of the M third tasks to the second computing node through the request processing module of the second DPU.

S817、第二计算节点的从缓存中获取M个第三结果。S817. The second computing node obtains M third results from the cache.

综上S701-S716,以及S801-S817所述的数据处理方法,由于可以由每一个第一计算节点连接的第一数据处理单元对第一计算节点执行的多个第一任务后生成的第一结果先执行第二任务,生成第二结果,再由第二计算节点执行第三任务对多个计算节点生成的第二结果进行处理,可以避免同一计算节点执行多个第一任务后产生的数据直接被第二计算节点进行多次以及海量数据的拉取,从而解决因多次及海量数据拉取导致的数据传输延迟较大的问题,能够降低数据处理过程中的数据传输时延,提高数据处理的效率。In summary, the data processing method described in S701-S716 and S801-S817, since the first data processing unit connected to each first computing node can first execute the second task on the first result generated after the first computing node executes multiple first tasks to generate the second result, and then the second computing node executes the third task to process the second results generated by multiple computing nodes, it can avoid the data generated after the same computing node executes multiple first tasks being directly pulled by the second computing node multiple times and massive data, thereby solving the problem of large data transmission delay caused by multiple and massive data pulling, and can reduce the data transmission delay in the data processing process and improve the efficiency of data processing.

结合上述实施例的内容,本申请实施例中,DPU可以包括可编程逻辑电路(如FPGA),能够支持用户将自定义算子更新至DPU,即持久化至DPU,使得DPU执行任务的过程中可以灵活得进行算子更新。In combination with the contents of the above embodiments, in an embodiment of the present application, the DPU may include a programmable logic circuit (such as an FPGA), which can support users to update custom operators to the DPU, that is, persist them to the DPU, so that operators can be updated flexibly during the DPU execution of tasks.

本申请实施例中,通过计算节点与其连接的DPU进行交互可以实现算子更新,参考图9,算子更新过程包括如下步骤。In an embodiment of the present application, operator update can be achieved by interacting with the computing node and the DPU connected thereto. Referring to FIG9 , the operator update process includes the following steps.

S901、计算节点的代理模块接收第三请求,以指示与该计算节点连接的DPU更新待调用的算子。S901. The proxy module of the computing node receives a third request to instruct the DPU connected to the computing node to update an operator to be called.

第三请求中携带待更新算子的算子标识(如算子id)和算子逻辑微码。The third request carries the operator identifier (such as operator ID) and the operator logic microcode of the operator to be updated.

可选地,本申请实施例中,第三请求可以由用户触发,用户可以通过配置界面下发硬件算子在线更新的请求至计算节点的代理模块。Optionally, in an embodiment of the present application, the third request may be triggered by a user, and the user may send a request for online update of the hardware operator to the proxy module of the computing node through the configuration interface.

S902、计算节点的代理模块向DPU的请求处理模块发送第三请求。S902: The proxy module of the computing node sends a third request to the request processing module of the DPU.

S903、DPU的请求处理模块向DPU的DOE转发该第三请求。S903: The request processing module of the DPU forwards the third request to the DOE of the DPU.

S904、DPU的DOE存储算子标识对应的算子逻辑微码。S904. The DOE of the DPU stores the operator logic microcode corresponding to the operator identifier.

DPU的DOE接收到第三请求之后,根据第三请求中的算子标识先判断该算子标识所指示的算子是否为新的算子,即确定算子逻辑微码是否为新的算子逻辑微码,若是新的算子逻辑微码,则DOE将该算子逻辑微码持久化保存在DPU的闪存(flash)中。After the DPU's DOE receives the third request, it first determines whether the operator indicated by the operator identifier in the third request is a new operator, that is, determines whether the operator logic microcode is a new operator logic microcode. If it is a new operator logic microcode, the DOE will persist the operator logic microcode in the DPU's flash memory.

S905、DPU的DOE将算子逻辑微码从flash中加载到DOE的硬件引擎池中。S905. The DPU's DOE loads the operator logic microcode from the flash into the DOE's hardware engine pool.

可以理解的,DOE接收到上述实施例中的第一请求或第二请求时,根据请求中携带的算子标识将算子的算子逻辑微码从flash中加载到DOE的硬件引擎池中,从而执行相应的任务。It can be understood that when DOE receives the first request or the second request in the above embodiment, it loads the operator logic microcode of the operator from the flash into the hardware engine pool of DOE according to the operator identifier carried in the request, thereby executing the corresponding task.

综上对S901-S905的描述,本申请实施例中,数据处理单元的逻辑电路支持硬件算子更新,因此,可以灵活地对数据处理单元中的算子进行更新,扩充了数据处理单元可执行的算子的范围。并且上述待更新算子可以是用户自定义的算子,支持用户自定义算子能够满足不同用户的需求,使得计算集群的应用范围更加广泛。In summary, as described above for S901-S905, in this embodiment of the present application, the logic circuitry of the data processing unit supports hardware operator updates. Therefore, operators within the data processing unit can be flexibly updated, expanding the range of operators that can be executed by the data processing unit. Furthermore, these operators to be updated can be user-defined operators. Supporting user-defined operators can meet the needs of diverse users, broadening the application scope of the computing cluster.

本申请实施例还提供一种计算节点,包括存储器和与存储器连接的至少一个处理器,该存储器用于存储计算机程序代码,计算机程序代码包括计算机指令,当计算机指令被至少一个处理器执行时,使得计算节点执行计算集群中第一计算节点所执行的动作;或者,执行计算集群中第二计算节点所执行的动作。An embodiment of the present application also provides a computing node, including a memory and at least one processor connected to the memory, the memory being used to store computer program code, the computer program code including computer instructions, which, when executed by at least one processor, causes the computing node to perform an action performed by a first computing node in a computing cluster; or, to perform an action performed by a second computing node in the computing cluster.

本申请实施例提供一种数据处理单元,包括存储器和与存储器连接的至少一个处理器,该存储器用于存储计算机程序代码,计算机程序代码包括计算机指令,当计算机指令被至少一个处理器执行时,使得数据处理单元执行计算集群中第一数据处理单元所执行的动作;或者,执行计算集群中第二数据处理单元所执行的动作。An embodiment of the present application provides a data processing unit, including a memory and at least one processor connected to the memory, wherein the memory is used to store computer program code, and the computer program code includes computer instructions. When the computer instructions are executed by the at least one processor, the data processing unit performs the action performed by the first data processing unit in the computing cluster; or, performs the action performed by the second data processing unit in the computing cluster.

本申请实施例还提供一种计算机可读存储介质,存储有计算机指令,计算机指令在计算机上运行时,执行上述实施例中所述的数据处理方法。An embodiment of the present application further provides a computer-readable storage medium storing computer instructions. When the computer instructions are run on a computer, the data processing method described in the above embodiment is executed.

本申请实施例还提供一种计算机程序产品,该计算机程序产品包含计算机指令,当计算机指令在计算机上运行时,执行上述实施例中所述的数据处理方法。An embodiment of the present application further provides a computer program product, which includes computer instructions. When the computer instructions are run on a computer, the data processing method described in the above embodiment is executed.

本申请实施例还提供一种芯片系统,包括:处理器,用于从存储器中调用并运行计算机程序,使得安装有芯片系统的设备执行上述实施例中所述的数据处理方法。An embodiment of the present application also provides a chip system, including: a processor, used to call and run a computer program from a memory, so that a device equipped with the chip system executes the data processing method described in the above embodiment.

在上述实施例中,可以全部或部分地通过软件、硬件、固件或者其任意组合来实现。当使用软件程序实现时,可以全部或部分地以计算机程序产品的形式实现。该计算机程序产品包括一个或多个计算机指令。在计算机上加载和执行该计算机指令时,全部或部分地产生按照本申请实施例中的流程或功能。该计算机可以是通用计算机、专用计算机、计算机网络或者其他可编程装置。该计算机指令可以存储在计算机可读存储介质中,或者从一个计算机可读存储介质向另一个计算机可读存储介质传输,例如,该计算机指令可以从一个网站站点、计算机、服务器或数据中心通过有线(例如同轴电缆、光纤、数字用户线(digital subscriber line,DSL))方式或无线(例如红外、无线、微波等)方式向另一个网站站点、计算机、服务器或数据中心传输。该计算机可读存储介质可以是计算机能够存取的任何可用介质或者是包括一个或多个可用介质集成的服务器、数据中心等数据存储设备。该可用介质可以是磁性介质(例如,软盘、磁盘、磁带)、光介质(例如,数字视频光盘(digital video disc,DVD))、或者半导体介质(例如固态硬盘(solid state drives,SSD))等。In the above embodiments, all or part of the embodiments may be implemented by software, hardware, firmware or any combination thereof. When implemented using a software program, all or part of the embodiments may be implemented in the form of a computer program product. The computer program product includes one or more computer instructions. When the computer instructions are loaded and executed on a computer, all or part of the processes or functions in accordance with the embodiments of the present application are generated. The computer may be a general-purpose computer, a special-purpose computer, a computer network or other programmable device. The computer instructions may be stored in a computer-readable storage medium or transmitted from one computer-readable storage medium to another computer-readable storage medium. For example, the computer instructions may be transmitted from one website, computer, server or data center to another website, computer, server or data center via a wired (e.g., coaxial cable, optical fiber, digital subscriber line (DSL)) or wireless (e.g., infrared, wireless, microwave, etc.) method. The computer-readable storage medium may be any available medium that a computer can access or a data storage device such as a server or data center that includes one or more available media integrated therein. The available medium can be a magnetic medium (e.g., floppy disk, magnetic disk, tape), an optical medium (e.g., a digital video disc (DVD)), or a semiconductor medium (e.g., a solid state drive (SSD)), etc.

通过以上的实施方式的描述,所属领域的技术人员可以清楚地了解到,为描述的方便和简洁,仅以上述各功能模块的划分进行举例说明,实际应用中,可以根据需要而将上述功能分配由不同的功能模块完成,即将装置的内部结构划分成不同的功能模块,以完成以上描述的全部或者部分功能。上述描述的系统,装置和单元的具体工作过程,可以参考前述方法实施例中的对应过程,在此不再赘述。Through the description of the above embodiments, those skilled in the art will clearly understand that for the sake of convenience and brevity, only the division of the above functional modules is used as an example. In actual applications, the above functions can be assigned to different functional modules as needed, that is, the internal structure of the device can be divided into different functional modules to complete all or part of the functions described above. The specific working processes of the above-described systems, devices, and units can refer to the corresponding processes in the aforementioned method embodiments and will not be repeated here.

在本申请所提供的几个实施例中,应该理解到,所揭露的系统,装置和方法,可以通过其它的方式实现。例如,以上所描述的装置实施例仅仅是示意性的,例如,所述模块或单元的划分,仅仅为一种逻辑功能划分,实际实现时可以有另外的划分方式,例如多个单元或组件可以结合或者可以集成到另一个系统,或一些特征可以忽略,或不执行。另一点,所显示或讨论的相互之间的耦合或直接耦合或通信连接可以是通过一些接口,装置或单元的间接耦合或通信连接,可以是电性,机械或其它的形式。In the several embodiments provided in this application, it should be understood that the disclosed systems, devices and methods can be implemented in other ways. For example, the device embodiments described above are merely schematic. For example, the division of the modules or units is only a logical function division. In actual implementation, there may be other division methods, such as multiple units or components can be combined or integrated into another system, or some features can be ignored or not executed. Another point is that the mutual coupling or direct coupling or communication connection shown or discussed can be an indirect coupling or communication connection through some interfaces, devices or units, which can be electrical, mechanical or other forms.

所述作为分离部件说明的单元可以是或者也可以不是物理上分开的,作为单元显示的部件可以是或者也可以不是物理单元,即可以位于一个地方,或者也可以分布到多个网络单元上。可以根据实际的需要选择其中的部分或者全部单元来实现本实施例方案的目的。The units described as separate components may or may not be physically separate, and the components shown as units may or may not be physical units, that is, they may be located in one place or distributed across multiple network units. Some or all of these units may be selected to achieve the purpose of this embodiment according to actual needs.

另外,在本申请各个实施例中的各功能单元可以集成在一个处理单元中,也可以是各个单元单独物理存在,也可以两个或两个以上单元集成在一个单元中。上述集成的单元既可以采用硬件的形式实现,也可以采用软件功能单元的形式实现。In addition, the functional units in the various embodiments of the present application may be integrated into a single processing unit, or each unit may exist physically separately, or two or more units may be integrated into a single unit. The aforementioned integrated units may be implemented in the form of hardware or software functional units.

所述集成的单元如果以软件功能单元的形式实现并作为独立的产品销售或使用时,可以存储在一个计算机可读取存储介质中。基于这样的理解,本申请的技术方案本质上或者说对现有技术做出贡献的部分或者该技术方案的全部或部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质中,包括若干指令用以使得一台计算机设备(可以是个人计算机,服务器,或者网络设备等)或处理器执行本申请各个实施例所述方法的全部或部分步骤。而前述的存储介质包括:快闪存储器、移动硬盘、只读存储器、随机存取存储器、磁碟或者光盘等各种可以存储程序代码的介质。If the integrated unit is implemented in the form of a software functional unit and sold or used as an independent product, it can be stored in a computer-readable storage medium. Based on this understanding, the technical solution of the present application is essentially or the part that contributes to the prior art or all or part of the technical solution can be embodied in the form of a software product, and the computer software product is stored in a storage medium, including a number of instructions for enabling a computer device (which can be a personal computer, server, or network device, etc.) or a processor to execute all or part of the steps of the method described in each embodiment of the present application. The aforementioned storage medium includes: various media that can store program codes, such as flash memory, mobile hard disk, read-only memory, random access memory, magnetic disk or optical disk.

以上所述,仅为本申请的具体实施方式,但本申请的保护范围并不局限于此,任何在本申请揭露的技术范围内的变化或替换,都应涵盖在本申请的保护范围之内。因此,本申请的保护范围应以所述权利要求的保护范围为准。The above is only a specific embodiment of the present application, but the scope of protection of this application is not limited to this. Any changes or substitutions within the technical scope disclosed in this application should be included in the scope of protection of this application. Therefore, the scope of protection of this application should be based on the scope of protection of the claims.

Claims (19)

一种计算集群,其特征在于,包括:A computing cluster, comprising: 多个第一计算节点,用于执行N个第一任务,所述N个第一任务用于处理待处理数据的N个数据分片,执行每个第一任务后生成M个第一结果,且分别被标识为A0~AM-1;A plurality of first computing nodes, configured to execute N first tasks, wherein the N first tasks are configured to process N data shards of to-be-processed data, and generate M first results after executing each first task, and the results are identified as A0 to AM-1, respectively; 多个第一数据处理单元,每个第一计算节点连接一个第一数据处理单元;每个第一数据处理单元,用于执行M个第二任务,所述M个第二任务中的每个第二任务用于对所述第一数据处理单元所连接的第一计算节点中的具有相同标识的第一结果进行处理,处理后生成第二结果,执行M个第二任务后生成M个第二结果,所述M个第二结果被标识为B0~BM-1;a plurality of first data processing units, each first computing node being connected to a first data processing unit; each first data processing unit being configured to execute M second tasks, each of the M second tasks being configured to process a first result having the same identifier in the first computing node to which the first data processing unit is connected, and generate a second result after the processing; after executing the M second tasks, M second results are generated, and the M second results are identified as B0 to BM-1; 多个第二计算节点,用于执行M个第三任务,每个第三任务用于对所述多个第一数据处理单元中的具有相同标识的第二结果进行处理,生成第三结果,执行M个第三任务后生成M个第三结果。Multiple second computing nodes are used to execute M third tasks, each third task is used to process the second results with the same identifier in the multiple first data processing units to generate a third result, and M third results are generated after executing M third tasks. 根据权利要求1所述的计算集群,其特征在于,The computing cluster according to claim 1, characterized in that 所述第一任务为map任务,所述第二任务及第三任务为reduce任务。The first task is a map task, and the second and third tasks are reduce tasks. 根据权利要求1或2所述的计算集群,其特征在于,The computing cluster according to claim 1 or 2, characterized in that: 所述多个第一计算节点中的第三计算节点,还用于发送第一请求至所连接的第一数据处理单元,以指示所述第一数据处理单元执行所述M个第二任务。The third computing node among the multiple first computing nodes is further configured to send a first request to the connected first data processing unit to instruct the first data processing unit to execute the M second tasks. 根据权利要求3所述的计算集群,其特征在于,The computing cluster according to claim 3, characterized in that 所述第一请求中携带所述第二任务执行过程中的待调用算子的算子标识以及所述M个第一结果的存储位置信息。The first request carries the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results. 根据权利要求1至4任一项所述的计算集群,其特征在于,还包括:多个第二数据处理单元,每个第二计算节点连接一个第二数据处理单元;The computing cluster according to any one of claims 1 to 4, further comprising: a plurality of second data processing units, each second computing node being connected to a second data processing unit; 所述多个第二计算节点中的第四计算节点,还用于发送第二请求至所连接的第二数据处理单元,以指示所述第二数据处理单元执行所述M个第三任务;所述第二请求中携带所述第三任务执行过程中的待调用算子的算子标识以及所述M个第二结果的存储位置信息。The fourth computing node among the multiple second computing nodes is also used to send a second request to the connected second data processing unit to instruct the second data processing unit to execute the M third tasks; the second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location information of the M second results. 根据权利要求1至5任一项所述的计算集群,其特征在于,The computing cluster according to any one of claims 1 to 5, characterized in that: 所述多个第一数据处理单元中的第三数据处理单元,还用于发送通知至所述多个第二计算节点中的第五计算节点,以指示所述M个第二任务执行结束。The third data processing unit among the multiple first data processing units is further configured to send a notification to a fifth computing node among the multiple second computing nodes to indicate that the execution of the M second tasks has ended. 根据权利要求1至6任一项所述的计算集群,其特征在于,The computing cluster according to any one of claims 1 to 6, characterized in that: 所述多个第一数据处理单元中的第四数据处理单元,还用于接收第三请求,以指示更新所述第四数据处理单元待调用的算子;所述第三请求中携带待更新算子的算子逻辑微码;并加载所述待更新算子的算子逻辑微码。The fourth data processing unit among the multiple first data processing units is also used to receive a third request to instruct the update of the operator to be called by the fourth data processing unit; the third request carries the operator logic microcode of the operator to be updated; and loads the operator logic microcode of the operator to be updated. 根据权利要求7所述的计算集群,其特征在于,The computing cluster according to claim 7, characterized in that 所述待更新算子是自定义的算子。The operator to be updated is a user-defined operator. 一种数据处理方法,其特征在于,应用于计算集群,所述计算集群包括多个第一计算节点、多个第一数据处理单元以及多个第二计算节点,其中,每个第一计算节点连接一个第一数据处理单元;所述方法包括:A data processing method, characterized by being applied to a computing cluster, wherein the computing cluster includes a plurality of first computing nodes, a plurality of first data processing units, and a plurality of second computing nodes, wherein each first computing node is connected to a first data processing unit; the method comprises: 多个第一计算节点执行N个第一任务,执行每个第一任务后生成M个第一结果,且分别被标识为A0~AM-1;所述N个第一任务用于处理待处理数据的N个数据分片;The plurality of first computing nodes execute N first tasks, and generate M first results after executing each first task, and the results are respectively identified as A0 to AM-1; the N first tasks are used to process N data slices of the data to be processed; 每个第一数据处理单元执行M个第二任务,执行M个第二任务后生成M个第二结果,所述M个第二结果被标识为B0~BM-1;所述M个第二任务中的每个第二任务用于对所述第一数据处理单元所连接的第一计算节点中的具有相同标识的第一结果进行处理,处理后生成所述第二结果;Each first data processing unit executes M second tasks, and generates M second results after executing the M second tasks, wherein the M second results are identified as B0 to BM-1; each of the M second tasks is used to process the first result with the same identification in the first computing node connected to the first data processing unit, and generate the second result after processing; 多个第二计算节点执行M个第三任务,执行M个第三任务后生成M个第三结果;每个第三任务用于对所述多个第一数据处理单元中的具有相同标识的第二结果进行处理,生成所述第三结果。Multiple second computing nodes execute M third tasks, and generate M third results after executing the M third tasks; each third task is used to process the second results with the same identifier in the multiple first data processing units to generate the third result. 根据权利要求9所述的方法,其特征在于,The method according to claim 9, characterized in that 所述第一任务为map任务,所述第二任务及第三任务为reduce任务。The first task is a map task, and the second and third tasks are reduce tasks. 根据权利要求9或10所述的方法,其特征在于,所述方法还包括:The method according to claim 9 or 10, characterized in that the method further comprises: 所述多个第一计算节点中的第三计算节点发送第一请求至所连接的第一数据处理单元,以指示所述第一数据处理单元执行所述M个第二任务。A third computing node among the plurality of first computing nodes sends a first request to the connected first data processing unit to instruct the first data processing unit to execute the M second tasks. 根据权利要求11所述的方法,其特征在于,The method according to claim 11, characterized in that 所述第一请求中携带所述第二任务执行过程中的待调用算子的算子标识以及所述M个第一结果的存储位置信息。The first request carries the operator identifier of the operator to be called during the execution of the second task and the storage location information of the M first results. 根据权利要求9至12任一项所述的方法,其特征在于,所述计算集群还包括:多个第二数据处理单元,每个第二计算节点连接一个第二数据处理单元;所述方法还包括:The method according to any one of claims 9 to 12, wherein the computing cluster further comprises: a plurality of second data processing units, each second computing node being connected to a second data processing unit; the method further comprises: 所述多个第二计算节点中的第四计算节点发送第二请求至所连接的第二数据处理单元,以指示所述第二数据处理单元执行所述M个第三任务;所述第二请求中携带所述第三任务执行过程中的待调用算子的算子标识以及所述M个第二结果的存储位置信息。The fourth computing node among the multiple second computing nodes sends a second request to the connected second data processing unit to instruct the second data processing unit to execute the M third tasks; the second request carries the operator identifier of the operator to be called during the execution of the third task and the storage location information of the M second results. 根据权利要求9至13任一项所述的方法,其特征在于,所述方法还包括:The method according to any one of claims 9 to 13, further comprising: 所述多个第一数据处理单元中的第三数据处理单元发送通知至所述多个第二计算节点中的第五计算节点,以指示所述M个第二任务执行结束。The third data processing unit among the plurality of first data processing units sends a notification to a fifth computing node among the plurality of second computing nodes to indicate that the execution of the M second tasks is completed. 根据权利要求9至14任一项所述的方法,其特征在于,所述方法还包括:The method according to any one of claims 9 to 14, further comprising: 所述多个第一数据处理单元中的第四数据处理单元接收第三请求,以指示更新所述第四数据处理单元待调用的算子,所述第三请求中携带待更新算子的算子逻辑微码;A fourth data processing unit among the plurality of first data processing units receives a third request to instruct an update of an operator to be called by the fourth data processing unit, wherein the third request carries an operator logic microcode of the operator to be updated; 所述第四数据处理单元加载所述待更新算子的算子逻辑微码。The fourth data processing unit loads the operator logic microcode of the operator to be updated. 根据权利要求15所述的方法,其特征在于,The method according to claim 15, characterized in that 所述待更新算子是自定义的算子。The operator to be updated is a user-defined operator. 一种数据处理装置,其特征在于,包括存储器和与所述存储器连接的至少一个处理器,所述存储器用于存储计算机程序代码,所述计算机程序代码包括计算机指令,当计算机指令被至少一个处理器执行时,使得所述数据处理装置执行如权利要求9至16任一项所述的方法。A data processing device, characterized in that it includes a memory and at least one processor connected to the memory, the memory is used to store computer program code, and the computer program code includes computer instructions. When the computer instructions are executed by at least one processor, the data processing device executes the method according to any one of claims 9 to 16. 一种计算机可读存储介质,其特征在于,存储有计算机指令,所述计算机指令在计算机上运行时,执行如权利要求9至16任一项所述的方法。A computer-readable storage medium, characterized in that computer instructions are stored therein, and when the computer instructions are executed on a computer, the method according to any one of claims 9 to 16 is executed. 一种计算机程序产品,其特征在于,包含指令,当所述指令被计算设备运行时,使得所述计算设备执行如权利要求9至16任一项所述的方法。A computer program product, characterized in that it contains instructions, which, when executed by a computing device, cause the computing device to perform the method according to any one of claims 9 to 16.
PCT/CN2024/141155 2024-02-01 2024-12-20 Data processing method and apparatus, and computing cluster Pending WO2025161762A1 (en)

Applications Claiming Priority (2)

Application Number Priority Date Filing Date Title
CN202410147754.3 2024-02-01
CN202410147754.3A CN120407673A (en) 2024-02-01 2024-02-01 Data processing method, device and computing cluster

Publications (1)

Publication Number Publication Date
WO2025161762A1 true WO2025161762A1 (en) 2025-08-07

Family

ID=96507095

Family Applications (1)

Application Number Title Priority Date Filing Date
PCT/CN2024/141155 Pending WO2025161762A1 (en) 2024-02-01 2024-12-20 Data processing method and apparatus, and computing cluster

Country Status (2)

Country Link
CN (1) CN120407673A (en)
WO (1) WO2025161762A1 (en)

Citations (5)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN101770402A (en) * 2008-12-29 2010-07-07 中国移动通信集团公司 Map task scheduling method, equipment and system in MapReduce system
CN102467570A (en) * 2010-11-17 2012-05-23 日电(中国)有限公司 Connection query system and method for distributed data warehouse
US20180089258A1 (en) * 2016-09-26 2018-03-29 Splunk Inc. Resource allocation for multiple datasets
US20190012466A1 (en) * 2017-07-10 2019-01-10 Burstiq Analytics Corporation Secure adaptive data storage platform
CN116360691A (en) * 2023-02-27 2023-06-30 深圳华为云计算技术有限公司 Data processing method and device

Patent Citations (5)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN101770402A (en) * 2008-12-29 2010-07-07 中国移动通信集团公司 Map task scheduling method, equipment and system in MapReduce system
CN102467570A (en) * 2010-11-17 2012-05-23 日电(中国)有限公司 Connection query system and method for distributed data warehouse
US20180089258A1 (en) * 2016-09-26 2018-03-29 Splunk Inc. Resource allocation for multiple datasets
US20190012466A1 (en) * 2017-07-10 2019-01-10 Burstiq Analytics Corporation Secure adaptive data storage platform
CN116360691A (en) * 2023-02-27 2023-06-30 深圳华为云计算技术有限公司 Data processing method and device

Also Published As

Publication number Publication date
CN120407673A (en) 2025-08-01

Similar Documents

Publication Publication Date Title
CN113641457B (en) Container creation method, device, apparatus, medium, and program product
CN100533387C (en) System and method for executing job step
EP4052126B1 (en) Management of multiple physical function non-volatile memory devices
US8874638B2 (en) Interactive analytics processing
CN107729139A (en) A kind of method and apparatus for concurrently obtaining resource
CN115269196A (en) Thread pool dynamic creation method, device, equipment and storage medium
CN113934437B (en) Method, system and client cloud phone for installing applications on cloud phone
WO2024037629A1 (en) Data integration method and apparatus for blockchain, and computer device and storage medium
CN114625479A (en) Cloud edge collaborative application management method in edge computing and corresponding device
CN114490062A (en) A local disk scheduling method, device, electronic device and storage medium
CN106302780A (en) The method of cluster device bulk transfer data, Apparatus and system, server
US11929926B2 (en) Traffic service threads for large pools of network addresses
KR20120115398A (en) System and method of controlling power in an electronic device
CN117076096A (en) Task flow execution method and device, computer readable medium and electronic equipment
CN110162410A (en) A kind of message treatment method and device
CN111767126A (en) System and method for distributed batch processing
WO2025161762A1 (en) Data processing method and apparatus, and computing cluster
JP7342089B2 (en) Computer systems and computer system scale-out methods
CN111143033A (en) Operation execution method and device based on scalable operating system
CN118210618A (en) Storage resource management method, device, electronic device and storage medium
CN116301610A (en) A data processing method and related equipment
CN108833532A (en) Service processing method, device and system based on internet of things
CN114995748A (en) Request processing method and device
CN114726657A (en) Method and device for interrupt management and data receiving and sending management and intelligent network card
CN117666921A (en) Data processing methods, accelerators and computing devices

Legal Events

Date Code Title Description
121 Ep: the epo has been informed by wipo that ep was designated in this application

Ref document number: 24921784

Country of ref document: EP

Kind code of ref document: A1