[go: up one dir, main page]

WO2025118656A1 - Task scheduling method and related device - Google Patents

Task scheduling method and related device Download PDF

Info

Publication number
WO2025118656A1
WO2025118656A1 PCT/CN2024/109813 CN2024109813W WO2025118656A1 WO 2025118656 A1 WO2025118656 A1 WO 2025118656A1 CN 2024109813 W CN2024109813 W CN 2024109813W WO 2025118656 A1 WO2025118656 A1 WO 2025118656A1
Authority
WO
WIPO (PCT)
Prior art keywords
task
slice
data
scheduling
execution node
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Pending
Application number
PCT/CN2024/109813
Other languages
French (fr)
Chinese (zh)
Inventor
何代义
阮俊波
杨鹏波
李响
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Huawei Cloud Computing Technologies Co Ltd
Original Assignee
Huawei Cloud Computing Technologies Co Ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Huawei Cloud Computing Technologies Co Ltd filed Critical Huawei Cloud Computing Technologies Co Ltd
Publication of WO2025118656A1 publication Critical patent/WO2025118656A1/en
Pending legal-status Critical Current
Anticipated expiration legal-status Critical

Links

Classifications

    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/48Program initiating; Program switching, e.g. by interrupt
    • G06F9/4806Task transfer initiation or dispatching
    • G06F9/4843Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
    • G06F9/4881Scheduling strategies for dispatcher, e.g. round robin, multi-level priority queues
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/48Program initiating; Program switching, e.g. by interrupt
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/50Allocation of resources, e.g. of the central processing unit [CPU]
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/50Allocation of resources, e.g. of the central processing unit [CPU]
    • G06F9/5083Techniques for rebalancing the load in a distributed system
    • G06F9/5088Techniques for rebalancing the load in a distributed system involving task migration

Definitions

  • the present application relates to the field of computer technology, and in particular to a task scheduling method, a task scheduling system, a computing device cluster, a computer-readable storage medium, and a computer program product.
  • Distributed task scheduling is a task scheduling that runs in a distributed cluster environment.
  • the scheduling node schedules tasks to the execution nodes in the distributed cluster for processing.
  • the requirements for task scheduling strategies are getting higher and higher. How to schedule tasks more accurately, achieve load balancing of cluster resources, and improve the utilization of computing resources requires further exploration.
  • the scheduling node schedules the task to the execution node. After the task starts running, it may cause insufficient node resources, which in turn leads to the failure of running the task on the node or the risk of the entire node crashing.
  • the present application provides a task scheduling method, which can adaptively adjust the number of task slices according to the load of the running task or task slice, and then dynamically adjust the resource usage of the task, so that the overall cluster resources can achieve load balancing, avoid the execution node being occupied by a single task or task slice, causing the risk of task failure or node downtime, and improve reliability and availability.
  • the present application also provides a scheduling system, a computing device cluster, a computer-readable storage medium, and a computer program product corresponding to the above method.
  • the present application provides a task scheduling method.
  • the method can be performed by a scheduling system.
  • the scheduling system can be a distributed task scheduling system for scheduling tasks to a distributed cluster for processing by the distributed cluster.
  • the scheduling system can be a software system, and the software system can be an independent software system, such as an independent scheduling engine, or integrated into other software, such as integrated into other software in the form of plug-ins, applets, etc.
  • the software system can be provided to the user in the form of a code package, which can be deployed by the user himself, or provided to the user in the form of a cloud service.
  • the software system can be deployed in a computing device cluster, and the computing device cluster executes the program code of the software system, thereby executing the task scheduling method of the present application.
  • the task scheduling system can also be a hardware system, such as a computing device cluster with scheduling capabilities. When the hardware system is running, the task scheduling method of the present application is executed.
  • the scheduling system receives the task, and schedules the task or the task slice of the task to the first execution node in the distributed cluster, and the first execution node is at least one execution node in the distributed cluster.
  • the scheduling system receives a slice adjustment request sent by the first execution node.
  • the slice adjustment request is used to request to increase the task slice of the task or reduce the task slice of the task, and the slice adjustment request is generated by the first execution node according to the load of the task or the task slice on the first execution node.
  • the scheduling system adjusts the task slice of the task in the distributed cluster according to the slice adjustment request.
  • adjusting the task slice of the task includes: increasing the task slice of the task in the second execution node, or reducing the task slice of the task in the first execution node.
  • This method can adaptively adjust the number of task slices according to the load of the running tasks or task slices, and then dynamically adjust the resource usage of the tasks, so that the overall cluster resources can be load balanced, avoiding the execution nodes being occupied by a single task or task slice, causing the risk of task failure or node downtime, and improving reliability and availability.
  • the slice adjustment request when the load of the task or task slice on the first execution node is greater than a first threshold, the slice adjustment request is used to request to increase the task slice of the task; or, when the load of the target task slice of the task on the first execution node is less than a second threshold, the slice adjustment request is used to reduce the target task slice of the task.
  • the number of task slices can be adaptively adjusted according to the load size of the task or task slice, and the resource usage of the task can be dynamically adjusted to achieve load balancing of cluster resources.
  • the scheduling system includes a scheduling node and a data module.
  • the data module detects that the task slice of the task is adjusted, the processing data of the task is re-sliced to obtain at least one data slice, and the at least one data slice corresponds to the adjusted task slice one by one.
  • the method re-slices the task processing data when it detects that the task slice has been adjusted, for example, re-load balances the processing data, thereby reducing the resource load occupied by tasks or task slices with heavier loads, thereby achieving the purpose of relatively balanced overall resources.
  • the data module can re-slice the task processing data to obtain at least one data slice through load balancing according to the adjusted number of task slices.
  • the data module re-slices the processing data through load balancing to ensure that the amount of data pulled by each task slice is relatively balanced, thereby achieving relative balance of overall resources.
  • the data module can also record the progress of pulling the processing data of the task. Accordingly, the data module can determine the remaining data based on the progress of pulling the processing data of the task, and then the data module can re-shard the remaining data to obtain at least one data shard. In this way, it can be ensured that when the task shard changes, the processing data can be completely pulled and the uniqueness and integrity of the data can be guaranteed.
  • the data module can also send a heartbeat message to the task slice, which is used to detect the activity of the task slice. Then, the data module adjusts the data slice according to the activity of the task slice. In this way, the status of the task slice can be dynamically monitored, and then the linkage adjustment of the data slice and the task slice can be realized to ensure the balance of the overall resources.
  • tasks or task slices are assembled in a data body.
  • the data body records the load of the running task or task slice and the identification of the task or task slice.
  • the method records the load occupancy of the running task, and can quickly make a decision on the resource load occupied by each running task, providing a reference for whether to reduce the task.
  • the task includes a log processing task.
  • the scheduling system comprises:
  • a scheduling node used for receiving a scheduling task, scheduling the task or the task slice of the task to a first execution node in a distributed cluster, the first execution node being at least one execution node in the distributed cluster, and receiving a slice adjustment request sent by the first execution node during the process of the first execution node executing the task or the task slice, the slice adjustment request being used to request to increase the task slice of the task or reduce the task slice of the task, the slice adjustment request being generated by the first execution node according to the load of the task or the task slice of the task on the first execution node;
  • the scheduling node is further used to adjust the task slice of the task in the distributed cluster according to the slice adjustment request;
  • the adjusting of the task slices of the task includes: increasing the task slices of the task in the second execution node, or reducing the task slices of the task in the first execution node.
  • the slice adjustment request when the load of the task or the task slice of the task on the first execution node is greater than a first threshold, the slice adjustment request is used to request to increase the task slice of the task; or, when the load of the target task slice of the task on the first execution node is less than a second threshold, the slice adjustment request is used to reduce the target task slice of the task.
  • the scheduling system includes the scheduling node and a data module, and the data module is specifically used to:
  • the processed data of the task is re-sliced to obtain at least one data slice, and the at least one data slice corresponds to the adjusted task slice one by one.
  • the data module is specifically used to:
  • the processing data of the task is re-sliced to obtain at least one data slice through load balancing.
  • the data module is further used to:
  • the data module is specifically used for:
  • the remaining data is re-sharded to obtain at least one data shard.
  • the data module is further used to:
  • the data shards are adjusted according to the activity of the task shards.
  • the task or the task slice is assembled in a data body, and the data body records the load of the running task or the task slice and the identification of the task or the task slice.
  • the task includes a log processing task.
  • the log processing task can be to read the original log from the corresponding log source according to the user's task configuration information, process it according to the rules expected by the user, and then write the result to the expected target. It should be noted that the task can also be other tasks with uncertain task demand resource size or large changes in the resources occupied by running tasks.
  • This method can be applied to task scheduling scenarios of different tasks and has high availability.
  • the present application provides a computing device cluster.
  • the computing device cluster includes at least one computing device, and the at least one computing device includes at least one processor and at least one memory.
  • the at least one processor and the at least one memory communicate with each other.
  • the at least one processor is used to execute instructions stored in the at least one memory, so that the computing device or the computing device cluster executes the task scheduling method described in the first aspect or any implementation of the first aspect.
  • the present application provides a computer-readable storage medium, wherein the computer-readable storage medium stores instructions, wherein the instructions instruct a computing device or a computing device cluster to execute the task scheduling method described in the first aspect or any one of the implementations of the first aspect.
  • the present application provides a computer program product comprising instructions, which, when executed on a computing device or a computing device cluster, enables the computing device or the computing device cluster to execute the task scheduling method described in the first aspect or any one of the implementations of the first aspect.
  • FIG1 is a schematic diagram of a task scheduling process provided by the present application.
  • FIG2 is a schematic diagram of the structure of a scheduling system provided by the present application.
  • FIG3 is a flow chart of a task scheduling method provided by the present application.
  • FIG4 is a schematic diagram of a task slice adaptive adjustment provided by the present application.
  • FIG5 is a schematic diagram of adaptive adjustment of data slicing with task slicing provided by the present application.
  • FIG6 is a schematic diagram of a process flow of adaptive adjustment of task slices provided by the present application.
  • FIG7 is a schematic diagram of a task scheduling method provided by the present application applied to a log processing scenario
  • FIG8 is a schematic diagram of the structure of a scheduling node provided by the present application.
  • FIG9 is a schematic diagram of the structure of a computing device provided by the present application.
  • FIG10 is a schematic diagram of the structure of a computing device cluster provided by the present application.
  • FIG11 is a schematic diagram of the structure of another computing device cluster provided by the present application.
  • FIG. 12 is a schematic diagram of the structure of another computing device cluster provided in the present application.
  • first and second in the embodiments of the present application are used for descriptive purposes only and should not be understood as indicating or implying relative importance or implicitly indicating the number of the indicated technical features. Therefore, the features defined as “first” and “second” may explicitly or implicitly include one or more of the features.
  • Distributed task scheduling is a task scheduling that runs in a distributed cluster environment.
  • Distributed means that different businesses are split and deployed according to the microservice architecture style
  • cluster means that the same business split according to the microservice architecture style is deployed on different nodes.
  • a distributed cluster refers to the cluster setting of each node in a distributed system, for example, cluster deployment of different split businesses.
  • the scheduling node can schedule tasks to the execution node in the distributed cluster for operation and processing.
  • the scheduling node can also be called the master node, which is responsible for task scheduling, specifically scheduling tasks to reasonable execution nodes according to reasonable scheduling strategies.
  • the execution node can also be called the worker node, which is the node that actually runs the task and completes task processing according to the task logic written by the user.
  • the scheduling node schedules the task to the execution node. After the task starts running, it may lead to insufficient node resources, which in turn leads to the failure of running the task on the node or the risk of the entire node crashing.
  • MapReduce MapReduce
  • the task center includes task 1 (task 1) and task 2 (task 2), and the scheduling center includes multiple master nodes, such as master1 and master2.
  • the scheduling center is used to schedule tasks to task executors for execution.
  • the task executors include multiple working nodes, such as worker1 to worker3.
  • the master node of the scheduling center can monitor the load status of the cluster (a cluster formed by working nodes) and dynamically monitor the information during the execution of each working node.
  • the master node can decide whether to split the tasks according to the current load of the cluster. In this example, the master node can decide to split the tasks and send them to different working nodes for processing.
  • task1 can be split into task 1map and task1 reduce-1, task1 reduce 2, and dispatched to worker1 to worker3 respectively.
  • a MapReduce job usually divides the input data set into several independent data blocks, and the map task processes the data blocks in a completely parallel manner. The output of the map task can be used as the input of the reduce task.
  • the task scheduling method based on MapReduce can achieve the effect of dynamic sharding.
  • the above task sharding can make better use of cluster resources, but in extreme scenarios, there is still a situation where the reduce task will occupy all the worker resources during processing, resulting in task execution failure or node downtime.
  • the above distributed scheduling methods only assign tasks to specific executors for execution when the tasks are issued, and will not pay attention to the subsequent workload of each executor. Even if the sharding strategy interface is opened for user customization, it is still inevitable that the entire logic is determined when the task is issued and cannot be dynamically adapted.
  • the scheduling engine will still allocate it to each executor for execution according to the static sharding or MapReduce dynamic sharding method without being able to identify the subsequent resource occupancy of the task. There is a risk that the resources of the entire executor will be fully occupied, resulting in the failure of other tasks to execute or the entire node to crash.
  • the present application provides a task scheduling method.
  • the method can be executed by a scheduling system.
  • the scheduling system can be a distributed task scheduling system.
  • the task scheduling system is suitable for scheduling scenarios where the required resource size and the number of shards are uncertain at the beginning of scheduling, there are a large number of batch tasks or the resource size required for the running state changes over time, and there is a long-term stable task scenario where a single task will occupy the execution node resources.
  • the scheduling system can be a software system, and the software system can be an independent software system, such as an independent scheduling engine, or integrated into other software, such as in the form of plug-ins, applets, etc.
  • the software system can be provided to the user in the form of a code package, deployed by the user, or provided to the user in the form of a cloud service.
  • the software system can be deployed in a computing device cluster, and the computing device cluster executes the program code of the software system, thereby executing the task scheduling method of the present application.
  • the task scheduling system can also be a hardware system, such as a computing device cluster with scheduling capabilities. When the hardware system is running, the task scheduling method of the present application is executed.
  • the scheduling system can receive a task, and schedule the task or the task slice of the task to the first execution node in the distributed cluster.
  • the first execution node is at least one execution node in the distributed cluster.
  • the scheduling system receives a slice adjustment request sent by the first execution node.
  • the slice adjustment request is used to request to increase the task slice of the task or reduce the task slice of the task.
  • the slice adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node.
  • the scheduling system adjusts the task slice of the task in the distributed cluster according to the slice adjustment request.
  • adjusting the task slice of the task includes increasing the task slice of the task in the second execution node, or reducing the task slice of the task in the first execution node.
  • This method can adaptively adjust the number of task slices according to the load of the running tasks or task slices, and then dynamically adjust the resource usage of the tasks, so that the overall cluster resources can be load balanced, avoiding the execution nodes being occupied by a single task or task slice, causing the risk of task failure or node downtime, and improving reliability and availability.
  • the scheduling system 10 includes a scheduling node 100.
  • the scheduling system 10 may include multiple scheduling nodes 100, and the scheduling node 100 may be a master. Among them, the scheduling nodes may form a scheduling center.
  • the scheduling system 10 is connected to the task center 20 and the task executor 30 respectively, and the task executor 30 may include a distributed cluster formed by multiple execution nodes.
  • the scheduling system 10 is used to schedule the tasks of the task center 20 to the execution nodes in the task executor 30 for execution. Among them, the execution node may be a working node worker. Further, the scheduling system 10 may also include a data module 200.
  • the scheduling node 100 is used to receive tasks, and schedule the tasks or task slices of the tasks to the first execution node in the distributed cluster.
  • the first execution node is at least one execution node in the distributed cluster.
  • the scheduling node 100 receives a slice adjustment request sent by the first execution node.
  • the slice adjustment request is used to request to increase the task slices of the task or reduce the task slices of the task.
  • the slice adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node.
  • the slice adjustment request may include an adjustment type and an identifier of the adjustment object. The adjustment type may be to increase the task slices or reduce the task slices.
  • the identifier of the adjustment object may include at least one of the task identifier or the identifier of the task slice.
  • the slice adjustment request may include the load of the task or the task slice.
  • the scheduling node is also used to adjust the task slice of the task in the distributed cluster according to the slice adjustment request. Wherein, adjusting the task slice of the task includes adding the task slice of the task in the second execution node, or reducing the task slice of the task in the first execution node.
  • the data module 200 stores data required for executing tasks, which are also called task processing data. As shown in FIG2 , the data module 200 can store data 1 (such as data 1) required for executing task 1 and data 2 (such as data 2) required for executing task 2. Tasks or task slices running on the execution node can pull task processing data from the data module 200 to execute the tasks.
  • data 1 such as data 1
  • data 2 such as data 2
  • Tasks or task slices running on the execution node can pull task processing data from the data module 200 to execute the tasks.
  • the first execution node is used to sense the load of the task or task slice, decide whether to adjust the number of task slices based on the load, and then generate a slice adjustment request based on the decision result.
  • the scheduling system can set a threshold for deciding whether to adjust the number of task slices, such as a first threshold or a second threshold.
  • a threshold for deciding whether to adjust the number of task slices such as a first threshold or a second threshold.
  • the slice adjustment request is used to request to increase the task slice of the task.
  • the slice adjustment request is used to reduce the target task slice of the task.
  • the first threshold or the second threshold can be set according to experience, the first threshold can be equal to the second threshold, and it can also be unequal to the second threshold, and this embodiment does not limit this.
  • task 1 when it starts scheduling, it is scheduled by the scheduling node 100 (such as master1) to the first execution node (such as working node 1, denoted as worker1).
  • worker1 senses that the load of task 1 is greater than the first threshold, it can send a shard adjustment request to the scheduling node 100.
  • the shard adjustment request is used to request to add a task shard of task 1 to the second execution node.
  • the second execution node can be an execution node other than the first execution node in the distributed cluster, such as worker3.
  • task 1 can be divided into the following task shards: task 1-1 (task1-1), task 1-2 (task1-2).
  • task1-2 can be a newly added task shard
  • task1-1 is a task shard formed after the original task 1 shares part of the load to task1-2.
  • the data module 200 is used to re-slice the processing data of the task to obtain at least one data slice when it is detected that the task slice of the task is adjusted.
  • the at least one data slice corresponds to the adjusted task slice one by one.
  • data 1 can be re-sliced into 2 data slices. For example, data blocks numbered 0 to 3 in data 1 are divided into one data slice, and data blocks numbered 4 to 7 are divided into another data slice.
  • the task slices may pull data from the corresponding data slices, and then process the task according to the data slices.
  • the results of the execution nodes executing the tasks or task slices may be reported to the scheduling node 100.
  • the present application further provides a task scheduling method.
  • the task scheduling method of the present application is described in detail below in conjunction with an embodiment.
  • the method may be executed by a scheduling system 10, the scheduling system 10 includes a scheduling node 100, and further, the scheduling system 10 may also include a data module 200.
  • the method specifically includes the following steps:
  • S301 The scheduling node 100 receives a task.
  • a task refers to a task that needs to be scheduled by the scheduling node 100 to the execution node for execution.
  • the task is a task or a batch processing task with uncertain resource requirements, uncertain number of shards, and resource size that changes over time in the running state.
  • the task may be a log processing task.
  • the log processing task may be based on the user's task configuration information, read the original log from the corresponding log source, process it according to the rules expected by the user, and then write the result to the expected target.
  • the task may also be other tasks with uncertain resource requirements or tasks with large changes in the resources occupied by running tasks.
  • the scheduling node 100 schedules the task or the task slice of the task to the first execution node in the distributed cluster.
  • the first execution node is at least one execution node in the distributed cluster.
  • the scheduling node 100 may determine the first execution node from the distributed cluster according to the load of the execution node in the distributed cluster. For example, the scheduling node 100 may determine the node with the smallest load or less than a set value as the first execution node. Then, the scheduling node 100 may schedule the task to the above-mentioned first execution node.
  • the scheduling node 100 may also slice the task to obtain multiple task slices.
  • the scheduling node 100 may schedule multiple task slices to different first execution nodes. For example, when the scheduling node 100 receives task1, it may divide task1 into multiple task slices, including task1-1 and task1-2, according to the initial requirements of the task, and schedule task1-1 and task1-2 to different first execution nodes, wherein task1-1 is scheduled to worker1 and task1-2 is scheduled to worker2.
  • the scheduling node 100 receives a slice adjustment request sent by the first execution node.
  • the slicing adjustment request is used to request to increase the task slice of a task or reduce the task slice of a task.
  • the slicing adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node.
  • the task or task slice scheduled to the first execution node can pull the processing data corresponding to the task or task slice to execute the task or task slice.
  • the first execution node can monitor the load of the task or task slice, wherein the load of the task or task slice refers to the resource load of the task or task slice.
  • the resource load of the task or task slice can be the sum of the number of processes being processed and waiting to be processed by the processor within a period of time.
  • the first execution node can generate a slicing adjustment request according to the load of the task or task slice, and send the slicing adjustment request to the scheduling node 100. Accordingly, the scheduling node 100 receives the slicing adjustment request sent by the first execution node during the execution of the task or task slice, and can perform subsequent slicing adjustment operations.
  • the first execution node compares the load (such as resource load) of the task or the task slice of the task with a set threshold. Specifically, when the load of the task or the task slice is greater than the first threshold, the first execution node can generate a slice adjustment request for requesting to increase the task slice. When the load of the task slice is less than the second threshold, the first execution node can generate a slice adjustment request for requesting to reduce the task slice.
  • the load such as resource load
  • the scheduling center schedules task1 to working node 1 (such as worker1).
  • Worker1 detects that the load of task1 is greater than the first threshold, and sends a slicing adjustment request to the scheduling center.
  • the slicing adjustment request is used to request to increase the task slicing of task1, for example, to add a task slicing of task1 to working node 3 (such as worker3), such as task 1-2 (task1-2). Accordingly, task1 running on worker1 can become a task slicing of task1, such as task 1-1 (such as task1-1).
  • the scheduling center For task 2 (such as task2), the scheduling center divides task2 into three task slicings, such as task 2-1 (such as task2-1), task 2-2 (such as task2-2), and task 2-3 (such as task2-3), which are respectively scheduled to worker1, working node 2 (such as worker2), and worker3.
  • task 2-1 such as task2-1
  • task 2-2 such as task2-2
  • task 2-3 such as task2-3)
  • worker2 detects that the load of task slice task2-2 executing task2 is less than the second threshold, it can send a slice adjustment request to the scheduling center, where the slice adjustment request is used to request to shrink the task slice task2-2 of task2.
  • a task or a task slice can be assembled in a data body.
  • the data body can be a cube that carries the running task or task slice.
  • the data body can also record the load of the running task or task slice and the identifier of the task or task slice.
  • a task slice is a subtask of a task, based on which the data body can record the load of a task or a subtask and the identifier of a task or a subtask.
  • the first execution node can encapsulate the task or task slice and the identifier of the task, the identifier of the task slice, and the resource load in a cube for each task or task slice scheduled to the execution node.
  • an example is given in Figure 4.
  • worker1 can assemble task1 in a cube and record the identifier of the task (recorded as taskId), the identifier of the task slice (recorded as Sub TaskId), and the load of the task or task slice (recorded as ResourceLoad) in the cube.
  • worker2 can assemble task2-2 in a cube and record the identifier of task2, the identifier of task2-2, and the load of task2-2 in the cube.
  • the scheduling node 100 may receive a slice adjustment request generated by the first execution node according to the load of the task or task slice.
  • the slice adjustment request may include an adjustment type and an identifier of an adjustment object, the adjustment type may be to increase a task slice or to reduce a task slice, and the identifier of the adjustment object may include at least one of a task identifier or an identifier of a task slice.
  • the slice adjustment request may include the load of the task or task slice.
  • the scheduling node 100 adjusts the task slices of the task in the distributed cluster according to the slice adjustment request.
  • adjusting the task slice of the task includes increasing the task slice of the task in the second execution node, or reducing the task slice of the task in the first execution node.
  • the second execution node may be an execution node other than the first execution node in the distributed cluster.
  • the slicing adjustment request indicates an adjustment type or an adjustment object.
  • the scheduling node can add the task slicing of the task in the second execution node according to the identifier of the task in the slicing adjustment request.
  • the scheduling node can determine the second execution node in a similar manner to determining the first execution node.
  • the scheduling node can obtain the load of each execution node in the distributed cluster and determine the second execution node according to the load of the execution node.
  • the second execution node can be the execution node with the smallest load among the other execution nodes in the distributed cluster except the first execution node, or the execution node with a load less than a set value.
  • the scheduling node can reduce the target task slicing in the first execution node according to the identifier of the target task slicing, such as the identifier of the target task slicing in the slicing adjustment request.
  • the data module 200 stores the task processing data.
  • the data module 200 can slice the task processing data according to the number of slices of the task to obtain at least one data slice.
  • the newly added task slice can be automatically registered to the data module 200.
  • 200 can detect that the task slice of the task has been adjusted, and the data module 200 can re-slice the processing data of the task to obtain at least one data slice.
  • task1 includes the following task slices task1-1 and task1-2.
  • the scheduling node 100 adds a task slice task1-3
  • the task slice can be automatically registered to the data module 200.
  • the data module 200 detects the increase in task slices and can re-slice the processing data of the task from 2 data slices to 3 data slices. For example, the data block numbered 3 in data slice 1 and the data blocks numbered 4 and 5 in data slice 2 can be stripped from the original data slices and merged into a new data slice.
  • the processing data of the task can be stored in shards.
  • Shard can support operations such as indexing and data query.
  • the data module 200 detects a newly added task shard, it performs a rebalance operation to re-shard the shard to ensure that the amount of data pulled by each task shard is relatively balanced.
  • the data module 200 may also record the progress of pulling the processing data of the task, such as the progress of pulling the data shards. Accordingly, when the data module 200 re-shards the processing data of the task, it may determine the remaining data according to the progress of pulling the processing data of the task, and then re-shard the remaining data to obtain at least one data shard. By recording the progress of pulling each data shard (such as shard), this method can ensure that data pulling is not repeated.
  • the data module 200 can also send a heartbeat message to the task slice, and the heartbeat message is used to detect the activity of the task slice.
  • the activity of the task slice is used to characterize the current state of the task slice, such as a normal operating state or an inactivated state.
  • the data module 200 can also adjust the data slice according to the activity of the task slice. For example, if the data module 200 detects that the task slice is inactivated, the data slice can be readjusted to reduce the number of data slices. In this method, a heartbeat is maintained between the data module 200 and the task slice, and the shard assigned to the task slice is guaranteed to be updated in real time through monitoring to ensure the integrity of data pulling.
  • the present application provides a task scheduling method.
  • the method receives a task and schedules the task or the task slice of the task to the first execution node in the distributed cluster.
  • the first execution node receives a slice adjustment request generated and sent according to the load of the task or the task slice on the node, and adjusts the task slice of the task in the distributed cluster according to the slice adjustment request, thereby monitoring the load of the running task or the task slice, adaptively adjusting the number of tasks or task slices according to the load of the task or the task slice, and then dynamically adjusting the resource usage of the task, so that the overall cluster resources are load balanced, avoiding the execution node being occupied by a single task, causing the risk of task failure or node downtime, and improving reliability and availability.
  • task slice adaptive adjustment can be divided into multiple stages: task scheduling, operation monitoring, and dynamic adjustment.
  • the scheduling node 100 receives new tasks, such as task1 and task2, and can schedule the new tasks to execution nodes with relatively abundant resources according to the scheduling strategy.
  • the scheduling node 100 can schedule task1 to worker1 and task2 to worker2.
  • the worker can determine the resource load of the task. It should be noted that when a task is divided into task slices and scheduled to different workers, the worker can determine the resource load of the task slices.
  • the worker can send a shard adjustment request to the scheduling node 100 to request the addition of task shards based on the monitored task load when the task load exceeds the set threshold, for example, exceeds the first threshold.
  • the newly added task shards can trigger the data module 200 to rebalance.
  • the newly added task shards can pull data from the rebalanced data shards to share the resource load of the existing tasks, so as to reduce the resource occupation of the worker where the existing tasks are located.
  • the worker can send a shard adjustment request to the master to request the reduction of the task shards.
  • the scheduling node 100 can reduce the task shards, and the remaining task shards share the data shards corresponding to the task shards.
  • the present application also provides application scenarios for example illustration.
  • the task may be a log processing task.
  • the user can create a log processing task.
  • the scheduling node 100 can divide task1 into two task slices, specifically task1-1 and task1-2, according to the initial task requirements, and schedule the two task slices, which are respectively allocated to worker1 and worker2.
  • the data of the corresponding log 1 (such as log1 log stream) is pulled.
  • Task 1-1 pulls the log data on shard 0-2, and task 1-2 pulls the log data on shard 3-5.
  • the log data is input into the function module for processing according to the rules set by the user, and the processed log is output.
  • the rules set by the user can be rules based on a domain-specific language (DSL).
  • DSL domain-specific language
  • a domain-specific language is a language used in a specific context in a specific domain, usually optimized for a specific class of problems, and includes a higher-level abstract programming language.
  • DSL can use concepts and rules from a profession or domain to process log data and output processed logs.
  • the scheduling node 100 is automatically requested to increase the shard of task 1 (task1), and the scheduling node 100 schedules the task shard task1-3 to worker3.
  • the log pulling module monitors the task shard corresponding to the newly added log1, and rebalances, assigning 0-1shard to task1-1, 2-3shard to task1-2, and 4-5shard to task1-3, thereby achieving the purpose of reducing the resource load occupied by task1-1 and relatively balancing the overall resources.
  • the scheduling system 10 includes:
  • the scheduling node 100 is used to receive a scheduling task, schedule the task or the task slice of the task to the first execution node in the distributed cluster, the first execution node is at least one execution node in the distributed cluster, and during the process of the first execution node executing the task or the task slice, receive a slice adjustment request sent by the first execution node, the slice adjustment request is used to request to increase the task slice of the task or reduce the task slice of the task, and the slice adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node;
  • the scheduling node 100 is further used to adjust the task slices of the task in the distributed cluster according to the slice adjustment request, wherein adjusting the task slices of the task includes: increasing the task slices of the task in the second execution node, or reducing the task slices of the task in the first execution node.
  • the above-mentioned scheduling node 100 can be implemented by software or by hardware.
  • the scheduling node 100 may include at least one computing device, such as a server.
  • the scheduling node 100 may be an application running on a computing device, such as a virtualized application.
  • the scheduling node 100 may include the following functional modules:
  • Interaction module 102 used for receiving scheduling tasks
  • a scheduling module 104 configured to schedule a task or a task slice of a task to a first execution node in a distributed cluster, where the first execution node is at least one execution node in the distributed cluster;
  • the interaction module 102 is further used to receive a slicing adjustment request sent by the first execution node during the process of the first execution node executing the task or the task slice, the slicing adjustment request is used to request to increase the task slice of the task or reduce the task slice of the task, and the slicing adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node;
  • the adjustment module 106 is used to adjust the task slices of the task in the distributed cluster according to the slice adjustment request.
  • the interaction module 102, the scheduling module 104 or the adjustment module 106 may be implemented by software or by hardware.
  • the interaction module 102, the scheduling module 104 or the adjustment module 106 may be an application running on a computing device, such as a computing engine.
  • the application may be provided to users through virtualization services.
  • Virtualization services may include virtual machine (VM) services, bare metal server (BMS) services and container services.
  • VM services may be services that use virtualization technology to virtualize virtual machine (VM) resource pools on multiple physical hosts to provide users with VMs on demand for use.
  • BMS services are services that virtualize BMS resource pools on multiple physical hosts to provide users with BMS on demand for use.
  • Container services are services that virtualize container resource pools on multiple physical hosts to provide users with containers on demand for use.
  • VM is a simulated virtual computer, that is, a logical computer.
  • BMS is a high-performance computing service that can be elastically scalable, and its computing performance is no different from that of a traditional physical machine, and it has the characteristics of secure physical isolation.
  • Containers are a kernel virtualization technology that can provide lightweight virtualization to achieve the purpose of isolating user space, processes and resources. It should be understood that the VM service, BMS service and container service in the above-mentioned virtualization services are only specific examples. In actual applications, virtualization services can also be other lightweight or heavyweight virtualization services, which are not specifically limited here.
  • the interaction module 102, the scheduling module 104 or the adjustment module 106 may include at least one computing device, such as a server, etc.
  • the interaction module 102, the scheduling module 104 or the adjustment module 106 may also be implemented by an application-specific integrated circuit (ASIC) or a programmable logic device (PLD).
  • the PLD may be a complex programmable logical device (CPLD), a field-programmable gate array (FPGA), a generic array logic (GAL), or a FPGA. or any combination thereof.
  • a slice adjustment request is used to request an increase in the task slice of the task; or, when the load of a target task slice of a task on the first execution node is less than a second threshold, a slice adjustment request is used to reduce the target task slice of the task.
  • the scheduling system 10 includes a scheduling node 100 and a data module 200, where the data module 200 is specifically used for:
  • the processing data of the task is re-sliced to obtain at least one data slice, and the at least one data slice corresponds to the adjusted task slice one by one.
  • the data module 200 may be implemented by software or by hardware.
  • the data module 200 may be an application running on a computing device, such as a computing engine.
  • the application may be provided to users through virtualization services such as VM services, BMS services, or container services.
  • the data module 200 may include at least one computing device, such as a server.
  • the data module 200 may also be a device implemented by ASIC or PLD.
  • the data module 200 is specifically used to:
  • the task processing data is re-sharded through load balancing to obtain at least one data shard.
  • the data module 200 is further used to:
  • the data module 200 is specifically used for:
  • the remaining data is re-sharded to obtain at least one data shard.
  • the data module 200 is further used to:
  • Heartbeat messages are used to detect the activity of task shards.
  • tasks or task slices are assembled in a data body, and the data body records the load of the running tasks or task slices and the identification of the tasks or task slices.
  • the task includes a log processing task.
  • the present application also provides a computing device 900.
  • the computing device 900 includes: a bus 902, a processor 904, a memory 906, and a communication interface 908.
  • the processor 904, the memory 906, and the communication interface 908 communicate with each other through the bus 902.
  • the computing device 900 can be a server or a terminal device. It should be understood that the present application does not limit the number of processors and memories in the computing device 900.
  • the bus 902 may be a peripheral component interconnect (PCI) bus or an extended industry standard architecture (EISA) bus, etc.
  • the bus may be divided into an address bus, a data bus, a control bus, etc.
  • FIG. 9 is represented by only one line, but does not mean that there is only one bus or one type of bus.
  • the bus 902 may include a path for transmitting information between various components of the computing device 900 (e.g., the memory 906, the processor 904, and the communication interface 908).
  • Processor 904 may include any one or more of a central processing unit (CPU), a graphics processing unit (GPU), a microprocessor (MP) or a digital signal processor (DSP).
  • CPU central processing unit
  • GPU graphics processing unit
  • MP microprocessor
  • DSP digital signal processor
  • the memory 906 may include a volatile memory (volatile memory), such as a random access memory (RAM).
  • the memory 906 may also include a non-volatile memory (non-volatile memory), such as a read-only memory (ROM), a flash memory, a hard disk drive (HDD) or a solid state drive (SSD).
  • the memory 906 stores executable program code, and the processor 904 executes the executable program code to implement the aforementioned task scheduling method.
  • the memory 906 stores instructions for the scheduling system 10 to execute the task scheduling method.
  • the memory 906 may store instructions of the interaction module 102, the scheduling module 104, and the adjustment module 106 in the scheduling node 100. Further, the memory 906 may also store instructions of the data module 200.
  • the communication interface 908 uses a transceiver module such as, but not limited to, a network interface card or a transceiver to implement communication between the computing device 900 and the computing device 900. Communications between other devices or communications networks.
  • a transceiver module such as, but not limited to, a network interface card or a transceiver to implement communication between the computing device 900 and the computing device 900. Communications between other devices or communications networks.
  • the embodiment of the present application also provides a computing device cluster.
  • the computing device cluster includes at least one computing device.
  • the computing device can be a server, such as a central server, an edge server, or a local server in a local data center.
  • the computing device can also be a terminal device such as a desktop computer, a laptop computer, or a smart phone.
  • the computing device cluster includes at least one computing device 900.
  • the memory 906 in one or more computing devices 900 in the computing device cluster may store the same instructions of the scheduling system 10 for executing the task scheduling method.
  • one or more computing devices 900 in the computing device cluster may also be used to execute some instructions of the scheduling system 10 for executing the task scheduling method.
  • a combination of one or more computing devices 900 may jointly execute instructions of the scheduling system 10 for executing the task scheduling method.
  • the memory 906 in different computing devices 900 in the computing device cluster can store different instructions for executing partial functions of the scheduling system 10 .
  • FIG11 shows a possible implementation.
  • two computing devices 900A and 900B are connected via a communication interface 908.
  • the memory in the computing device 900A stores instructions for executing the functions of the scheduling node 100, for example, the memory stores instructions for executing the functions of the interaction module 102, the scheduling module 104, and the adjustment module 106.
  • the memory in the computing device 900B stores instructions for executing the functions of the data module 200.
  • the memory 906 of the computing devices 900A and 900B jointly stores instructions for the scheduling system 10 to execute the task scheduling method.
  • connection mode between the computing device clusters shown in Figure 11 may be considered to be that the task scheduling method provided by the present application requires more resources to detect whether the task slices are adjusted. Therefore, it is considered to hand over the functions implemented by the data module 200 to the computing device 900B for execution.
  • the functions of the computing device 900A shown in FIG11 may also be completed by multiple computing devices 900.
  • the functions of the computing device 900B may also be completed by multiple computing devices 900.
  • one or more computing devices in the computing device cluster can be connected via a network.
  • the network may be a wide area network or a local area network, etc.
  • FIG. 12 shows a possible implementation. As shown in FIG. 12 , two computing devices 900C and 900D are connected via a network. Specifically, the network is connected via a communication interface in each computing device.
  • the memory 906 in the computing device 900C stores instructions for executing the functions of the scheduling node 100, such as instructions for storing the functions of the interaction module 102, the scheduling module 104, and the adjustment module 106. At the same time, the memory 906 in the computing device 900D stores instructions for executing the functions of the data module 200.
  • connection method between the computing device clusters shown in Figure 12 can be considered to be that the task scheduling method provided in this application requires more resources to perform task slicing detection, so it is considered to hand over the functions implemented by the data module 200 to the computing device 900D for execution.
  • the functions of the computing device 900C shown in FIG12 may also be completed by multiple computing devices 900.
  • the functions of the computing device 900D may also be completed by multiple computing devices 900.
  • the embodiment of the present application also provides a computer-readable storage medium.
  • the computer-readable storage medium can be any available medium that can be stored by the computing device or a data storage device such as a data center containing one or more available media.
  • the available medium can be a magnetic medium (e.g., a floppy disk, a hard disk, a tape), an optical medium (e.g., a DVD), or a semiconductor medium (e.g., a solid-state hard disk).
  • the computer-readable storage medium includes instructions that instruct the computing device to execute the above-mentioned task scheduling method applied to the scheduling system 10.
  • the embodiment of the present application also provides a computer program product including instructions.
  • the computer program product may be software or a program product including instructions that can be run on a computing device or stored in any available medium.
  • the at least one computing device executes the above-mentioned task scheduling method.

Landscapes

  • Engineering & Computer Science (AREA)
  • Software Systems (AREA)
  • Theoretical Computer Science (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Debugging And Monitoring (AREA)
  • Multi Processors (AREA)

Abstract

The present application discloses a task scheduling method, comprising: receiving a task, and scheduling the task or a task shard of the task to a first execution node in a distributed cluster; during execution of the task or the task shard by the first execution node, receiving a shard adjustment request sent by the first execution node on the basis of a load of the task or the task shard on the first execution node; and on the basis of the shard adjustment request, increasing a task shard of a task in a second execution node in the distributed cluster, or reducing the task shard of the task in the first execution node. The method can implement adaptive adjustment of a task or task shard in a running state, and the resource use is adjusted by dynamically adjusting the task shard of the task, so that the overall cluster resource reaches a load balance, preventing the risk of a task failure or node downtime due to the fact that execution nodes are completely occupied by a single task or task shard.

Description

一种任务调度方法及相关设备A task scheduling method and related equipment

本申请要求于2023年12月05日提交国家知识产权局、申请号为202311677310.2、发明名称为“一种任务调度方法及相关设备”的中国专利申请的优先权,以及要求于2024年03月12日提交国家知识产权局、申请号为202410284789.1、发明名称为“一种任务调度方法及相关设备”的中国专利申请的优先权,其全部内容通过引用结合在本申请中。This application claims the priority of the Chinese patent application filed with the State Intellectual Property Office on December 5, 2023, with application number 202311677310.2, and invention name “A task scheduling method and related equipment”, and claims the priority of the Chinese patent application filed with the State Intellectual Property Office on March 12, 2024, with application number 202410284789.1, and invention name “A task scheduling method and related equipment”, all contents of which are incorporated by reference in this application.

技术领域Technical Field

本申请涉及计算机技术领域,尤其涉及一种任务调度方法、任务调度系统、计算设备集群、计算机可读存储介质以及计算机程序产品。The present application relates to the field of computer technology, and in particular to a task scheduling method, a task scheduling system, a computing device cluster, a computer-readable storage medium, and a computer program product.

背景技术Background Art

随着计算机技术的不断发展,为大规模用户提供服务的应用不断涌现。以互联网应用或者企业级应用为代表的应用中通常有大量的批处理任务需要处理。随着上述应用的架构逐渐由单体架构演进为微服务架构,基于微服务架构的应用中的任务调度可以采用分布式任务调度策略。With the continuous development of computer technology, applications that provide services to large-scale users continue to emerge. Applications represented by Internet applications or enterprise-level applications usually have a large number of batch processing tasks to be processed. As the architecture of the above applications gradually evolves from a monolithic architecture to a microservice architecture, the task scheduling in applications based on the microservice architecture can adopt a distributed task scheduling strategy.

分布式任务调度是一种运行在分布式集群环境下的任务调度。调度节点将任务调度到分布式集群中的执行节点运行处理。随着任务的多样化,对任务调度策略的要求越来越高,如何更精准地调度任务,实现集群资源的负载均衡,提升计算资源的利用率需要更深入的探索。Distributed task scheduling is a task scheduling that runs in a distributed cluster environment. The scheduling node schedules tasks to the execution nodes in the distributed cluster for processing. With the diversification of tasks, the requirements for task scheduling strategies are getting higher and higher. How to schedule tasks more accurately, achieve load balancing of cluster resources, and improve the utilization of computing resources requires further exploration.

然而,很多任务不能够确定执行该任务需求的集群资源的资源量,调度节点将任务调度到执行节点,任务开始运行后可以导致节点资源不足,进而导致节点上运行任务的失败或者整个节点宕机的风险。However, many tasks cannot determine the amount of cluster resources required to execute the task. The scheduling node schedules the task to the execution node. After the task starts running, it may cause insufficient node resources, which in turn leads to the failure of running the task on the node or the risk of the entire node crashing.

发明内容Summary of the invention

本申请提供了一种任务调度方法,该方法可以根据运行态任务或任务分片的负载自适应调整任务分片数量,进而可以动态调整任务的资源使用,使得整体的集群资源达到负载均衡,避免执行节点被单一任务或任务分片占满,造成任务失败或者节点宕机的风险,提高可靠性和可用性。本申请还提供了与上述方法对应的调度系统、计算设备集群、计算机可读存储介质以及计算机程序产品。The present application provides a task scheduling method, which can adaptively adjust the number of task slices according to the load of the running task or task slice, and then dynamically adjust the resource usage of the task, so that the overall cluster resources can achieve load balancing, avoid the execution node being occupied by a single task or task slice, causing the risk of task failure or node downtime, and improve reliability and availability. The present application also provides a scheduling system, a computing device cluster, a computer-readable storage medium, and a computer program product corresponding to the above method.

第一方面,本申请提供一种任务调度方法。该方法可以由调度系统执行。调度系统可以是分布式任务调度系统,用于将任务调度至分布式集群,由分布式集群处理。调度系统可以是软件系统,软件系统可以是独立的软件系统,例如是独立的调度引擎,或者是集成于其他软件,例如是以插件、小程序等形式集成于其他软件。其中,软件系统可以以代码包的形式提供给用户,由用户自行部署,或者是以云服务的形式提供给用户使用。软件系统可以部署在计算设备集群中,计算设备集群执行软件系统的程序代码,从而执行本申请的任务调度方法。在一些示例中,任务调度系统也可以是硬件系统,例如是具有调度能力的计算设备集群。该硬件系统运行时,执行本申请的任务调度方法。In the first aspect, the present application provides a task scheduling method. The method can be performed by a scheduling system. The scheduling system can be a distributed task scheduling system for scheduling tasks to a distributed cluster for processing by the distributed cluster. The scheduling system can be a software system, and the software system can be an independent software system, such as an independent scheduling engine, or integrated into other software, such as integrated into other software in the form of plug-ins, applets, etc. Among them, the software system can be provided to the user in the form of a code package, which can be deployed by the user himself, or provided to the user in the form of a cloud service. The software system can be deployed in a computing device cluster, and the computing device cluster executes the program code of the software system, thereby executing the task scheduling method of the present application. In some examples, the task scheduling system can also be a hardware system, such as a computing device cluster with scheduling capabilities. When the hardware system is running, the task scheduling method of the present application is executed.

具体地,调度系统接收任务,将任务或任务的任务分片调度至分布式集群中的第一执行节点,第一执行节点为分布式集群中的至少一个执行节点。在第一执行节点执行任务或任务分片的过程中,调度系统接收第一执行节点发送的分片调整请求。分片调整请求用于请求增加任务的任务分片或缩减任务的任务分片,分片调整请求为第一执行节点根据第一执行节点上任务或任务分片的负载生成。调度系统根据分片调整请求,在分布式集群调整任务的任务分片。其中,调整任务的任务分片包括:在第二执行节点增加任务的任务分片,或者缩减第一执行节点中任务的任务分片。Specifically, the scheduling system receives the task, and schedules the task or the task slice of the task to the first execution node in the distributed cluster, and the first execution node is at least one execution node in the distributed cluster. In the process of the first execution node executing the task or the task slice, the scheduling system receives a slice adjustment request sent by the first execution node. The slice adjustment request is used to request to increase the task slice of the task or reduce the task slice of the task, and the slice adjustment request is generated by the first execution node according to the load of the task or the task slice on the first execution node. The scheduling system adjusts the task slice of the task in the distributed cluster according to the slice adjustment request. Among them, adjusting the task slice of the task includes: increasing the task slice of the task in the second execution node, or reducing the task slice of the task in the first execution node.

该方法可以根据运行态任务或任务分片的负载自适应调整任务分片数量,进而可以动态调整任务的资源使用,使得整体的集群资源达到负载均衡,避免执行节点被单一任务或任务分片占满,造成任务失败或者节点宕机的风险,提高可靠性和可用性。This method can adaptively adjust the number of task slices according to the load of the running tasks or task slices, and then dynamically adjust the resource usage of the tasks, so that the overall cluster resources can be load balanced, avoiding the execution nodes being occupied by a single task or task slice, causing the risk of task failure or node downtime, and improving reliability and availability.

在一些可能的实现方式中,第一执行节点上任务或任务分片的负载大于第一阈值时,分片调整请求用于请求增加任务的任务分片;或者,第一执行节点上任务的目标任务分片的负载小于第二阈值时,分片调整请求用于缩减任务的目标任务分片。 In some possible implementations, when the load of the task or task slice on the first execution node is greater than a first threshold, the slice adjustment request is used to request to increase the task slice of the task; or, when the load of the target task slice of the task on the first execution node is less than a second threshold, the slice adjustment request is used to reduce the target task slice of the task.

如此可以实现根据任务或任务分片的负载大小自适应调整任务分片的数量,进而动态调整任务的资源使用,使得集群资源达到负载均衡。In this way, the number of task slices can be adaptively adjusted according to the load size of the task or task slice, and the resource usage of the task can be dynamically adjusted to achieve load balancing of cluster resources.

在一些可能的实现方式中,调度系统包括调度节点和数据模块。数据模块检测到任务的任务分片发生调整时,将任务的处理数据重新分片获得至少一个数据分片,至少一个数据分片与调整后的任务分片一一对应。In some possible implementations, the scheduling system includes a scheduling node and a data module. When the data module detects that the task slice of the task is adjusted, the processing data of the task is re-sliced to obtain at least one data slice, and the at least one data slice corresponds to the adjusted task slice one by one.

该方法通过在检测到任务分片发生调整时,重新对任务的处理数据进行分片,例如是对处理数据进行再次负载均衡(rebalance),由此可以减轻负载较大的任务或任务分片所占资源负载,达到整体资源相对均衡的目的。The method re-slices the task processing data when it detects that the task slice has been adjusted, for example, re-load balances the processing data, thereby reducing the resource load occupied by tasks or task slices with heavier loads, thereby achieving the purpose of relatively balanced overall resources.

在一些可能的实现方式中,数据模块可以根据调整后的任务分片的数量,通过负载均衡,将任务的处理数据重新分片获得至少一个数据分片。其中,数据模块通过负载均衡对处理数据重新分片,保证每个任务分片的拉取数据量相对均衡,由此可以实现整体资源相对均衡。In some possible implementations, the data module can re-slice the task processing data to obtain at least one data slice through load balancing according to the adjusted number of task slices. The data module re-slices the processing data through load balancing to ensure that the amount of data pulled by each task slice is relatively balanced, thereby achieving relative balance of overall resources.

在一些可能的实现方式中,数据模块还可以记录任务的处理数据的拉取进度。相应地,数据模块可以根据任务的处理数据的拉取进度,确定剩余数据,然后数据模块可以对剩余数据重新分片获得至少一个数据分片。如此可以保障任务分片在变化时,能够完整地拉取处理数据,保证数据的唯一性和完整性。In some possible implementations, the data module can also record the progress of pulling the processing data of the task. Accordingly, the data module can determine the remaining data based on the progress of pulling the processing data of the task, and then the data module can re-shard the remaining data to obtain at least one data shard. In this way, it can be ensured that when the task shard changes, the processing data can be completely pulled and the uniqueness and integrity of the data can be guaranteed.

在一些可能的实现方式中,数据模块还可以向任务分片发送心跳消息,该心跳消息用于检测任务分片的活性。然后,数据模块根据任务分片的活性,调整数据分片。如此可以动态监测任务分片的状态,进而实现数据分片和任务分片的联动调整,保障整体资源的均衡。In some possible implementations, the data module can also send a heartbeat message to the task slice, which is used to detect the activity of the task slice. Then, the data module adjusts the data slice according to the activity of the task slice. In this way, the status of the task slice can be dynamically monitored, and then the linkage adjustment of the data slice and the task slice can be realized to ensure the balance of the overall resources.

在一些可能的实现方式中,任务或任务分片装配在数据体中。数据体记录运行的任务或任务分片的负载以及任务或任务分片的标识。该方法通过维护任务信息的数据体如cube,记录运行态任务的负载占用情况,可以快速做出每个运行态任务占用资源负载的决策,为是否进行任务的缩减提供参考。In some possible implementations, tasks or task slices are assembled in a data body. The data body records the load of the running task or task slice and the identification of the task or task slice. By maintaining a data body of task information such as a cube, the method records the load occupancy of the running task, and can quickly make a decision on the resource load occupied by each running task, providing a reference for whether to reduce the task.

在一些可能的实现方式中,任务包括日志加工任务。In some possible implementations, the task includes a log processing task.

第二方面,本申请提供一种调度系统。所述调度系统包括:In a second aspect, the present application provides a scheduling system. The scheduling system comprises:

调度节点,用于接收调度任务,将所述任务或所述任务的任务分片调度至分布式集群中的第一执行节点,所述第一执行节点为所述分布式集群中的至少一个执行节点,在所述第一执行节点执行所述任务或所述任务分片的过程中,接收所述第一执行节点发送的分片调整请求,所述分片调整请求用于请求增加所述任务的任务分片或缩减所述任务的任务分片,所述分片调整请求为所述第一执行节点根据所述第一执行节点上所述任务或所述任务的任务分片的负载生成;A scheduling node, used for receiving a scheduling task, scheduling the task or the task slice of the task to a first execution node in a distributed cluster, the first execution node being at least one execution node in the distributed cluster, and receiving a slice adjustment request sent by the first execution node during the process of the first execution node executing the task or the task slice, the slice adjustment request being used to request to increase the task slice of the task or reduce the task slice of the task, the slice adjustment request being generated by the first execution node according to the load of the task or the task slice of the task on the first execution node;

所述调度节点,还用于根据所述分片调整请求,在所述分布式集群调整所述任务的任务分片;The scheduling node is further used to adjust the task slice of the task in the distributed cluster according to the slice adjustment request;

其中,所述调整所述任务的任务分片包括:在第二执行节点增加所述任务的任务分片,或者缩减所述第一执行节点中所述任务的任务分片。The adjusting of the task slices of the task includes: increasing the task slices of the task in the second execution node, or reducing the task slices of the task in the first execution node.

在一些可能的实现方式中,所述第一执行节点上所述任务或所述任务的任务分片的负载大于第一阈值时,所述分片调整请求用于请求增加所述任务的任务分片;或者,所述第一执行节点上所述任务的目标任务分片的负载小于第二阈值时,所述分片调整请求用于缩减所述任务的所述目标任务分片。In some possible implementations, when the load of the task or the task slice of the task on the first execution node is greater than a first threshold, the slice adjustment request is used to request to increase the task slice of the task; or, when the load of the target task slice of the task on the first execution node is less than a second threshold, the slice adjustment request is used to reduce the target task slice of the task.

在一些可能的实现方式中,所述调度系统包括所述调度节点和数据模块,所述数据模块具体用于:In some possible implementations, the scheduling system includes the scheduling node and a data module, and the data module is specifically used to:

检测到所述任务的任务分片发生调整时,将所述任务的处理数据重新分片获得至少一个数据分片,所述至少一个数据分片与调整后的任务分片一一对应。When it is detected that the task slice of the task is adjusted, the processed data of the task is re-sliced to obtain at least one data slice, and the at least one data slice corresponds to the adjusted task slice one by one.

在一些可能的实现方式中,所述数据模块具体用于:In some possible implementations, the data module is specifically used to:

根据所述调整后的任务分片的数量,通过负载均衡,将所述任务的处理数据重新分片获得至少一个数据分片。According to the adjusted number of task slices, the processing data of the task is re-sliced to obtain at least one data slice through load balancing.

在一些可能的实现方式中,所述数据模块还用于:In some possible implementations, the data module is further used to:

记录所述任务的处理数据的拉取进度;Record the progress of pulling the processing data of the task;

所述数据模块具体用于:The data module is specifically used for:

根据所述任务的处理数据的拉取进度,确定剩余数据;Determining remaining data according to the pulling progress of the processing data of the task;

对所述剩余数据重新分片获得至少一个数据分片。The remaining data is re-sharded to obtain at least one data shard.

在一些可能的实现方式中,所述数据模块还用于:In some possible implementations, the data module is further used to:

向所述任务分片发送心跳消息,所述心跳消息用于检测所述任务分片的活性; Sending a heartbeat message to the task slice, wherein the heartbeat message is used to detect the activity of the task slice;

根据所述任务分片的活性,调整所述数据分片。The data shards are adjusted according to the activity of the task shards.

在一些可能的实现方式中,所述任务或所述任务分片装配在数据体中,所述数据体记录运行的所述任务或所述任务分片的负载以及所述任务或所述任务分片的标识。In some possible implementations, the task or the task slice is assembled in a data body, and the data body records the load of the running task or the task slice and the identification of the task or the task slice.

在一些可能的实现方式中,所述任务包括日志加工任务。日志加工任务可以是根据用户的任务配置信息,从相应的日志源读取原始日志,依据用户期望的规则进行处理,再将结果写入到预期目标中。需要说明的是,任务也可以是其他不确定任务需求资源大小或者运行态任务所占资源变化幅度大的任务。In some possible implementations, the task includes a log processing task. The log processing task can be to read the original log from the corresponding log source according to the user's task configuration information, process it according to the rules expected by the user, and then write the result to the expected target. It should be noted that the task can also be other tasks with uncertain task demand resource size or large changes in the resources occupied by running tasks.

该方法可以应用于不同任务的任务调度场景,具有较高可用性。This method can be applied to task scheduling scenarios of different tasks and has high availability.

第三方面,本申请提供一种计算设备集群。所述计算设备集群包括至少一台计算设备,所述至少一台计算设备包括至少一个处理器和至少一个存储器。所述至少一个处理器、所述至少一个存储器进行相互的通信。所述至少一个处理器用于执行所述至少一个存储器中存储的指令,以使得计算设备或计算设备集群执行如第一方面或第一方面的任一种实现方式所述的任务调度方法。In a third aspect, the present application provides a computing device cluster. The computing device cluster includes at least one computing device, and the at least one computing device includes at least one processor and at least one memory. The at least one processor and the at least one memory communicate with each other. The at least one processor is used to execute instructions stored in the at least one memory, so that the computing device or the computing device cluster executes the task scheduling method described in the first aspect or any implementation of the first aspect.

第四方面,本申请提供一种计算机可读存储介质,所述计算机可读存储介质中存储有指令,所述指令指示计算设备或计算设备集群执行上述第一方面或第一方面的任一种实现方式所述的任务调度方法。In a fourth aspect, the present application provides a computer-readable storage medium, wherein the computer-readable storage medium stores instructions, wherein the instructions instruct a computing device or a computing device cluster to execute the task scheduling method described in the first aspect or any one of the implementations of the first aspect.

第五方面,本申请提供了一种包含指令的计算机程序产品,当其在计算设备或计算设备集群上运行时,使得计算设备或计算设备集群执行上述第一方面或第一方面的任一种实现方式所述的任务调度方法。In a fifth aspect, the present application provides a computer program product comprising instructions, which, when executed on a computing device or a computing device cluster, enables the computing device or the computing device cluster to execute the task scheduling method described in the first aspect or any one of the implementations of the first aspect.

本申请在上述各方面提供的实现方式的基础上,还可以进行进一步组合以提供更多实现方式。Based on the implementations provided in the above aspects, this application can also be further combined to provide more implementations.

附图说明BRIEF DESCRIPTION OF THE DRAWINGS

为了更清楚地说明本申请实施例的技术方法,下面将对实施例中所需使用的附图作以简单地介绍。In order to more clearly illustrate the technical method of the embodiments of the present application, the drawings required for use in the embodiments are briefly introduced below.

图1为本申请提供的一种任务调度的流程示意图;FIG1 is a schematic diagram of a task scheduling process provided by the present application;

图2为本申请提供的一种调度系统的结构示意图;FIG2 is a schematic diagram of the structure of a scheduling system provided by the present application;

图3为本申请提供的一种任务调度方法的流程图;FIG3 is a flow chart of a task scheduling method provided by the present application;

图4为本申请提供的一种任务分片自适应调整的示意图;FIG4 is a schematic diagram of a task slice adaptive adjustment provided by the present application;

图5为本申请提供的一种数据分片随任务分片自适应调整的示意图;FIG5 is a schematic diagram of adaptive adjustment of data slicing with task slicing provided by the present application;

图6为本申请提供的一种任务分片自适应调整的流程示意图;FIG6 is a schematic diagram of a process flow of adaptive adjustment of task slices provided by the present application;

图7为本申请提供的一种任务调度方法应用于日志加工场景的示意图;FIG7 is a schematic diagram of a task scheduling method provided by the present application applied to a log processing scenario;

图8为本申请提供的一种调度节点的结构示意图;FIG8 is a schematic diagram of the structure of a scheduling node provided by the present application;

图9为本申请提供的一种计算设备的结构示意图;FIG9 is a schematic diagram of the structure of a computing device provided by the present application;

图10为本申请提供的一种计算设备集群的结构示意图;FIG10 is a schematic diagram of the structure of a computing device cluster provided by the present application;

图11为本申请提供的另一种计算设备集群的结构示意图;FIG11 is a schematic diagram of the structure of another computing device cluster provided by the present application;

图12为本申请提供的又一种计算设备集群的结构示意图。FIG. 12 is a schematic diagram of the structure of another computing device cluster provided in the present application.

具体实施方式DETAILED DESCRIPTION

本申请实施例中的术语“第一”、“第二”仅用于描述目的,而不能理解为指示或暗示相对重要性或者隐含指明所指示的技术特征的数量。由此,限定有“第一”、“第二”的特征可以明示或者隐含地包括一个或者更多个该特征。The terms "first" and "second" in the embodiments of the present application are used for descriptive purposes only and should not be understood as indicating or implying relative importance or implicitly indicating the number of the indicated technical features. Therefore, the features defined as "first" and "second" may explicitly or implicitly include one or more of the features.

首先对本申请实施例中所涉及到的一些技术术语进行介绍。First, some technical terms involved in the embodiments of the present application are introduced.

分布式任务调度,是一种运行在分布式集群环境下的任务调度。其中,分布式是指按照微服务架构风格拆分出不同业务进行部署,集群是指按照微服务架构风格拆分出的同一个业务在不同节点部署。基于此,分布式集群是指对分布式系统的每个节点进行集群设置,例如是对拆分的不同业务采用集群部署。具体实现时,调度节点可以将任务调度到分布式集群中的执行节点运行处理。其中,调度节点也可以称作主节点master,负责任务调度,具体是根据合理的调度策略,将任务调度至合理的执行节点。执行节点也可以称作工作节点worker,是实际运行任务的节点,根据用户编写的任务逻辑完成任务处理。Distributed task scheduling is a task scheduling that runs in a distributed cluster environment. Distributed means that different businesses are split and deployed according to the microservice architecture style, and cluster means that the same business split according to the microservice architecture style is deployed on different nodes. Based on this, a distributed cluster refers to the cluster setting of each node in a distributed system, for example, cluster deployment of different split businesses. In specific implementation, the scheduling node can schedule tasks to the execution node in the distributed cluster for operation and processing. Among them, the scheduling node can also be called the master node, which is responsible for task scheduling, specifically scheduling tasks to reasonable execution nodes according to reasonable scheduling strategies. The execution node can also be called the worker node, which is the node that actually runs the task and completes task processing according to the task logic written by the user.

针对很多任务不能够确定执行该任务需求的集群资源的资源量,调度节点将任务调度到执行节点,任务开始运行后可以导致节点资源不足,进而导致节点上运行任务的失败或者整个节点宕机的风险,业界提出了一种基于MapReduce的任务调度方式。 For many tasks, it is impossible to determine the amount of cluster resources required to execute the task. The scheduling node schedules the task to the execution node. After the task starts running, it may lead to insufficient node resources, which in turn leads to the failure of running the task on the node or the risk of the entire node crashing. The industry has proposed a task scheduling method based on MapReduce.

如图1所示,任务中心包括任务1(task 1)、任务2(task 2),调度中心包括多个主节点,例如包括master1和master2,调度中心用于将任务调度至任务执行器进行执行,任务执行器包括多个工作节点,例如是worker1至worker3。调度中心的master节点可以监控集群(工作节点形成的集群)的负载状态,动态监测各个工作节点执行过程中的信息。当任务1、任务2到达调度中心,master节点可以根据当前的集群的负载情况决策是否对任务进行拆分。该示例中,master节点可以决策对任务进行拆分,分别送往不同的工作节点进行处理。例如,task1可以拆分为task 1map和task1  reduce-1、task1  reduce 2,分别调度至worker1至worker3。一个MapReduce作业通常会把输入的数据集切分为若干独立的数据块,由map任务以完全并行的方式去处理数据块。map任务的输出可以作为reduce任务的输入。As shown in Figure 1, the task center includes task 1 (task 1) and task 2 (task 2), and the scheduling center includes multiple master nodes, such as master1 and master2. The scheduling center is used to schedule tasks to task executors for execution. The task executors include multiple working nodes, such as worker1 to worker3. The master node of the scheduling center can monitor the load status of the cluster (a cluster formed by working nodes) and dynamically monitor the information during the execution of each working node. When task 1 and task 2 arrive at the scheduling center, the master node can decide whether to split the tasks according to the current load of the cluster. In this example, the master node can decide to split the tasks and send them to different working nodes for processing. For example, task1 can be split into task 1map and task1  reduce-1, task1  reduce 2, and dispatched to worker1 to worker3 respectively. A MapReduce job usually divides the input data set into several independent data blocks, and the map task processes the data blocks in a completely parallel manner. The output of the map task can be used as the input of the reduce task.

基于MapReduce的任务调度方式可以实现动态分片的效果,上述任务分片可以更好地利用集群资源,但是在极端场景下,还是存在reduce任务在处理时将worker资源占满,导致任务执行失败或者节点宕机。上述分布式调度方式都只是在任务下发的时刻,将任务分配给特定的执行器去执行,不会关注后续各个执行器的工作负载情况。即使开放出分片策略接口给用户自定义,但是仍然避免不了整个逻辑是在任务下发时确定的,无法进行动态适配,如果后续有高能耗资源的任务被调度,在无法识别该任务的后续资源占用情况的前提下,调度引擎依然按照静态分片或者MapReduce动态分片方式将该分配到各个执行器中执行,有可能将整个执行器的资源被完全占用的风险,从而导致其他任务执行失败或整个节点宕机。The task scheduling method based on MapReduce can achieve the effect of dynamic sharding. The above task sharding can make better use of cluster resources, but in extreme scenarios, there is still a situation where the reduce task will occupy all the worker resources during processing, resulting in task execution failure or node downtime. The above distributed scheduling methods only assign tasks to specific executors for execution when the tasks are issued, and will not pay attention to the subsequent workload of each executor. Even if the sharding strategy interface is opened for user customization, it is still inevitable that the entire logic is determined when the task is issued and cannot be dynamically adapted. If a high-energy resource-consuming task is scheduled later, the scheduling engine will still allocate it to each executor for execution according to the static sharding or MapReduce dynamic sharding method without being able to identify the subsequent resource occupancy of the task. There is a risk that the resources of the entire executor will be fully occupied, resulting in the failure of other tasks to execute or the entire node to crash.

有鉴于此,本申请提供一种任务调度方法。该方法可以由调度系统执行。该调度系统可以是分布式任务调度系统。该任务调度系统适用于开始调度时不确定需求资源大小、不确定分片数量的调度场景、存在大量的批处理任务或者是运行态所需资源大小随时间不断变化,存在单个任务将执行节点资源占满的长稳任务场景。该调度系统可以是软件系统,软件系统可以是独立的软件系统,例如是独立的调度引擎,或者是集成于其他软件,例如是以插件、小程序等形式集成于其他软件。其中,软件系统可以以代码包的形式提供给用户,由用户自行部署,或者是以云服务的形式提供给用户使用。软件系统可以部署在计算设备集群中,计算设备集群执行软件系统的程序代码,从而执行本申请的任务调度方法。在一些示例中,任务调度系统也可以是硬件系统,例如是具有调度能力的计算设备集群。该硬件系统运行时,执行本申请的任务调度方法。In view of this, the present application provides a task scheduling method. The method can be executed by a scheduling system. The scheduling system can be a distributed task scheduling system. The task scheduling system is suitable for scheduling scenarios where the required resource size and the number of shards are uncertain at the beginning of scheduling, there are a large number of batch tasks or the resource size required for the running state changes over time, and there is a long-term stable task scenario where a single task will occupy the execution node resources. The scheduling system can be a software system, and the software system can be an independent software system, such as an independent scheduling engine, or integrated into other software, such as in the form of plug-ins, applets, etc. In which, the software system can be provided to the user in the form of a code package, deployed by the user, or provided to the user in the form of a cloud service. The software system can be deployed in a computing device cluster, and the computing device cluster executes the program code of the software system, thereby executing the task scheduling method of the present application. In some examples, the task scheduling system can also be a hardware system, such as a computing device cluster with scheduling capabilities. When the hardware system is running, the task scheduling method of the present application is executed.

具体地,调度系统可以接收任务,将任务或任务的任务分片调度至分布式集群中的第一执行节点,第一执行节点为分布式集群中的至少一个执行节点,然后调度系统在第一执行节点执行任务或任务分片的过程中,接收第一执行节点发送的分片调整请求,分片调整请求用于请求增加任务的任务分片或缩减任务的任务分片,分片调整请求为第一执行节点根据第一执行节点上任务或任务的任务分片的负载生成,调度系统根据分片调整请求,在分布式集群调整任务的任务分片。其中,调整任务的任务分片包括在第二执行节点增加任务的任务分片,或者缩减第一执行节点中任务的任务分片。Specifically, the scheduling system can receive a task, and schedule the task or the task slice of the task to the first execution node in the distributed cluster. The first execution node is at least one execution node in the distributed cluster. Then, during the process of the first execution node executing the task or the task slice, the scheduling system receives a slice adjustment request sent by the first execution node. The slice adjustment request is used to request to increase the task slice of the task or reduce the task slice of the task. The slice adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node. The scheduling system adjusts the task slice of the task in the distributed cluster according to the slice adjustment request. Among them, adjusting the task slice of the task includes increasing the task slice of the task in the second execution node, or reducing the task slice of the task in the first execution node.

该方法可以根据运行态任务或任务分片的负载自适应调整任务分片数量,进而可以动态调整任务的资源使用,使得整体的集群资源达到负载均衡,避免执行节点被单一任务或任务分片占满,造成任务失败或者节点宕机的风险,提高可靠性和可用性。This method can adaptively adjust the number of task slices according to the load of the running tasks or task slices, and then dynamically adjust the resource usage of the tasks, so that the overall cluster resources can be load balanced, avoiding the execution nodes being occupied by a single task or task slice, causing the risk of task failure or node downtime, and improving reliability and availability.

为了使得本申请的技术方案更加清楚、易于理解,下面结合附图对本申请的系统架构进行介绍。In order to make the technical solution of the present application clearer and easier to understand, the system architecture of the present application is introduced below with reference to the accompanying drawings.

参见图2所示的一种调度系统的架构示意图,调度系统10包括调度节点100,考虑到可靠性、或者负载均衡,调度系统10可以包括多个调度节点100,该调度节点100可以是master。其中,调度节点可以形成调度中心。调度系统10分别与任务中心20、任务执行器30对接,任务执行器30可以包括多个执行节点形成的分布式集群。调度系统10用于将任务中心20的任务调度至任务执行器30中的执行节点进行执行。其中,执行节点可以为工作节点worker。进一步地,调度系统10还可以包括数据模块200。Referring to the schematic diagram of the architecture of a scheduling system shown in FIG2 , the scheduling system 10 includes a scheduling node 100. Considering reliability or load balancing, the scheduling system 10 may include multiple scheduling nodes 100, and the scheduling node 100 may be a master. Among them, the scheduling nodes may form a scheduling center. The scheduling system 10 is connected to the task center 20 and the task executor 30 respectively, and the task executor 30 may include a distributed cluster formed by multiple execution nodes. The scheduling system 10 is used to schedule the tasks of the task center 20 to the execution nodes in the task executor 30 for execution. Among them, the execution node may be a working node worker. Further, the scheduling system 10 may also include a data module 200.

具体地,调度节点100用于接收任务,将任务或任务的任务分片调度至分布式集群中的第一执行节点,第一执行节点为分布式集群中的至少一个执行节点,在第一执行节点执行任务或任务分片的过程中,接收第一执行节点发送的分片调整请求。该分片调整请求用于请求增加任务的任务分片或缩减任务的任务分片,分片调整请求为第一执行节点根据第一执行节点上任务或任务的任务分片的负载生成。在一些示例中,分片调整请求可以包括调整类型和调整对象的标识,调整类型可以为增加任务分片或缩减任务 分片,调整对象的标识可以包括任务标识或者任务分片的标识中的至少一项。在另一些示例中,分片调整请求可以包括任务或任务分片的负载。调度节点还用于根据分片调整请求,在分布式集群调整任务的任务分片。其中,调整任务的任务分片包括在第二执行节点增加任务的任务分片,或者缩减第一执行节点中任务的任务分片。Specifically, the scheduling node 100 is used to receive tasks, and schedule the tasks or task slices of the tasks to the first execution node in the distributed cluster. The first execution node is at least one execution node in the distributed cluster. During the process of the first execution node executing the tasks or task slices, the scheduling node 100 receives a slice adjustment request sent by the first execution node. The slice adjustment request is used to request to increase the task slices of the task or reduce the task slices of the task. The slice adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node. In some examples, the slice adjustment request may include an adjustment type and an identifier of the adjustment object. The adjustment type may be to increase the task slices or reduce the task slices. The identifier of the adjustment object may include at least one of the task identifier or the identifier of the task slice. In other examples, the slice adjustment request may include the load of the task or the task slice. The scheduling node is also used to adjust the task slice of the task in the distributed cluster according to the slice adjustment request. Wherein, adjusting the task slice of the task includes adding the task slice of the task in the second execution node, or reducing the task slice of the task in the first execution node.

其中,数据模块200存储有执行任务所需要的数据,该数据也称作任务的处理数据。以图2示例说明,数据模块200可以存储执行任务1所需的数据1(如data 1)和执行任务2所需的数据2(如data2)。执行节点上运行的任务或任务分片可以从数据模块200拉取任务的处理数据,以执行任务。The data module 200 stores data required for executing tasks, which are also called task processing data. As shown in FIG2 , the data module 200 can store data 1 (such as data 1) required for executing task 1 and data 2 (such as data 2) required for executing task 2. Tasks or task slices running on the execution node can pull task processing data from the data module 200 to execute the tasks.

第一执行节点用于感知任务或任务分片的负载,根据该负载决策是否调整任务分片的数量,进而根据决策结果生成分片调整请求。在一些可能的实现方式中,调度系统可以设置用于决策是否调整任务分片的数量的阈值,例如是第一阈值或第二阈值。第一执行节点上任务或任务的任务分片的负载大于第一阈值时,分片调整请求用于请求增加任务的任务分片。或者,第一执行节点上任务的目标任务分片的负载小于第二阈值时,分片调整请求用于缩减任务的目标任务分片。其中,第一阈值或第二阈值可以根据经验设置,第一阈值可以等于第二阈值,也可以不等于第二阈值,本实施例对此不作限制。The first execution node is used to sense the load of the task or task slice, decide whether to adjust the number of task slices based on the load, and then generate a slice adjustment request based on the decision result. In some possible implementations, the scheduling system can set a threshold for deciding whether to adjust the number of task slices, such as a first threshold or a second threshold. When the load of the task or the task slice of the task on the first execution node is greater than the first threshold, the slice adjustment request is used to request to increase the task slice of the task. Alternatively, when the load of the target task slice of the task on the first execution node is less than the second threshold, the slice adjustment request is used to reduce the target task slice of the task. Among them, the first threshold or the second threshold can be set according to experience, the first threshold can be equal to the second threshold, and it can also be unequal to the second threshold, and this embodiment does not limit this.

仍以图2示例说明,任务1在开始调度时被调度节点100(如master1)调度至第一执行节点(如工作节点1,记作worker1),当worker1感知到任务1的负载大于第一阈值,可以向调度节点100发送分片调整请求,分片调整请求用于请求在第二执行节点增加任务1的任务分片。其中,第二执行节点可以是分布式集群中除第一执行节点外的执行节点,例如为worker3。基于此,task 1可以分为如下任务分片:任务1-1(task1-1)、任务1-2(task1-2)。其中,task1-2可以是新增的任务分片,task1-1为原有的任务1分担一部分负载至task1-2后形成的任务分片。Still using the example of Figure 2, when task 1 starts scheduling, it is scheduled by the scheduling node 100 (such as master1) to the first execution node (such as working node 1, denoted as worker1). When worker1 senses that the load of task 1 is greater than the first threshold, it can send a shard adjustment request to the scheduling node 100. The shard adjustment request is used to request to add a task shard of task 1 to the second execution node. Among them, the second execution node can be an execution node other than the first execution node in the distributed cluster, such as worker3. Based on this, task 1 can be divided into the following task shards: task 1-1 (task1-1), task 1-2 (task1-2). Among them, task1-2 can be a newly added task shard, and task1-1 is a task shard formed after the original task 1 shares part of the load to task1-2.

数据模块200用于检测到任务的任务分片发生调整时,将任务的处理数据重新分片获得至少一个数据分片。至少一个数据分片与调整后的任务分片一一对应。仍以图2示例说明,任务1被分为2个任务分片时,数据1可以重新分片为2个数据分片。例如,数据1中编号为0至3的数据块被分为一个数据分片,编号为4至7的数据块被分为另一个数据分片。The data module 200 is used to re-slice the processing data of the task to obtain at least one data slice when it is detected that the task slice of the task is adjusted. The at least one data slice corresponds to the adjusted task slice one by one. Still using FIG. 2 as an example, when task 1 is divided into 2 task slices, data 1 can be re-sliced into 2 data slices. For example, data blocks numbered 0 to 3 in data 1 are divided into one data slice, and data blocks numbered 4 to 7 are divided into another data slice.

其中,任务分片可以从相应的数据分片拉取数据,进而根据该数据分片进行任务处理。执行节点执行任务或任务分片的结果可以向调度节点100上报。The task slices may pull data from the corresponding data slices, and then process the task according to the data slices. The results of the execution nodes executing the tasks or task slices may be reported to the scheduling node 100.

基于前述的调度系统10,本申请还提供一种任务调度方法。下面结合实施例对本申请的任务调度方法进行详细说明。Based on the aforementioned scheduling system 10, the present application further provides a task scheduling method. The task scheduling method of the present application is described in detail below in conjunction with an embodiment.

参见图3所示的一种任务调度方法的流程图,该方法可以由调度系统10执行,调度系统10包括调度节点100,进一步地,调度系统10还可以包括数据模块200,该方法具体包括如下步骤:Referring to the flowchart of a task scheduling method shown in FIG3 , the method may be executed by a scheduling system 10, the scheduling system 10 includes a scheduling node 100, and further, the scheduling system 10 may also include a data module 200. The method specifically includes the following steps:

S301、调度节点100接收任务。S301: The scheduling node 100 receives a task.

任务是指需要调度节点100调度至执行节点执行的任务。该任务为不确定需求资源大小、不确定分片数量、运行态所需资源大小随时间不断变化的任务或批处理任务。在一些示例中,任务可以是日志加工任务。日志加工任务可以是根据用户的任务配置信息,从相应的日志源读取原始日志,依据用户期望的规则进行处理,再将结果写入到预期目标中。在另一些示例中,任务也可以是其他不确定任务需求资源大小或者运行态任务所占资源变化幅度大的任务。A task refers to a task that needs to be scheduled by the scheduling node 100 to the execution node for execution. The task is a task or a batch processing task with uncertain resource requirements, uncertain number of shards, and resource size that changes over time in the running state. In some examples, the task may be a log processing task. The log processing task may be based on the user's task configuration information, read the original log from the corresponding log source, process it according to the rules expected by the user, and then write the result to the expected target. In other examples, the task may also be other tasks with uncertain resource requirements or tasks with large changes in the resources occupied by running tasks.

S302、调度节点100将任务或任务的任务分片调度至分布式集群中的第一执行节点。S302 , the scheduling node 100 schedules the task or the task slice of the task to the first execution node in the distributed cluster.

第一执行节点为分布式集群中的至少一个执行节点。其中,调度节点100可以根据分布式集群中执行节点的负载,从分布式集群中确定第一执行节点。例如,调度节点100可以将负载最小或小于设定值的节点确定为第一执行节点。然后,调度节点100可以将任务调度至上述第一执行节点。The first execution node is at least one execution node in the distributed cluster. The scheduling node 100 may determine the first execution node from the distributed cluster according to the load of the execution node in the distributed cluster. For example, the scheduling node 100 may determine the node with the smallest load or less than a set value as the first execution node. Then, the scheduling node 100 may schedule the task to the above-mentioned first execution node.

在一些可能的实现方式中,调度节点100还可以将任务进行分片,从而获得多个任务分片。调度节点100可以将多个任务分片,分别调度至不同的第一执行节点。例如,调度节点100接收到task1,可以根据任务初始要求,将task1分为多个任务分片,具体包括task1-1、task1-2,将task1-1、task1-2调度至不同的第一执行节点,其中,task1-1被调度至worker1,task1-2被调度至worker2。In some possible implementations, the scheduling node 100 may also slice the task to obtain multiple task slices. The scheduling node 100 may schedule multiple task slices to different first execution nodes. For example, when the scheduling node 100 receives task1, it may divide task1 into multiple task slices, including task1-1 and task1-2, according to the initial requirements of the task, and schedule task1-1 and task1-2 to different first execution nodes, wherein task1-1 is scheduled to worker1 and task1-2 is scheduled to worker2.

S304、调度节点100在第一执行节点执行任务或任务分片过程中,接收第一执行节点发送的分片调整请求。 S304. When the first execution node is executing a task or a task slice, the scheduling node 100 receives a slice adjustment request sent by the first execution node.

分片调整请求用于请求增加任务的任务分片或缩减任务的任务分片。分片调整请求为第一执行节点根据第一执行节点上任务或任务的任务分片的负载生成。调度至第一执行节点的任务或任务分片可以拉取与该任务或任务分片对应的处理数据,以执行任务或任务分片。在第一执行节点执行任务或任务分片的过程中,第一执行节点可以监控任务或任务分片的负载,其中,任务或任务分片的负载是指任务或任务分片的资源负载(resource load),以计算资源示例说明,任务或任务分片的资源负载可以是处理器在一段时间内正在处理以及等待处理的进程数之和。第一执行节点可以根据任务或任务分片的负载生成分片调整请求,并向调度节点100发送分片调整请求。相应地,调度节点100接收到第一执行节点在执行任务或任务分片过程中发送的分片调整请求,可以执行后续的分片调整操作。The slicing adjustment request is used to request to increase the task slice of a task or reduce the task slice of a task. The slicing adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node. The task or task slice scheduled to the first execution node can pull the processing data corresponding to the task or task slice to execute the task or task slice. In the process of the first execution node executing the task or task slice, the first execution node can monitor the load of the task or task slice, wherein the load of the task or task slice refers to the resource load of the task or task slice. Taking the computing resource as an example, the resource load of the task or task slice can be the sum of the number of processes being processed and waiting to be processed by the processor within a period of time. The first execution node can generate a slicing adjustment request according to the load of the task or task slice, and send the slicing adjustment request to the scheduling node 100. Accordingly, the scheduling node 100 receives the slicing adjustment request sent by the first execution node during the execution of the task or task slice, and can perform subsequent slicing adjustment operations.

下面对分片调整请求的生成过程进行详细说明。The following is a detailed description of the generation process of the shard adjustment request.

第一执行节点将任务或任务的任务分片的负载(如资源负载)与设定的阈值进行比较。具体地,当任务或任务分片的负载大于第一阈值,第一执行节点可以生成用于请求增加任务分片的分片调整请求。当任务分片的负载小于第二阈值,第一执行节点可以生成用于请求缩减任务分片的分片调整请求。The first execution node compares the load (such as resource load) of the task or the task slice of the task with a set threshold. Specifically, when the load of the task or the task slice is greater than the first threshold, the first execution node can generate a slice adjustment request for requesting to increase the task slice. When the load of the task slice is less than the second threshold, the first execution node can generate a slice adjustment request for requesting to reduce the task slice.

以图4示例说明,针对任务1(如task1),调度中心将task1调度到工作节点1(如worker1),worker1监测到task1的负载大于第一阈值,向调度中心发送分片调整请求,该分片调整请求用于请求增加task1的任务分片,例如是在工作节点3(如worker3)增加一个task1的任务分片,例如为任务1-2(task1-2),相应地,worker1上运行的task1可以变为task1的一个任务分片,例如为任务1-1(如task1-1)。针对任务2(如task2),调度中心将task2分为3个任务分片,例如是任务2-1(如task2-1)、任务2-2(如task2-2)、任务2-3(如task2-3)分别调度至worker1、工作节点2(如worker2)和worker3。worker2监测到执行task2的任务分片task2-2的负载小于第二阈值,可以向调度中心发送分片调整请求,该分片调整请求用于请求缩容task2的任务分片task2-2。As shown in Figure 4, for task 1 (such as task1), the scheduling center schedules task1 to working node 1 (such as worker1). Worker1 detects that the load of task1 is greater than the first threshold, and sends a slicing adjustment request to the scheduling center. The slicing adjustment request is used to request to increase the task slicing of task1, for example, to add a task slicing of task1 to working node 3 (such as worker3), such as task 1-2 (task1-2). Accordingly, task1 running on worker1 can become a task slicing of task1, such as task 1-1 (such as task1-1). For task 2 (such as task2), the scheduling center divides task2 into three task slicings, such as task 2-1 (such as task2-1), task 2-2 (such as task2-2), and task 2-3 (such as task2-3), which are respectively scheduled to worker1, working node 2 (such as worker2), and worker3. When worker2 detects that the load of task slice task2-2 executing task2 is less than the second threshold, it can send a slice adjustment request to the scheduling center, where the slice adjustment request is used to request to shrink the task slice task2-2 of task2.

在一些可能的实现方式中,任务或任务分片可以装配在数据体中。该数据体可以是承载运行的任务或任务分片的cube。其中,数据体还可以记录运行的任务或任务分片的负载以及任务或任务分片的标识。任务分片为任务的子任务,基于此,数据体可以记录任务或子任务的负载以及任务或子任务的标识。第一执行节点可以针对每个调度至该执行节点的任务或任务分片,将任务或任务分片以及将任务的标识、任务分片的标识、资源负载封装在cube中。为了便于理解,以图4示例说明。该示例中,worker1可以在一个cube装配task1,并在cube中记录任务的标识(记作taskId)、任务分片的标识(记作Sub TaskId)和任务或任务分片的负载(记作ResourceLoad)。类似地,worker2可以在一个cube中装配task2-2,并在cube中记录task2的标识、task2-2的标识、task2-2的负载。In some possible implementations, a task or a task slice can be assembled in a data body. The data body can be a cube that carries the running task or task slice. Among them, the data body can also record the load of the running task or task slice and the identifier of the task or task slice. A task slice is a subtask of a task, based on which the data body can record the load of a task or a subtask and the identifier of a task or a subtask. The first execution node can encapsulate the task or task slice and the identifier of the task, the identifier of the task slice, and the resource load in a cube for each task or task slice scheduled to the execution node. For ease of understanding, an example is given in Figure 4. In this example, worker1 can assemble task1 in a cube and record the identifier of the task (recorded as taskId), the identifier of the task slice (recorded as Sub TaskId), and the load of the task or task slice (recorded as ResourceLoad) in the cube. Similarly, worker2 can assemble task2-2 in a cube and record the identifier of task2, the identifier of task2-2, and the load of task2-2 in the cube.

调度节点100可以接收第一执行节点根据任务或任务分片的负载生成的分片调整请求。在一些示例中,分片调整请求可以包括调整类型和调整对象的标识,调整类型可以为增加任务分片或缩减任务分片,调整对象的标识可以包括任务标识或者任务分片的标识中的至少一项。在另一些示例中,分片调整请求可以包括任务或任务分片的负载。The scheduling node 100 may receive a slice adjustment request generated by the first execution node according to the load of the task or task slice. In some examples, the slice adjustment request may include an adjustment type and an identifier of an adjustment object, the adjustment type may be to increase a task slice or to reduce a task slice, and the identifier of the adjustment object may include at least one of a task identifier or an identifier of a task slice. In other examples, the slice adjustment request may include the load of the task or task slice.

S306、调度节点100根据分片调整请求,在分布式集群调整任务的任务分片。S306 . The scheduling node 100 adjusts the task slices of the task in the distributed cluster according to the slice adjustment request.

其中,调整任务的任务分片包括在第二执行节点增加任务的任务分片,或者缩减第一执行节点中任务的任务分片。第二执行节点可以是分布式集群中除第一执行节点之外的执行节点。Wherein, adjusting the task slice of the task includes increasing the task slice of the task in the second execution node, or reducing the task slice of the task in the first execution node. The second execution node may be an execution node other than the first execution node in the distributed cluster.

分片调整请求指示有调整类型或调整对象。调整类型为增加任务分片时,调度节点可以根据分片调整请求中任务的标识,在第二执行节点增加任务的任务分片。其中,调度节点可以采用与确定第一执行节点类似的方式确定第二执行节点。调度节点可以获取分布式集群中各执行节点的负载,根据执行节点的负载确定第二执行节点。该第二执行节点可以为分布式集群中除第一执行节点之外其他执行节点中负载最小的执行节点,或者是负载小于设定值的执行节点。调整类型为缩减任务分片时,调度节点可以根据目标任务分片的标识,例如是分片调整请求中目标任务分片的标识,缩减第一执行节点中的目标任务分片。The slicing adjustment request indicates an adjustment type or an adjustment object. When the adjustment type is to add task slicing, the scheduling node can add the task slicing of the task in the second execution node according to the identifier of the task in the slicing adjustment request. Among them, the scheduling node can determine the second execution node in a similar manner to determining the first execution node. The scheduling node can obtain the load of each execution node in the distributed cluster and determine the second execution node according to the load of the execution node. The second execution node can be the execution node with the smallest load among the other execution nodes in the distributed cluster except the first execution node, or the execution node with a load less than a set value. When the adjustment type is to reduce task slicing, the scheduling node can reduce the target task slicing in the first execution node according to the identifier of the target task slicing, such as the identifier of the target task slicing in the slicing adjustment request.

S308、数据模块200检测到任务的任务分片发生调整时,将任务的处理数据重新分片获得至少一个数据分片。S308: When the data module 200 detects that the task slice of the task is adjusted, the processing data of the task is re-sliced to obtain at least one data slice.

数据模块200存储有任务的处理数据,数据模块200可以根据任务的分片数量,对任务的处理数据进行分片,获得至少一个数据分片。新加入的任务分片可以自动注册到数据模块200,如此,数据模块 200可以检测到任务的任务分片发生调整,数据模块200可以对任务的处理数据重新分片获得至少一个数据分片。以图5示例说明,task1包括如下任务分片task1-1和task1-2,当调度节点100增加了一个任务分片task1-3,该任务分片可以自动注册到数据模块200,数据模块200检测到任务分片增加,可以对任务的处理数据重新分片,由2个数据分片增加为3个数据分片。例如数据分片1中编号为3的数据块和数据分片2中编号为4、5的数据块可以从原数据分片剥离,合并为一个新的数据分片。The data module 200 stores the task processing data. The data module 200 can slice the task processing data according to the number of slices of the task to obtain at least one data slice. The newly added task slice can be automatically registered to the data module 200. 200 can detect that the task slice of the task has been adjusted, and the data module 200 can re-slice the processing data of the task to obtain at least one data slice. Taking Figure 5 as an example, task1 includes the following task slices task1-1 and task1-2. When the scheduling node 100 adds a task slice task1-3, the task slice can be automatically registered to the data module 200. The data module 200 detects the increase in task slices and can re-slice the processing data of the task from 2 data slices to 3 data slices. For example, the data block numbered 3 in data slice 1 and the data blocks numbered 4 and 5 in data slice 2 can be stripped from the original data slices and merged into a new data slice.

以数据分片为shard示例说明,任务的处理数据如日志数据可以存储在一个个shard。shard可以支持建立索引、数据查询等操作。数据模块200监测到新加入任务分片时,进行rebalance操作,对shard重新分片,保证每个任务分片的拉取数据量相对均衡。Taking data shard as an example, the processing data of the task, such as log data, can be stored in shards. Shard can support operations such as indexing and data query. When the data module 200 detects a newly added task shard, it performs a rebalance operation to re-shard the shard to ensure that the amount of data pulled by each task shard is relatively balanced.

在一些可能的实现方式中,数据模块200还可以记录任务的处理数据的拉取进度,例如是数据分片的拉取进度,相应地,数据模块200在对任务的处理数据重新分片时可以根据任务的处理数据的拉取进度,确定剩余数据,然后对剩余数据重新分片获得至少一个数据分片。该方法通过记录每个数据分片(如shard)的拉取进度,可以保障数据拉取的不重复。In some possible implementations, the data module 200 may also record the progress of pulling the processing data of the task, such as the progress of pulling the data shards. Accordingly, when the data module 200 re-shards the processing data of the task, it may determine the remaining data according to the progress of pulling the processing data of the task, and then re-shard the remaining data to obtain at least one data shard. By recording the progress of pulling each data shard (such as shard), this method can ensure that data pulling is not repeated.

在一些可能的实现方式中,数据模块200还可以向任务分片发送心跳消息,该心跳消息用于检测任务分片的活性。其中,任务分片的活性用于表征任务分片的当前状态,例如是正常运行状态或者是失活状态。相应地,数据模块200还可以根据任务分片的活性,调整数据分片。例如,数据模块200检测到任务分片失活,可以重新调整数据分片,缩减数据分片的数量。该方法中,数据模块200和任务分片之间维持心跳,通过监测保证任务分片分配的shard实时更新,保证数据拉取的完整性。In some possible implementations, the data module 200 can also send a heartbeat message to the task slice, and the heartbeat message is used to detect the activity of the task slice. Among them, the activity of the task slice is used to characterize the current state of the task slice, such as a normal operating state or an inactivated state. Correspondingly, the data module 200 can also adjust the data slice according to the activity of the task slice. For example, if the data module 200 detects that the task slice is inactivated, the data slice can be readjusted to reduce the number of data slices. In this method, a heartbeat is maintained between the data module 200 and the task slice, and the shard assigned to the task slice is guaranteed to be updated in real time through monitoring to ensure the integrity of data pulling.

需要说明的是,上述S308为本申请实施例的可选步骤,执行本申请实施例的任务调度方法可以不执行上述It should be noted that the above S308 is an optional step of the embodiment of the present application. The task scheduling method of the embodiment of the present application may not execute the above step.

基于上述内容描述,本申请提供一种任务调度方法。该方法通过在接收到任务,将任务或任务的任务分片调度到分布式集群中的第一执行节点后,在第一执行节点执行任务或任务分片的过程中,接收第一执行节点根据该节点上任务或任务分片的负载生成并发送的分片调整请求,根据分片调整请求在分布式集群调整任务的任务分片,从而实现对运行态的任务或任务分片的负载进行监控,根据任务或任务分片的负载自适应调整任务或任务分片的数量,进而可以动态调整任务的资源使用,使得整体的集群资源达到负载均衡,避免执行节点被单一任务占满,造成任务失败或者节点宕机的风险,提高可靠性和可用性。Based on the above description, the present application provides a task scheduling method. The method receives a task and schedules the task or the task slice of the task to the first execution node in the distributed cluster. During the process of the first execution node executing the task or the task slice, the first execution node receives a slice adjustment request generated and sent according to the load of the task or the task slice on the node, and adjusts the task slice of the task in the distributed cluster according to the slice adjustment request, thereby monitoring the load of the running task or the task slice, adaptively adjusting the number of tasks or task slices according to the load of the task or the task slice, and then dynamically adjusting the resource usage of the task, so that the overall cluster resources are load balanced, avoiding the execution node being occupied by a single task, causing the risk of task failure or node downtime, and improving reliability and availability.

下面结合附图对任务分片自适应调整的流程进行说明。如图6所示,任务分片自适应调整可以分为任务调度、运行监测、动态调整多个阶段。The process of task slice adaptive adjustment is described below with reference to the accompanying drawings. As shown in Figure 6, task slice adaptive adjustment can be divided into multiple stages: task scheduling, operation monitoring, and dynamic adjustment.

在任务调度阶段,调度节点100接收到新任务,如task1和task2,可以根据调度策略,将新任务调度到资源相对富裕的执行节点运行。在图6的示例中,调度节点100可以将task1调度到worker1,将task2调度到worker2。In the task scheduling stage, the scheduling node 100 receives new tasks, such as task1 and task2, and can schedule the new tasks to execution nodes with relatively abundant resources according to the scheduling strategy. In the example of Figure 6, the scheduling node 100 can schedule task1 to worker1 and task2 to worker2.

在运行阶段,任务每拉取一批处理数据,worker可以判断任务的资源负载情况。需要说明的是,当任务被分为任务分片调度至不同worker时,worker可以判断任务的任务分片的资源负载情况。During the running phase, each time a task pulls a batch of processing data, the worker can determine the resource load of the task. It should be noted that when a task is divided into task slices and scheduled to different workers, the worker can determine the resource load of the task slices.

在动态调整阶段,worker可以根据监测的任务负载,在任务负载超过所设置的阈值,例如超过第一阈值时,向调度节点100发送分片调整请求,以请求增加任务分片。新增的任务分片可以触发数据模块200进行rebalance。新增的任务分片可以从rebalance后的数据分片进行数据拉取,分摊已有任务的资源负载,达到降低已有任务所在worker资源占用的目的,当任务负载低于所设置的阈值,例如是低于第二阈值时,worker可以向master发送分片调整请求,以请求缩减任务分片,当分布式集群中该任务的任务分片不唯一时,调度节点100可以进行任务分片缩减,剩余任务分片分摊该任务分片对应的数据分片。In the dynamic adjustment stage, the worker can send a shard adjustment request to the scheduling node 100 to request the addition of task shards based on the monitored task load when the task load exceeds the set threshold, for example, exceeds the first threshold. The newly added task shards can trigger the data module 200 to rebalance. The newly added task shards can pull data from the rebalanced data shards to share the resource load of the existing tasks, so as to reduce the resource occupation of the worker where the existing tasks are located. When the task load is lower than the set threshold, for example, lower than the second threshold, the worker can send a shard adjustment request to the master to request the reduction of the task shards. When the task shards of the task in the distributed cluster are not unique, the scheduling node 100 can reduce the task shards, and the remaining task shards share the data shards corresponding to the task shards.

为了使得本申请的技术方案更加清楚、易于理解,本申请还提供了应用场景进行示例说明。In order to make the technical solution of the present application clearer and easier to understand, the present application also provides application scenarios for example illustration.

参见图7所示的一种任务调度方法应用于日志加工场景的示意图,该场景中,任务可以是日志加工任务,用户可以创建日志加工任务,调度节点100可以根据初始任务要求,将task1分为2个任务分片,具体为task1-1和task1-2,并调度两个任务分片,分别分配在worker1和worker2上。两个任务分片分别 在日志拉取模块拉取对应日志1(如log1日志流)的数据,中,task1-1拉取0-2shard上的日志数据,task1-2拉取3-5shard上的日志数据,将日志数据输入到函数(function)模块按用户所设规则进行加工,输出加工好的日志。其中,用户所设规则可以是基于领域特定语言(Domain-specific language,DSL)的规则。领域特定语言是在特定领域下用于特定上下文的语言,通常针对特定类问题优化,包含更高级抽象的编程语言。DSL可以使用来自专业或领域的概念和规则,对日志数据进行加工,输出加工好的日志。Referring to FIG. 7 , a schematic diagram of a task scheduling method applied to a log processing scenario is shown. In this scenario, the task may be a log processing task. The user can create a log processing task. The scheduling node 100 can divide task1 into two task slices, specifically task1-1 and task1-2, according to the initial task requirements, and schedule the two task slices, which are respectively allocated to worker1 and worker2. In the log pulling module, the data of the corresponding log 1 (such as log1 log stream) is pulled. Task 1-1 pulls the log data on shard 0-2, and task 1-2 pulls the log data on shard 3-5. The log data is input into the function module for processing according to the rules set by the user, and the processed log is output. The rules set by the user can be rules based on a domain-specific language (DSL). A domain-specific language is a language used in a specific context in a specific domain, usually optimized for a specific class of problems, and includes a higher-level abstract programming language. DSL can use concepts and rules from a profession or domain to process log data and output processed logs.

在日志加工任务运行过程中,当监测到任务分片task1-1的负载(或占用资源)超过所设置的阈值(例如是第一阈值),自动请求调度节点100增加任务1(task1)的分片,调度节点100将任务分片task1-3调度到worker3上,日志拉取模块监测到新加入的log1对应的任务分片,重新rebalance,将0-1shard分配task1-1,2-3shard分配至task1-2,4-5shard分配至task1-3,从而达到减轻task1-1所占资源负载,整体资源相对均衡的目的。During the operation of the log processing task, when it is monitored that the load (or occupied resources) of the task shard task1-1 exceeds the set threshold (for example, the first threshold), the scheduling node 100 is automatically requested to increase the shard of task 1 (task1), and the scheduling node 100 schedules the task shard task1-3 to worker3. The log pulling module monitors the task shard corresponding to the newly added log1, and rebalances, assigning 0-1shard to task1-1, 2-3shard to task1-2, and 4-5shard to task1-3, thereby achieving the purpose of reducing the resource load occupied by task1-1 and relatively balancing the overall resources.

基于前述的任务调度方法,本申请还提供一种调度系统。如图2所示,该调度系统10包括:Based on the above-mentioned task scheduling method, the present application also provides a scheduling system. As shown in FIG2 , the scheduling system 10 includes:

调度节点100,用于接收调度任务,将任务或任务的任务分片调度至分布式集群中的第一执行节点,第一执行节点为分布式集群中的至少一个执行节点,在第一执行节点执行任务或任务分片的过程中,接收第一执行节点发送的分片调整请求,分片调整请求用于请求增加任务的任务分片或缩减任务的任务分片,分片调整请求为第一执行节点根据第一执行节点上任务或任务的任务分片的负载生成;The scheduling node 100 is used to receive a scheduling task, schedule the task or the task slice of the task to the first execution node in the distributed cluster, the first execution node is at least one execution node in the distributed cluster, and during the process of the first execution node executing the task or the task slice, receive a slice adjustment request sent by the first execution node, the slice adjustment request is used to request to increase the task slice of the task or reduce the task slice of the task, and the slice adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node;

调度节点100,还用于根据分片调整请求,在分布式集群调整任务的任务分片。其中,调整任务的任务分片包括:在第二执行节点增加任务的任务分片,或者缩减第一执行节点中任务的任务分片。The scheduling node 100 is further used to adjust the task slices of the task in the distributed cluster according to the slice adjustment request, wherein adjusting the task slices of the task includes: increasing the task slices of the task in the second execution node, or reducing the task slices of the task in the first execution node.

上述调度节点100可以通过软件实现,或者通过硬件实现。当通过硬件实现时,调度节点100可以包括至少一个计算设备,如服务器。当通过软件实现时,调度节点100可以是运行在计算设备上的应用程序,例如是虚拟化后的应用程序。The above-mentioned scheduling node 100 can be implemented by software or by hardware. When implemented by hardware, the scheduling node 100 may include at least one computing device, such as a server. When implemented by software, the scheduling node 100 may be an application running on a computing device, such as a virtualized application.

进一步地,参见图8,该调度节点100可以包括如下功能模块:Further, referring to FIG8 , the scheduling node 100 may include the following functional modules:

交互模块102,用于接收调度任务;Interaction module 102, used for receiving scheduling tasks;

调度模块104,用于将任务或任务的任务分片调度至分布式集群中的第一执行节点,第一执行节点为分布式集群中的至少一个执行节点;A scheduling module 104, configured to schedule a task or a task slice of a task to a first execution node in a distributed cluster, where the first execution node is at least one execution node in the distributed cluster;

交互模块102,还用于在第一执行节点执行任务或任务分片的过程中,接收第一执行节点发送的分片调整请求,分片调整请求用于请求增加任务的任务分片或缩减任务的任务分片,分片调整请求为第一执行节点根据第一执行节点上任务或任务的任务分片的负载生成;The interaction module 102 is further used to receive a slicing adjustment request sent by the first execution node during the process of the first execution node executing the task or the task slice, the slicing adjustment request is used to request to increase the task slice of the task or reduce the task slice of the task, and the slicing adjustment request is generated by the first execution node according to the load of the task or the task slice of the task on the first execution node;

调整模块106,用于根据分片调整请求,在分布式集群调整任务的任务分片。The adjustment module 106 is used to adjust the task slices of the task in the distributed cluster according to the slice adjustment request.

其中,交互模块102、调度模块104或调整模块106可以通过软件实现,或者通过硬件实现。The interaction module 102, the scheduling module 104 or the adjustment module 106 may be implemented by software or by hardware.

当通过软件实现时,交互模块102、调度模块104或调整模块106可以是运行在计算设备上的应用程序,如计算引擎等。该应用程序可以通过虚拟化服务提供给用户使用。虚拟化服务可以包括虚拟机(virtual machine,VM)服务、裸金属服务器(bare metal server,BMS)服务以及容器(container)服务。其中,VM服务可以是通过虚拟化技术在多个物理主机上虚拟出虚拟机(virtual machine,VM)资源池以为用户按需提供VM进行使用的服务。BMS服务是在多个物理主机上虚拟出BMS资源池以为用户按需提供BMS进行使用的服务。容器服务是在多个物理主机上虚拟出容器资源池以为用户按需提供容器进行使用的服务。VM是模拟出来的一台虚拟的计算机,也即逻辑上的一台计算机。BMS是一种可弹性伸缩的高性能计算服务,计算性能与传统物理机无差别,具有安全物理隔离的特点。容器是一种内核虚拟化技术,可以提供轻量级的虚拟化,以达到隔离用户空间、进程和资源的目的。应理解,上述虚拟化服务中的VM服务、BMS服务以及容器服务仅仅是作为具体的事例,在实际应用中,虚拟化服务还可以其他轻量级或者重量级的虚拟化服务,此处不作具体限定。When implemented by software, the interaction module 102, the scheduling module 104 or the adjustment module 106 may be an application running on a computing device, such as a computing engine. The application may be provided to users through virtualization services. Virtualization services may include virtual machine (VM) services, bare metal server (BMS) services and container services. Among them, VM services may be services that use virtualization technology to virtualize virtual machine (VM) resource pools on multiple physical hosts to provide users with VMs on demand for use. BMS services are services that virtualize BMS resource pools on multiple physical hosts to provide users with BMS on demand for use. Container services are services that virtualize container resource pools on multiple physical hosts to provide users with containers on demand for use. VM is a simulated virtual computer, that is, a logical computer. BMS is a high-performance computing service that can be elastically scalable, and its computing performance is no different from that of a traditional physical machine, and it has the characteristics of secure physical isolation. Containers are a kernel virtualization technology that can provide lightweight virtualization to achieve the purpose of isolating user space, processes and resources. It should be understood that the VM service, BMS service and container service in the above-mentioned virtualization services are only specific examples. In actual applications, virtualization services can also be other lightweight or heavyweight virtualization services, which are not specifically limited here.

当通过硬件实现时,交互模块102、调度模块104或调整模块106中可以包括至少一个计算设备,如服务器等。或者,交互模块102、调度模块104或调整模块106也可以是利用专用集成电路(application-specific integrated circuit,ASIC)实现、或可编程逻辑器件(programmable logic device,PLD)实现的设备等。其中,上述PLD可以是复杂程序逻辑器件(complex programmable logical device,CPLD)、现场可编程门阵列(field-programmable gate array,FPGA)、通用阵列逻辑(generic array logic,GAL) 或其任意组合实现。When implemented by hardware, the interaction module 102, the scheduling module 104 or the adjustment module 106 may include at least one computing device, such as a server, etc. Alternatively, the interaction module 102, the scheduling module 104 or the adjustment module 106 may also be implemented by an application-specific integrated circuit (ASIC) or a programmable logic device (PLD). The PLD may be a complex programmable logical device (CPLD), a field-programmable gate array (FPGA), a generic array logic (GAL), or a FPGA. or any combination thereof.

在一些可能的实现方式中,第一执行节点上任务或任务的任务分片的负载大于第一阈值时,分片调整请求用于请求增加任务的任务分片;或者,第一执行节点上任务的目标任务分片的负载小于第二阈值时,分片调整请求用于缩减任务的目标任务分片。In some possible implementations, when the load of a task or a task slice of a task on a first execution node is greater than a first threshold, a slice adjustment request is used to request an increase in the task slice of the task; or, when the load of a target task slice of a task on the first execution node is less than a second threshold, a slice adjustment request is used to reduce the target task slice of the task.

在一些可能的实现方式中,调度系统10包括调度节点100和数据模块200,数据模块200具体用于:In some possible implementations, the scheduling system 10 includes a scheduling node 100 and a data module 200, where the data module 200 is specifically used for:

检测到任务的任务分片发生调整时,将任务的处理数据重新分片获得至少一个数据分片,至少一个数据分片与调整后的任务分片一一对应。When it is detected that the task slice of the task is adjusted, the processing data of the task is re-sliced to obtain at least one data slice, and the at least one data slice corresponds to the adjusted task slice one by one.

与交互模块102、调度模块104或调整模块106类似,数据模块200可以通过软件实现,或者通过硬件实现。Similar to the interaction module 102 , the scheduling module 104 or the adjustment module 106 , the data module 200 may be implemented by software or by hardware.

当通过软件实现时,数据模块200可以是运行在计算设备上的应用程序,如计算引擎等。该应用程序可以通过VM服务、BMS服务或container服务等虚拟化服务提供给用户使用。当通过硬件实现时,数据模块200中可以包括至少一个计算设备,如服务器等。或者,数据模块200也可以是利用ASIC实现、或PLD实现的设备等。When implemented by software, the data module 200 may be an application running on a computing device, such as a computing engine. The application may be provided to users through virtualization services such as VM services, BMS services, or container services. When implemented by hardware, the data module 200 may include at least one computing device, such as a server. Alternatively, the data module 200 may also be a device implemented by ASIC or PLD.

在一些可能的实现方式中,数据模块200具体用于:In some possible implementations, the data module 200 is specifically used to:

根据调整后的任务分片的数量,通过负载均衡,将任务的处理数据重新分片获得至少一个数据分片。According to the adjusted number of task shards, the task processing data is re-sharded through load balancing to obtain at least one data shard.

在一些可能的实现方式中,数据模块200还用于:In some possible implementations, the data module 200 is further used to:

记录任务的处理数据的拉取进度;Record the progress of pulling the task's processing data;

数据模块200具体用于:The data module 200 is specifically used for:

根据任务的处理数据的拉取进度,确定剩余数据;Determine the remaining data based on the progress of pulling the processing data of the task;

对剩余数据重新分片获得至少一个数据分片。The remaining data is re-sharded to obtain at least one data shard.

在一些可能的实现方式中,数据模块200还用于:In some possible implementations, the data module 200 is further used to:

向任务分片发送心跳消息,心跳消息用于检测任务分片的活性;Send heartbeat messages to task shards. Heartbeat messages are used to detect the activity of task shards.

根据任务分片的活性,调整数据分片。Adjust data sharding based on the activity of task sharding.

在一些可能的实现方式中,任务或任务分片装配在数据体中,数据体记录运行的任务或任务分片的负载以及任务或任务分片的标识。In some possible implementations, tasks or task slices are assembled in a data body, and the data body records the load of the running tasks or task slices and the identification of the tasks or task slices.

在一些可能的实现方式中,任务包括日志加工任务。In some possible implementations, the task includes a log processing task.

本申请还提供一种计算设备900。如图9所示,计算设备900包括:总线902、处理器904、存储器906和通信接口908。处理器904、存储器906和通信接口908之间通过总线902通信。计算设备900可以是服务器或终端设备。应理解,本申请不限定计算设备900中的处理器、存储器的个数。The present application also provides a computing device 900. As shown in FIG9 , the computing device 900 includes: a bus 902, a processor 904, a memory 906, and a communication interface 908. The processor 904, the memory 906, and the communication interface 908 communicate with each other through the bus 902. The computing device 900 can be a server or a terminal device. It should be understood that the present application does not limit the number of processors and memories in the computing device 900.

总线902可以是外设部件互连标准(peripheral component interconnect,PCI)总线或扩展工业标准结构(extended industry standard architecture,EISA)总线等。总线可以分为地址总线、数据总线、控制总线等。为便于表示,图9中仅用一条线表示,但并不表示仅有一根总线或一种类型的总线。总线902可包括在计算设备900各个部件(例如,存储器906、处理器904、通信接口908)之间传送信息的通路。The bus 902 may be a peripheral component interconnect (PCI) bus or an extended industry standard architecture (EISA) bus, etc. The bus may be divided into an address bus, a data bus, a control bus, etc. For ease of representation, FIG. 9 is represented by only one line, but does not mean that there is only one bus or one type of bus. The bus 902 may include a path for transmitting information between various components of the computing device 900 (e.g., the memory 906, the processor 904, and the communication interface 908).

处理器904可以包括中央处理器(central processing unit,CPU)、图形处理器(graphics processing unit,GPU)、微处理器(micro processor,MP)或者数字信号处理器(digital signal processor,DSP)等处理器中的任意一种或多种。Processor 904 may include any one or more of a central processing unit (CPU), a graphics processing unit (GPU), a microprocessor (MP) or a digital signal processor (DSP).

存储器906可以包括易失性存储器(volatile memory),例如随机存取存储器(random access memory,RAM)。存储器906还可以包括非易失性存储器(non-volatile memory),例如只读存储器(read-only memory,ROM),快闪存储器,机械硬盘(hard disk drive,HDD)或固态硬盘(solid state drive,SSD)。存储器906中存储有可执行的程序代码,处理器904执行该可执行的程序代码以实现前述任务调度方法。具体的,存储器906上存有调度系统10用于执行任务调度方法的指令。例如,存储器906可以存储调度节点100中交互模块102、调度模块104、调整模块106的指令,进一步地,存储器906还可以存储数据模块200的指令。The memory 906 may include a volatile memory (volatile memory), such as a random access memory (RAM). The memory 906 may also include a non-volatile memory (non-volatile memory), such as a read-only memory (ROM), a flash memory, a hard disk drive (HDD) or a solid state drive (SSD). The memory 906 stores executable program code, and the processor 904 executes the executable program code to implement the aforementioned task scheduling method. Specifically, the memory 906 stores instructions for the scheduling system 10 to execute the task scheduling method. For example, the memory 906 may store instructions of the interaction module 102, the scheduling module 104, and the adjustment module 106 in the scheduling node 100. Further, the memory 906 may also store instructions of the data module 200.

通信接口908使用例如但不限于网络接口卡、收发器一类的收发模块,来实现计算设备900与 其他设备或通信网络之间的通信。The communication interface 908 uses a transceiver module such as, but not limited to, a network interface card or a transceiver to implement communication between the computing device 900 and the computing device 900. Communications between other devices or communications networks.

本申请实施例还提供了一种计算设备集群。该计算设备集群包括至少一台计算设备。该计算设备可以是服务器,例如是中心服务器、边缘服务器,或者是本地数据中心中的本地服务器。在一些实施例中,计算设备也可以是台式机、笔记本电脑或者智能手机等终端设备。The embodiment of the present application also provides a computing device cluster. The computing device cluster includes at least one computing device. The computing device can be a server, such as a central server, an edge server, or a local server in a local data center. In some embodiments, the computing device can also be a terminal device such as a desktop computer, a laptop computer, or a smart phone.

如图10所示,计算设备集群包括至少一个计算设备900。计算设备集群中的一个或多个计算设备900中的存储器906中可以存有相同的调度系统10用于执行任务调度方法的指令。As shown in Fig. 10, the computing device cluster includes at least one computing device 900. The memory 906 in one or more computing devices 900 in the computing device cluster may store the same instructions of the scheduling system 10 for executing the task scheduling method.

在一些可能的实现方式中,该计算设备集群中的一个或多个计算设备900也可以用于执行调度系统10用于执行任务调度方法的部分指令。换言之,一个或多个计算设备900的组合可以共同执行调度系统10用于执行任务调度方法的指令。In some possible implementations, one or more computing devices 900 in the computing device cluster may also be used to execute some instructions of the scheduling system 10 for executing the task scheduling method. In other words, a combination of one or more computing devices 900 may jointly execute instructions of the scheduling system 10 for executing the task scheduling method.

需要说明的是,计算设备集群中的不同的计算设备900中的存储器906可以存储不同的指令,用于执行调度系统10的部分功能。It should be noted that the memory 906 in different computing devices 900 in the computing device cluster can store different instructions for executing partial functions of the scheduling system 10 .

图11示出了一种可能的实现方式。如图11所示,两个计算设备900A和900B通过通信接口908实现连接。计算设备900A中的存储器上存有用于执行调度节点100的功能的指令,例如存储器上存有用于执行交互模块102、调度模块104和调整模块106的功能的指令。进一步地,计算设备900B中的存储器上存有用于执行数据模块200的功能的指令。换言之,计算设备900A和900B的存储器906共同存储了调度系统10用于执行任务调度方法的指令。FIG11 shows a possible implementation. As shown in FIG11 , two computing devices 900A and 900B are connected via a communication interface 908. The memory in the computing device 900A stores instructions for executing the functions of the scheduling node 100, for example, the memory stores instructions for executing the functions of the interaction module 102, the scheduling module 104, and the adjustment module 106. Further, the memory in the computing device 900B stores instructions for executing the functions of the data module 200. In other words, the memory 906 of the computing devices 900A and 900B jointly stores instructions for the scheduling system 10 to execute the task scheduling method.

图11所示的计算设备集群之间的连接方式可以是考虑到本申请提供的任务调度方法需要较多资源检测任务分片是否调整。因此,考虑将数据模块200实现的功能交由计算设备900B执行。The connection mode between the computing device clusters shown in Figure 11 may be considered to be that the task scheduling method provided by the present application requires more resources to detect whether the task slices are adjusted. Therefore, it is considered to hand over the functions implemented by the data module 200 to the computing device 900B for execution.

应理解,图11中示出的计算设备900A的功能也可以由多个计算设备900完成。同样,计算设备900B的功能也可以由多个计算设备900完成。It should be understood that the functions of the computing device 900A shown in FIG11 may also be completed by multiple computing devices 900. Similarly, the functions of the computing device 900B may also be completed by multiple computing devices 900.

在一些可能的实现方式中,计算设备集群中的一个或多个计算设备可以通过网络连接。其中,所述网络可以是广域网或局域网等等。图12示出了一种可能的实现方式。如图12所示,两个计算设备900C和900D之间通过网络进行连接。具体地,通过各个计算设备中的通信接口与所述网络进行连接。在这一类可能的实现方式中,计算设备900C中的存储器906中存有执行调度节点100的功能的指令,例如是存储执行交互模块102、调度模块104和调整模块106的功能的指令。同时,计算设备900D中的存储器906中存有执行数据模块200的功能的指令。In some possible implementations, one or more computing devices in the computing device cluster can be connected via a network. The network may be a wide area network or a local area network, etc. FIG. 12 shows a possible implementation. As shown in FIG. 12 , two computing devices 900C and 900D are connected via a network. Specifically, the network is connected via a communication interface in each computing device. In this type of possible implementation, the memory 906 in the computing device 900C stores instructions for executing the functions of the scheduling node 100, such as instructions for storing the functions of the interaction module 102, the scheduling module 104, and the adjustment module 106. At the same time, the memory 906 in the computing device 900D stores instructions for executing the functions of the data module 200.

图12所示的计算设备集群之间的连接方式可以是考虑到本申请提供的任务调度方法需要耗费较多资源进行任务分片检测,因此考虑将数据模块200实现的功能交由计算设备900D执行。The connection method between the computing device clusters shown in Figure 12 can be considered to be that the task scheduling method provided in this application requires more resources to perform task slicing detection, so it is considered to hand over the functions implemented by the data module 200 to the computing device 900D for execution.

应理解,图12中示出的计算设备900C的功能也可以由多个计算设备900完成。同样,计算设备900D的功能也可以由多个计算设备900完成。It should be understood that the functions of the computing device 900C shown in FIG12 may also be completed by multiple computing devices 900. Similarly, the functions of the computing device 900D may also be completed by multiple computing devices 900.

本申请实施例还提供了一种计算机可读存储介质。所述计算机可读存储介质可以是计算设备能够存储的任何可用介质或者是包含一个或多个可用介质的数据中心等数据存储设备。所述可用介质可以是磁性介质,(例如,软盘、硬盘、磁带)、光介质(例如,DVD)、或者半导体介质(例如固态硬盘)等。该计算机可读存储介质包括指令,所述指令指示计算设备执行上述应用于调度系统10的任务调度方法。The embodiment of the present application also provides a computer-readable storage medium. The computer-readable storage medium can be any available medium that can be stored by the computing device or a data storage device such as a data center containing one or more available media. The available medium can be a magnetic medium (e.g., a floppy disk, a hard disk, a tape), an optical medium (e.g., a DVD), or a semiconductor medium (e.g., a solid-state hard disk). The computer-readable storage medium includes instructions that instruct the computing device to execute the above-mentioned task scheduling method applied to the scheduling system 10.

本申请实施例还提供了一种包含指令的计算机程序产品。所述计算机程序产品可以是包含指令的,能够运行在计算设备上或被储存在任何可用介质中的软件或程序产品。当所述计算机程序产品在至少一个计算设备上运行时,使得至少一个计算设备执行上述任务调度方法。The embodiment of the present application also provides a computer program product including instructions. The computer program product may be software or a program product including instructions that can be run on a computing device or stored in any available medium. When the computer program product is run on at least one computing device, the at least one computing device executes the above-mentioned task scheduling method.

最后应说明的是:以上实施例仅用以说明本发明的技术方案,而非对其限制;尽管参照前述实施例对本发明进行了详细的说明,本领域的普通技术人员应当理解:其依然可以对前述各实施例所记载的技术方案进行修改,或者对其中部分技术特征进行等同替换;而这些修改或者替换,并不使相应技术方案的本质脱离本发明各实施例技术方案的保护范围。 Finally, it should be noted that the above embodiments are only used to illustrate the technical solutions of the present invention, rather than to limit it. Although the present invention has been described in detail with reference to the aforementioned embodiments, those skilled in the art should understand that they can still modify the technical solutions described in the aforementioned embodiments, or make equivalent replacements for some of the technical features therein. However, these modifications or replacements do not cause the essence of the corresponding technical solutions to deviate from the protection scope of the technical solutions of the embodiments of the present invention.

Claims (19)

一种任务调度方法,其特征在于,所述方法包括:A task scheduling method, characterized in that the method comprises: 调度系统接收任务;The scheduling system receives the task; 所述调度系统将所述任务或所述任务的任务分片调度至分布式集群中的第一执行节点,所述第一执行节点为所述分布式集群中的至少一个执行节点;The scheduling system schedules the task or the task slice of the task to a first execution node in the distributed cluster, where the first execution node is at least one execution node in the distributed cluster; 在所述第一执行节点执行所述任务或所述任务分片的过程中,所述调度系统接收所述第一执行节点发送的分片调整请求,所述分片调整请求用于请求增加所述任务的任务分片或缩减所述任务的任务分片,所述分片调整请求为所述第一执行节点根据所述第一执行节点上所述任务或所述任务分片的负载生成;During the process of the first execution node executing the task or the task slice, the scheduling system receives a slice adjustment request sent by the first execution node, the slice adjustment request is used to request to increase the task slice of the task or reduce the task slice of the task, and the slice adjustment request is generated by the first execution node according to the load of the task or the task slice on the first execution node; 所述调度系统根据所述分片调整请求,在所述分布式集群调整所述任务的任务分片;The scheduling system adjusts the task slice of the task in the distributed cluster according to the slice adjustment request; 其中,所述调整所述任务的任务分片包括:在第二执行节点增加所述任务的任务分片,或者缩减所述第一执行节点中所述任务的任务分片。The adjusting of the task slices of the task includes: increasing the task slices of the task in the second execution node, or reducing the task slices of the task in the first execution node. 根据权利要求1所述的方法,其特征在于,所述第一执行节点上所述任务或所述任务分片的负载大于第一阈值时,所述分片调整请求用于请求增加所述任务的任务分片;或者,所述第一执行节点上所述任务的目标任务分片的负载小于第二阈值时,所述分片调整请求用于缩减所述任务的所述目标任务分片。The method according to claim 1 is characterized in that when the load of the task or the task slice on the first execution node is greater than a first threshold, the slice adjustment request is used to request to increase the task slice of the task; or, when the load of the target task slice of the task on the first execution node is less than a second threshold, the slice adjustment request is used to reduce the target task slice of the task. 根据权利要求1或2所述的方法,其特征在于,所述调度系统包括调度节点和数据模块,所述方法还包括:The method according to claim 1 or 2, characterized in that the scheduling system includes a scheduling node and a data module, and the method further includes: 所述数据模块检测到所述任务的任务分片发生调整时,将所述任务的处理数据重新分片获得至少一个数据分片,所述至少一个数据分片与调整后的任务分片一一对应。When the data module detects that the task slice of the task is adjusted, the processing data of the task is re-sliced to obtain at least one data slice, and the at least one data slice corresponds to the adjusted task slice one by one. 根据权利要求3所述的方法,其特征在于,所述数据模块将所述任务的处理数据重新分片获得至少一个数据分片,包括:The method according to claim 3, characterized in that the data module re-shards the processing data of the task to obtain at least one data shard, comprising: 所述数据模块根据所述调整后的任务分片的数量,通过负载均衡,将所述任务的处理数据重新分片获得至少一个数据分片。The data module re-slices the processing data of the task to obtain at least one data slice through load balancing according to the adjusted number of task slices. 根据权利要求3所述的方法,其特征在于,所述方法还包括:The method according to claim 3, characterized in that the method further comprises: 所述数据模块记录所述任务的处理数据的拉取进度;The data module records the progress of pulling the processing data of the task; 所述数据模块将所述任务的处理数据重新分片获得至少一个数据分片,包括:The data module re-shards the processed data of the task to obtain at least one data shard, including: 所述数据模块根据所述任务的处理数据的拉取进度,确定剩余数据;The data module determines the remaining data according to the pulling progress of the processing data of the task; 所述数据模块对所述剩余数据重新分片获得至少一个数据分片。The data module re-shards the remaining data to obtain at least one data shard. 根据权利要求3至5任一项所述的方法,其特征在于,所述还包括:The method according to any one of claims 3 to 5, characterized in that it also includes: 所述数据模块向所述任务分片发送心跳消息,所述心跳消息用于检测所述任务分片的活性;The data module sends a heartbeat message to the task slice, and the heartbeat message is used to detect the activity of the task slice; 所述数据模块根据所述任务分片的活性,调整所述数据分片。The data module adjusts the data slice according to the activity of the task slice. 根据权利要求1至6任一项所述的方法,其特征在于,所述任务或所述任务分片装配在数据体中,所述数据体记录运行的所述任务或所述任务分片的负载以及所述任务或所述任务分片的标识。The method according to any one of claims 1 to 6 is characterized in that the task or the task slice is assembled in a data body, and the data body records the load of the running task or the task slice and the identification of the task or the task slice. 根据权利要求1至7任一项所述的方法,其特征在于,所述任务包括日志加工任务。The method according to any one of claims 1 to 7, characterized in that the task comprises a log processing task. 一种调度系统,其特征在于,所述调度系统包括:A scheduling system, characterized in that the scheduling system comprises: 调度节点,用于接收调度任务,将所述任务或所述任务的任务分片调度至分布式集群中的第一执行节点,所述第一执行节点为所述分布式集群中的至少一个执行节点,在所述第一执行节点执行所述任务或所述任务分片的过程中,接收所述第一执行节点发送的分片调整请求,所述分片调整请求用于请求增加所述任务的任务分片或缩减所述任务的任务分片,所述分片调整请求为所述第一执行节点根据所述第一执行节点上所述任务或所述任务的任务分片的负载生成;A scheduling node, used for receiving a scheduling task, scheduling the task or the task slice of the task to a first execution node in a distributed cluster, the first execution node being at least one execution node in the distributed cluster, and receiving a slice adjustment request sent by the first execution node during the process of the first execution node executing the task or the task slice, the slice adjustment request being used to request to increase the task slice of the task or reduce the task slice of the task, the slice adjustment request being generated by the first execution node according to the load of the task or the task slice of the task on the first execution node; 所述调度节点,还用于根据所述分片调整请求,在所述分布式集群调整所述任务的任务分片;The scheduling node is further used to adjust the task slice of the task in the distributed cluster according to the slice adjustment request; 其中,所述调整所述任务的任务分片包括:在第二执行节点增加所述任务的任务分片,或者缩减所述第一执行节点中所述任务的任务分片。The adjusting of the task slices of the task includes: increasing the task slices of the task in the second execution node, or reducing the task slices of the task in the first execution node. 根据权利要求9所述的系统,其特征在于,所述第一执行节点上所述任务或所述任务的任务分片的负载大于第一阈值时,所述分片调整请求用于请求增加所述任务的任务分片;或者,所述第一执行 节点上所述任务的目标任务分片的负载小于第二阈值时,所述分片调整请求用于缩减所述任务的所述目标任务分片。The system according to claim 9 is characterized in that when the load of the task or the task slice of the task on the first execution node is greater than a first threshold, the slice adjustment request is used to request to increase the task slice of the task; or When the load of the target task slice of the task on the node is less than a second threshold, the slice adjustment request is used to reduce the target task slice of the task. 根据权利要求9或10所述的系统,其特征在于,所述调度系统包括所述调度节点和数据模块,所述数据模块具体用于:The system according to claim 9 or 10, characterized in that the scheduling system comprises the scheduling node and a data module, and the data module is specifically used for: 检测到所述任务的任务分片发生调整时,将所述任务的处理数据重新分片获得至少一个数据分片,所述至少一个数据分片与调整后的任务分片一一对应。When it is detected that the task slice of the task is adjusted, the processed data of the task is re-sliced to obtain at least one data slice, and the at least one data slice corresponds to the adjusted task slice one by one. 根据权利要求11所述的系统,其特征在于,所述数据模块具体用于:The system according to claim 11, characterized in that the data module is specifically used for: 根据所述调整后的任务分片的数量,通过负载均衡,将所述任务的处理数据重新分片获得至少一个数据分片。According to the adjusted number of task slices, the processing data of the task is re-sliced to obtain at least one data slice through load balancing. 根据权利要求11所述的系统,其特征在于,所述数据模块还用于:The system according to claim 11, characterized in that the data module is further used for: 记录所述任务的处理数据的拉取进度;Record the progress of pulling the processing data of the task; 所述数据模块具体用于:The data module is specifically used for: 根据所述任务的处理数据的拉取进度,确定剩余数据;Determining remaining data according to the progress of pulling the processing data of the task; 对所述剩余数据重新分片获得至少一个数据分片。The remaining data is re-sharded to obtain at least one data shard. 根据权利要求9至13任一项所述的系统,其特征在于,所述数据模块还用于:The system according to any one of claims 9 to 13, characterized in that the data module is further used for: 向所述任务分片发送心跳消息,所述心跳消息用于检测所述任务分片的活性;Sending a heartbeat message to the task slice, wherein the heartbeat message is used to detect the activity of the task slice; 根据所述任务分片的活性,调整所述数据分片。The data shards are adjusted according to the activity of the task shards. 根据权利要求9至14任一项所述的系统,其特征在于,所述任务或所述任务分片装配在数据体中,所述数据体记录运行的所述任务或所述任务分片的负载以及所述任务或所述任务分片的标识。The system according to any one of claims 9 to 14 is characterized in that the task or the task slice is assembled in a data body, and the data body records the load of the running task or the task slice and the identification of the task or the task slice. 根据权利要求9至15任一项所述的系统,其特征在于,所述任务包括日志加工任务。The system according to any one of claims 9 to 15, characterized in that the task comprises a log processing task. 一种计算设备集群,其特征在于,所述计算设备集群包括至少一台计算设备,所述至少一台计算设备包括至少一个处理器和至少一个存储器,所述至少一个存储器中存储有计算机可读指令;所述至少一个处理器执行所述计算机可读指令,以使得所述计算设备集群执行如权利要求1至8中任一项所述的任务调度方法。A computing device cluster, characterized in that the computing device cluster includes at least one computing device, the at least one computing device includes at least one processor and at least one memory, and the at least one memory stores computer-readable instructions; the at least one processor executes the computer-readable instructions so that the computing device cluster executes the task scheduling method described in any one of claims 1 to 8. 一种计算机可读存储介质,其特征在于,包括计算机可读指令;所述计算机可读指令用于实现权利要求1至8任一项所述的任务调度方法。A computer-readable storage medium, characterized in that it includes computer-readable instructions; the computer-readable instructions are used to implement the task scheduling method described in any one of claims 1 to 8. 一种计算机程序产品,其特征在于,包括计算机可读指令;所述计算机可读指令用于实现权利要求1至8任一项所述的任务调度方法。 A computer program product, characterized in that it includes computer-readable instructions; the computer-readable instructions are used to implement the task scheduling method described in any one of claims 1 to 8.
PCT/CN2024/109813 2023-12-05 2024-08-05 Task scheduling method and related device Pending WO2025118656A1 (en)

Applications Claiming Priority (4)

Application Number Priority Date Filing Date Title
CN202311677310.2 2023-12-05
CN202311677310 2023-12-05
CN202410284789.1A CN120104264A (en) 2023-12-05 2024-03-12 A task scheduling method and related equipment
CN202410284789.1 2024-03-12

Publications (1)

Publication Number Publication Date
WO2025118656A1 true WO2025118656A1 (en) 2025-06-12

Family

ID=95886352

Family Applications (1)

Application Number Title Priority Date Filing Date
PCT/CN2024/109813 Pending WO2025118656A1 (en) 2023-12-05 2024-08-05 Task scheduling method and related device

Country Status (2)

Country Link
CN (1) CN120104264A (en)
WO (1) WO2025118656A1 (en)

Citations (4)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US20140245298A1 (en) * 2013-02-27 2014-08-28 Vmware, Inc. Adaptive Task Scheduling of Hadoop in a Virtualized Environment
CN110597626A (en) * 2019-08-23 2019-12-20 第四范式(北京)技术有限公司 Method, device and system for allocating resources and tasks in a distributed system
CN111708627A (en) * 2020-06-22 2020-09-25 中国平安财产保险股份有限公司 Task scheduling method and device based on distributed scheduling framework
CN116069461A (en) * 2022-12-06 2023-05-05 兴业银行股份有限公司 Adaptive task scheduling method and system for dynamic sharding

Patent Citations (4)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US20140245298A1 (en) * 2013-02-27 2014-08-28 Vmware, Inc. Adaptive Task Scheduling of Hadoop in a Virtualized Environment
CN110597626A (en) * 2019-08-23 2019-12-20 第四范式(北京)技术有限公司 Method, device and system for allocating resources and tasks in a distributed system
CN111708627A (en) * 2020-06-22 2020-09-25 中国平安财产保险股份有限公司 Task scheduling method and device based on distributed scheduling framework
CN116069461A (en) * 2022-12-06 2023-05-05 兴业银行股份有限公司 Adaptive task scheduling method and system for dynamic sharding

Also Published As

Publication number Publication date
CN120104264A (en) 2025-06-06

Similar Documents

Publication Publication Date Title
US12481531B2 (en) Hyper-convergence with scheduler extensions for software-defined container storage solutions
US10944722B2 (en) Using activities to manage multi-tenant firewall configuration
US11258761B2 (en) Self-service firewall configuration
US9569245B2 (en) System and method for controlling virtual-machine migrations based on processor usage rates and traffic amounts
US9019826B2 (en) Hierarchical allocation of network bandwidth for quality of service
EP2698711A1 (en) Method for dispatching central processing unit of hotspot domain virtual machine and virtual machine system
US20070168525A1 (en) Method for improved virtual adapter performance using multiple virtual interrupts
US7681196B2 (en) Providing optimal number of threads to applications performing multi-tasking using threads
US20160266918A1 (en) Data assignment and data scheduling for physical machine in a virtual machine environment
US10579416B2 (en) Thread interrupt offload re-prioritization
CN116157778A (en) System and method for hybrid centralized and distributed scheduling on shared physical host
WO2025176140A1 (en) In-process resource control method and apparatus, and device and storage medium
US20170142202A1 (en) Filesystem i/o scheduler
US20250181385A1 (en) Latency service level agreement based scheduling of operating system threads at cloud services
US11954534B2 (en) Scheduling in a container orchestration system utilizing hardware topology hints
CN114448909A (en) OVS-based network card queue polling method, device, computer equipment and medium
WO2025118656A1 (en) Task scheduling method and related device
EP4621570A1 (en) Resource allocation method and apparatus based on cloud service
US20240086225A1 (en) Container group scheduling methods and apparatuses
US12164505B2 (en) Two-phase commit using reserved log sequence values
WO2024239710A1 (en) Deployment method and deployment apparatus for virtual network function
US10673937B2 (en) Dynamic record-level sharing (RLS) provisioning inside a data-sharing subsystem
CN117806802A (en) Task scheduling method based on containerized distributed system
CN116680044A (en) Task scheduling method, device, electronic device, and computer-readable storage medium
CN115701585A (en) Example Migration Method, Device and Related Equipment

Legal Events

Date Code Title Description
121 Ep: the epo has been informed by wipo that ep was designated in this application

Ref document number: 24899306

Country of ref document: EP

Kind code of ref document: A1