
CN120822815A - A workflow orchestration system for operation and maintenance tasks in a distributed graph database - Google Patents

A workflow orchestration system for operation and maintenance tasks in a distributed graph database

Info

Publication number
CN120822815A
Authority
CN
China
Prior art keywords
task
maintenance
execution
maintenance task
graph
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Granted
Application number
CN202511317559.1A
Other languages
Chinese (zh)
Other versions
CN120822815B (en)
Inventor
汪洋
叶小萌
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Hangzhou Yueshu Technology Co ltd
Original Assignee
Hangzhou Yueshu Technology Co ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Hangzhou Yueshu Technology Co ltd
Priority to CN202511317559.1A
Publication of CN120822815A
Application granted
Publication of CN120822815B
Legal status: Active
Anticipated expiration

Classifications

    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06Q INFORMATION AND COMMUNICATION TECHNOLOGY [ICT] SPECIALLY ADAPTED FOR ADMINISTRATIVE, COMMERCIAL, FINANCIAL, MANAGERIAL OR SUPERVISORY PURPOSES; SYSTEMS OR METHODS SPECIALLY ADAPTED FOR ADMINISTRATIVE, COMMERCIAL, FINANCIAL, MANAGERIAL OR SUPERVISORY PURPOSES, NOT OTHERWISE PROVIDED FOR
    • G06Q10/00 Administration; Management
    • G06Q10/06 Resources, workflows, human or project management; Enterprise or organisation planning; Enterprise or organisation modelling
    • G06Q10/063 Operations research, analysis or management
    • G06Q10/0633 Workflow analysis
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00 Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20 Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/27 Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F8/00 Arrangements for software engineering
    • G06F8/60 Software deployment
    • G06F8/65 Updates
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F8/00 Arrangements for software engineering
    • G06F8/70 Software maintenance or management
    • G06F8/71 Version control; Configuration management
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06Q INFORMATION AND COMMUNICATION TECHNOLOGY [ICT] SPECIALLY ADAPTED FOR ADMINISTRATIVE, COMMERCIAL, FINANCIAL, MANAGERIAL OR SUPERVISORY PURPOSES; SYSTEMS OR METHODS SPECIALLY ADAPTED FOR ADMINISTRATIVE, COMMERCIAL, FINANCIAL, MANAGERIAL OR SUPERVISORY PURPOSES, NOT OTHERWISE PROVIDED FOR
    • G06Q10/00 Administration; Management
    • G06Q10/10 Office automation; Time management
    • G06Q10/103 Workflow collaboration or project management
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06Q INFORMATION AND COMMUNICATION TECHNOLOGY [ICT] SPECIALLY ADAPTED FOR ADMINISTRATIVE, COMMERCIAL, FINANCIAL, MANAGERIAL OR SUPERVISORY PURPOSES; SYSTEMS OR METHODS SPECIALLY ADAPTED FOR ADMINISTRATIVE, COMMERCIAL, FINANCIAL, MANAGERIAL OR SUPERVISORY PURPOSES, NOT OTHERWISE PROVIDED FOR
    • G06Q10/00 Administration; Management
    • G06Q10/20 Administration of product repair or maintenance

Landscapes

  • Engineering & Computer Science (AREA)
  • Business, Economics & Management (AREA)
  • Theoretical Computer Science (AREA)
  • Human Resources & Organizations (AREA)
  • Strategic Management (AREA)
  • Physics & Mathematics (AREA)
  • General Physics & Mathematics (AREA)
  • Entrepreneurship & Innovation (AREA)
  • General Engineering & Computer Science (AREA)
  • Economics (AREA)
  • Software Systems (AREA)
  • Quality & Reliability (AREA)
  • Marketing (AREA)
  • Operations Research (AREA)
  • Tourism & Hospitality (AREA)
  • General Business, Economics & Management (AREA)
  • Databases & Information Systems (AREA)
  • Data Mining & Analysis (AREA)
  • Computer Security & Cryptography (AREA)
  • Computing Systems (AREA)
  • Development Economics (AREA)
  • Educational Administration (AREA)
  • Game Theory and Decision Science (AREA)
  • Management, Administration, Business Operations System, And Electronic Commerce (AREA)

Abstract

The present application relates to a workflow orchestration system for operation and maintenance tasks in a distributed graph database, wherein the system includes: a workflow orchestration layer for recursively parsing the workflow specification input by the user to generate an executable DAG operation and maintenance task graph; a task management layer for using a two-phase commit protocol to ensure the atomic consistency of each operation and maintenance task in the DAG operation and maintenance task graph in volatile memory and persistent storage, and in the event of memory data loss, performing fault recovery on unfinished operation and maintenance tasks from persistent storage based on atomic consistency; a conditional task execution layer for creating a secure isolation sandbox for the execution of each operation and maintenance task in the DAG operation and maintenance task graph, and during the execution of the current operation and maintenance task, evaluating the task execution result based on a dynamically updated conditional function to determine the execution path of subsequent operation and maintenance tasks. Through this application, the problem of how to improve the workflow efficiency of operation and maintenance tasks in a graph database is solved.

Description

Operation and maintenance task workflow arrangement system in distributed graph database
Technical Field
The application relates to the technical field of graph databases, in particular to an operation and maintenance task workflow arrangement system in a distributed graph database.
Background
In modern enterprise-level applications, distributed graph database systems typically require management of hundreds of service nodes, involving complex service dependencies and operational flows. As enterprises grow, the demand for large-scale graph data processing also increases, the operation and maintenance management of distributed graph database systems faces more and more challenges, and traditional task scheduling schemes have the following technical problems:
It is difficult to dynamically select an execution path according to the real-time running state, so task execution efficiency is low; dependencies among tasks are complex and hard to manage effectively when serial and parallel tasks are executed together; and when task execution fails or times out, there is no intelligent exception recovery and state management mechanism. In addition, it is difficult to manage task resource allocation and load balancing efficiently in a distributed environment, and real-time state tracking and visual management of the task execution process are lacking.
At present, the related art offers no effective solution for improving the workflow efficiency of operation and maintenance tasks in a graph database.
Disclosure of Invention
The embodiment of the application provides an operation and maintenance task workflow arrangement system in a distributed graph database, which at least solves the problem of how to improve the workflow efficiency of operation and maintenance tasks in the graph database in the related technology.
In a first aspect, an embodiment of the present application provides an operation and maintenance task workflow arrangement system in a distributed graph database, where the system includes a workflow arrangement layer, a task management layer, and a conditional task execution layer;
The workflow arrangement layer is used for receiving an operation and maintenance task workflow specification input by a user, and recursively analyzing the workflow specification to generate an executable DAG operation and maintenance task graph;
The task management layer is used for ensuring, by means of a two-phase commit protocol, the atomic consistency of each operation and maintenance task in the DAG operation and maintenance task graph between volatile memory and persistent storage, and, when a fault causes memory data to be lost, performing fault recovery on unfinished operation and maintenance tasks from the persistent storage based on the atomic consistency;
The conditional task execution layer is used for creating a security isolation sandbox for executing each operation and maintenance task in the DAG operation and maintenance task graph, and in the process of executing the current operation and maintenance task, the task execution result is evaluated based on the dynamically updated conditional function so as to determine the execution path of the subsequent operation and maintenance task.
In some embodiments, the workflow orchestration layer is configured to receive an operation and maintenance task workflow specification input by a user, abstract serial branches, parallel branches and conditional branches in the workflow specification into a directed acyclic graph in a unified manner, and analyze a dependency relationship and an execution sequence of the operation and maintenance task through a Kahn algorithm to topologically sort the directed acyclic graph to generate an executable DAG operation and maintenance task graph.
In some of these embodiments, the workflow orchestration layer is configured to introduce multi-version concurrency control through an MVCC version control mechanism to support workflow specification updates at the time of execution of an operation and maintenance task:
During execution of the operation and maintenance tasks, if the workflow specification is updated, a new version of the DAG operation and maintenance task graph is generated by copying and recursive parsing, and the version is then updated atomically through an atomic SwapPointer, so that operation and maintenance tasks that are already executing continue to use the old version of the DAG operation and maintenance task graph, while operation and maintenance tasks that have not yet been executed use the new version.
In some embodiments, the task management layer is configured to, when the task state of an operation and maintenance task in the DAG operation and maintenance task graph is updated, write a prepare update log into distributed storage during the prepare phase of the two-phase commit protocol, then update the task state of the operation and maintenance task in volatile memory and in the persistent storage during the commit phase of the two-phase commit protocol, and delete the prepare update log after the update succeeds;
When a fault causes memory data to be lost, fault recovery is performed on unfinished operation and maintenance tasks from the persistent storage based on this atomic update property.
In some embodiments, the task management layer is configured to support high concurrency execution of each operation and maintenance task in the DAG operation and maintenance task graph through a task concurrency execution mechanism based on Go coroutines.
In some embodiments, the conditional task execution layer is configured to create a Wasm security isolation sandbox through a dynamically loaded Wasm plug-in, or create a Cgroups security isolation sandbox through a dynamically loaded Go plug-in, for execution of each operation and maintenance task in the DAG operation and maintenance task graph.
In some embodiments, the conditional task execution layer is configured to evaluate a task execution result based on a dynamically updated conditional function during execution of the current operation and maintenance task to determine the execution path of subsequent operation and maintenance tasks, where the conditional function supports complex logic combination and higher-order function programming, is updated by hot update, and may be written in languages including Rust, C++ and TinyGo.
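As an illustration only, the following minimal Go sketch shows one way a hot-updatable conditional function could select the next task. It is not the code of the application: the names TaskResult, ConditionFunc, Router, And and the task IDs are hypothetical, a plain Go closure stands in for a condition loaded from a Wasm or Go plug-in, and the hot update is modelled with the standard library's atomic.Pointer (Go 1.19 or later).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// TaskResult is a hypothetical summary of a finished operation and maintenance task.
type TaskResult struct {
	ExitCode  int
	LatencyMs int
}

// ConditionFunc maps a task result to the ID of the next task to run.
type ConditionFunc func(TaskResult) string

// Router holds the current condition function. Update swaps it atomically,
// standing in for the hot update described in the text.
type Router struct {
	cond atomic.Pointer[ConditionFunc]
}

// NewRouter creates a Router with an initial condition function.
func NewRouter(f ConditionFunc) *Router {
	r := &Router{}
	r.cond.Store(&f)
	return r
}

// Update replaces the condition function without stopping running tasks.
func (r *Router) Update(f ConditionFunc) { r.cond.Store(&f) }

// Next evaluates the current condition function against a task result.
func (r *Router) Next(res TaskResult) string { return (*r.cond.Load())(res) }

// And combines two predicates, illustrating higher-order composition.
func And(a, b func(TaskResult) bool) func(TaskResult) bool {
	return func(r TaskResult) bool { return a(r) && b(r) }
}

func main() {
	ok := func(r TaskResult) bool { return r.ExitCode == 0 }
	fast := func(r TaskResult) bool { return r.LatencyMs < 500 }

	router := NewRouter(func(r TaskResult) string {
		if ok(r) {
			return "rolling-upgrade"
		}
		return "rollback"
	})
	res := TaskResult{ExitCode: 0, LatencyMs: 900}
	fmt.Println(router.Next(res)) // rolling-upgrade

	// Hot update: the task must now be both successful and fast.
	strict := And(ok, fast)
	router.Update(func(r TaskResult) string {
		if strict(r) {
			return "rolling-upgrade"
		}
		return "rollback"
	})
	fmt.Println(router.Next(res)) // rollback
}
```

The same task result leads to a different path after the update, which is the behaviour the dynamically updated conditional function is meant to provide.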
In some of these embodiments, the system includes a user interface layer;
The user interface layer is used for providing a plurality of interaction modes for a user to input operation and maintenance task workflow specifications, wherein the interaction modes comprise a Web console, a REST API and a command line tool.
In some of these embodiments, the system includes a storage layer and a resource management layer, the storage layer containing persistent distributed storage units;
The storage layer is used for persistently storing data related to the operation and maintenance tasks and supporting a transaction log and a checkpoint mechanism;
The resource management layer is used for intelligently scheduling the hardware resources that execute the operation and maintenance tasks so as to perform load balancing.
In some of these embodiments, the system includes a graph database cluster, which is a NebulaGraph distributed graph database cluster.
Compared with the related art, the operation and maintenance task workflow orchestration system in a distributed graph database provided by the embodiment of the application comprises a workflow orchestration layer, a task management layer and a conditional task execution layer. The workflow orchestration layer recursively parses the workflow specification input by a user to generate an executable DAG operation and maintenance task graph. The task management layer adopts a two-phase commit protocol to ensure the atomic consistency of each operation and maintenance task in the DAG operation and maintenance task graph between volatile memory and persistent storage and, when memory data is lost, performs fault recovery on unfinished operation and maintenance tasks from the persistent storage based on the atomic consistency. The conditional task execution layer creates a security isolation sandbox for executing each operation and maintenance task in the DAG operation and maintenance task graph and, during execution of the current operation and maintenance task, evaluates the task execution result based on a dynamically updated conditional function to determine the execution path of subsequent operation and maintenance tasks. As a result, the state of operation and maintenance tasks is recovered automatically after a fault restart and global rollback is avoided, the sandbox execution environment effectively prevents damage from malicious code, and the conditional function dynamically selects the execution path based on the running state, which improves the workflow efficiency of operation and maintenance tasks in the graph database.
Drawings
The accompanying drawings, which are included to provide a further understanding of the application and are incorporated in and constitute a part of this specification, illustrate embodiments of the application and together with the description serve to explain the application and do not constitute a limitation on the application. In the drawings:
FIG. 1 is a block diagram of a configuration of an operation and maintenance task workflow orchestration system according to embodiments of the present application;
FIG. 2 is a workflow diagram of a workflow orchestration layer according to an embodiment of the application;
FIG. 3 is a workflow diagram of a task management layer according to an embodiment of the application;
FIG. 4 is a workflow diagram of a conditional task execution layer according to an embodiment of the application;
FIG. 5 is a block diagram of an operation and maintenance task workflow orchestration system according to specific embodiments of the present application;
FIG. 6 is a schematic diagram of an internal structure of an electronic device according to an embodiment of the application.
Detailed Description
The present application will be described and illustrated with reference to the accompanying drawings and examples in order to make the objects, technical solutions and advantages of the present application more apparent. It should be understood that the specific embodiments described herein are for purposes of illustration only and are not intended to limit the scope of the application. All other embodiments, which can be made by a person of ordinary skill in the art based on the embodiments provided by the present application without making any inventive effort, are intended to fall within the scope of the present application.
It is apparent that the drawings in the following description are only some examples or embodiments of the present application, and it is possible for those of ordinary skill in the art to apply the present application to other similar situations according to these drawings without inventive effort. Moreover, it should be appreciated that while such a development effort might be complex and lengthy, it would nevertheless be a routine undertaking of design, fabrication, or manufacture for those of ordinary skill having the benefit of this disclosure, and thus should not be construed as meaning that this disclosure is insufficient.
Reference in the specification to "an embodiment" means that a particular feature, structure, or characteristic described in connection with the embodiment may be included in at least one embodiment of the application. The appearances of such phrases in various places in the specification are not necessarily all referring to the same embodiment, nor are separate or alternative embodiments mutually exclusive of other embodiments. It is to be expressly and implicitly understood by those of ordinary skill in the art that the described embodiments of the application can be combined with other embodiments without conflict.
Unless defined otherwise, technical or scientific terms used herein should be given the ordinary meaning as understood by one of ordinary skill in the art to which this application belongs. The terms "a," "an," "the," and similar referents in the context of the application are not to be construed as limiting the quantity, but rather as singular or plural. The terms "comprises," "comprising," "includes," "including," or any other variation thereof, are intended to cover a non-exclusive inclusion, such that a process, method, system, article, or apparatus that comprises a list of steps or modules (elements) is not limited to only those steps or elements but may include other steps or elements not expressly listed or inherent to such process, method, article, or apparatus. The terms "connected," "coupled," and the like in connection with the present application are not limited to physical or mechanical connections, but may include electrical connections, whether direct or indirect. The term "plurality" as used herein means two or more. "and/or" describes the association relationship of the association object, and indicates that three relationships may exist, for example, "a and/or B" may indicate that a exists alone, a and B exist simultaneously, and B exists alone. The character "/" generally indicates that the context-dependent object is an "or" relationship. The terms "first," "second," "third," and the like, as used herein, are merely distinguishing between similar objects and not representing a particular ordering of objects.
The embodiment of the application provides an operation and maintenance task workflow orchestration system in a distributed graph database. FIG. 1 is a structural block diagram of the operation and maintenance task workflow orchestration system according to the embodiment of the application. As shown in FIG. 1, the system comprises a workflow orchestration layer, a task management layer (TaskManager) and a conditional task execution layer (ConditionalTask);
The workflow arrangement layer is used for receiving the operation and maintenance task workflow specification input by a user and recursively analyzing the workflow specification to generate an executable DAG operation and maintenance task graph;
The workflow orchestration layer is used for receiving an operation and maintenance task workflow specification (WorkflowSpec) input by a user, unifying and abstracting serial branches, parallel branches and conditional branches in the workflow specification into a directed acyclic graph, and analyzing the dependency relationship and the execution sequence of the operation and maintenance task by using a Kahn algorithm to topologically sort the directed acyclic graph so as to generate an executable DAG operation and maintenance task graph.
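The recursive parsing step can be sketched as follows. This is a minimal illustration under stated assumptions, not the parser of the application: Spec, Graph and build are hypothetical names, every composite spec is assumed to be non-empty, and conditional branches are wired like parallel ones at parse time on the assumption that the branch actually taken is chosen later at run time.

```go
package main

import (
	"fmt"
	"sort"
)

// Spec is a hypothetical nested workflow specification: a leaf names a task,
// while "serial", "parallel" and "conditional" nodes contain child specs.
type Spec struct {
	Kind     string // "task" | "serial" | "parallel" | "conditional"
	TaskID   string // set only when Kind == "task"
	Children []Spec
}

// Graph stores predecessor lists, the same shape as a dependency map.
type Graph struct {
	Deps map[string][]string
}

// build adds the tasks of s to g and returns the entry and exit task IDs of
// the sub-workflow, so that the caller can wire it to its neighbours.
func (g *Graph) build(s Spec) (entries, exits []string) {
	switch s.Kind {
	case "task":
		if _, ok := g.Deps[s.TaskID]; !ok {
			g.Deps[s.TaskID] = nil
		}
		return []string{s.TaskID}, []string{s.TaskID}
	case "serial":
		var prevExits []string
		for i, c := range s.Children {
			in, out := g.build(c)
			if i == 0 {
				entries = in
			}
			// Every entry of this child waits for every exit of the previous one.
			for _, e := range in {
				g.Deps[e] = append(g.Deps[e], prevExits...)
			}
			prevExits = out
		}
		return entries, prevExits
	default: // "parallel" and "conditional" branches fan out side by side
		for _, c := range s.Children {
			in, out := g.build(c)
			entries = append(entries, in...)
			exits = append(exits, out...)
		}
		return entries, exits
	}
}

func main() {
	spec := Spec{Kind: "serial", Children: []Spec{
		{Kind: "task", TaskID: "backup"},
		{Kind: "parallel", Children: []Spec{
			{Kind: "task", TaskID: "upgrade-storage"},
			{Kind: "task", TaskID: "upgrade-query"},
		}},
		{Kind: "task", TaskID: "verify"},
	}}
	g := &Graph{Deps: map[string][]string{}}
	g.build(spec)

	ids := make([]string, 0, len(g.Deps))
	for id := range g.Deps {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	for _, id := range ids {
		fmt.Println(id, "<-", g.Deps[id])
	}
}
```

The resulting predecessor map can then be handed to a topological sort: "verify" depends on both upgrade tasks, and each upgrade task depends on "backup".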
The workflow orchestration layer also introduces multi-version concurrency control through an MVCC version control mechanism to support updating the workflow specification while operation and maintenance tasks are executing. If the workflow specification is updated during execution, a new version of the DAG operation and maintenance task graph is generated by copying and recursive parsing, and the version is then updated atomically through an atomic SwapPointer, so that operation and maintenance tasks that are already executing continue to use the old version of the DAG operation and maintenance task graph, while subsequent operation and maintenance tasks that have not yet been executed use the new version.
In addition, most existing workflow engines rely on statically defined task graphs when processing dependencies. While some systems support runtime dynamic adjustment of workflows, such as by returning a nested WorkflowDAG or adding new dependencies to a running task using a "rewrite" mechanism, these operations tend to be heavyweight, and such systems lack a lightweight dynamic dependency parsing and execution algorithm that is tightly integrated with resource scheduling and condition judgment. The workflow orchestration layer in the embodiment of the application realizes efficient topological sorting and scheduling combined with heterogeneous resources for scenarios that mix serial, parallel and conditional branches. FIG. 2 is a workflow diagram of the workflow orchestration layer according to an embodiment of the application. As shown in FIG. 2, the workflow orchestrator in the workflow orchestration layer is the brain of the system, responsible for parsing complex workflow definitions into executable task graphs and performing intelligent scheduling. Its technical innovation points are: unified DAG-based modeling, in which serial, parallel and conditional branches are abstracted into a directed acyclic graph; topological sorting with the Kahn algorithm, which efficiently resolves task dependencies and execution order; MVCC version control, which supports dynamic updates of the workflow definition at run time using a copy-on-write mechanism; HEFT heuristic scheduling, a heterogeneous resource scheduling algorithm combined with data-locality awareness; and incremental dependency analysis, which supports adding or modifying task dependencies dynamically at run time.
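HEFT orders tasks by their upward rank, which is the length of the longest remaining path from a task to the end of the graph. The sketch below computes that rank in Go under simplifying assumptions that are not taken from the application: communication costs between tasks are treated as zero, the graph is assumed to be acyclic, and Node and upwardRank are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// Node is a hypothetical, trimmed-down task node: an estimated cost plus successors.
type Node struct {
	EstCost    time.Duration
	Successors []string
}

// upwardRank returns the HEFT upward rank of every node:
// rank(n) = EstCost(n) + max over successors s of rank(s).
// Communication costs between tasks are assumed to be zero in this sketch,
// and the graph is assumed to be acyclic.
func upwardRank(nodes map[string]Node) map[string]time.Duration {
	rank := make(map[string]time.Duration, len(nodes))
	var visit func(id string) time.Duration
	visit = func(id string) time.Duration {
		if r, ok := rank[id]; ok {
			return r // already computed (memoized)
		}
		var best time.Duration
		for _, s := range nodes[id].Successors {
			if r := visit(s); r > best {
				best = r
			}
		}
		rank[id] = nodes[id].EstCost + best
		return rank[id]
	}
	for id := range nodes {
		visit(id)
	}
	return rank
}

func main() {
	nodes := map[string]Node{
		"backup":  {EstCost: 30 * time.Second, Successors: []string{"upgrade", "compact"}},
		"upgrade": {EstCost: 60 * time.Second, Successors: []string{"verify"}},
		"compact": {EstCost: 20 * time.Second, Successors: []string{"verify"}},
		"verify":  {EstCost: 10 * time.Second},
	}
	rank := upwardRank(nodes)
	for _, id := range []string{"backup", "upgrade", "compact", "verify"} {
		fmt.Println(id, rank[id])
	}
}
```

In this example "upgrade" gets a higher rank than "compact" because it lies on the longer path to "verify", so a priority-ordered batch would start it first.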
It should be further noted that, as shown in FIG. 2, workflow parsing recursively parses the WorkflowSpec and constructs a complete DAG. Topological sorting with the Kahn algorithm computes the in-degree of all nodes, adds the nodes whose in-degree is 0 to a ready queue, and then processes the ready queue in a loop, executing the ready tasks in parallel and dynamically updating the in-degree of their successor nodes. The MVCC version control mechanism introduces multi-version concurrency control to support workflow updates at run time: copy-on-write creates a new version when the workflow is modified without affecting tasks that are executing; atomic version switching updates the version through an atomic SwapPointer; and snapshot reading lets tasks that are executing continue to use the old-version DAG while new tasks use the new version.
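The snapshot-reading behaviour can be illustrated with a small Go sketch. It is an assumption-laden illustration rather than the code of the application: DAG, Store, Snapshot and Publish are hypothetical names, and the typed atomic.Pointer with its Swap method (Go 1.19 or later) is used here in place of a raw atomic SwapPointer call.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// DAG is a hypothetical immutable snapshot of a task graph.
type DAG struct {
	Version int64
	Tasks   []string
}

// Store publishes DAG versions through a single atomic pointer.
type Store struct {
	current atomic.Pointer[DAG]
}

// Snapshot returns the version visible right now; callers keep using it
// even if a newer version is published afterwards.
func (s *Store) Snapshot() *DAG { return s.current.Load() }

// Publish copies the current DAG, applies edit to the copy, and swaps the
// pointer atomically. It returns the version that was replaced.
// Concurrent writers are assumed to be serialized by the caller.
func (s *Store) Publish(edit func(*DAG)) *DAG {
	old := s.current.Load()
	next := &DAG{Version: old.Version + 1, Tasks: append([]string(nil), old.Tasks...)}
	edit(next)
	return s.current.Swap(next)
}

func main() {
	s := &Store{}
	s.current.Store(&DAG{Version: 1, Tasks: []string{"backup", "upgrade"}})

	running := s.Snapshot() // a task that started under version 1
	s.Publish(func(d *DAG) { d.Tasks = append(d.Tasks, "verify") })

	fmt.Println(running.Version, running.Tasks)           // 1 [backup upgrade]
	fmt.Println(s.Snapshot().Version, s.Snapshot().Tasks) // 2 [backup upgrade verify]
}
```

Because the old DAG is never mutated, a task holding the version 1 snapshot is unaffected by the update, while anything that takes a snapshot afterwards sees version 2.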
Preferably, the core architecture code of the workflow orchestration layer is as follows:
type WorkflowEngine struct {
	dagManager      DAGManager         // DAG manager
	topologyManager TopologyManager    // topological sorting manager
	versionManager  MVCCVersionManager // version control manager
	scheduler       HEFTScheduler      // HEFT scheduler
	taskManager     TaskManager        // task manager
	logger          logx.Logger
}
type WorkflowSpec struct {
	ID           string                 // unique workflow identifier
	Version      int64                  // version number
	Type         WorkflowType           // "serial" | "parallel" | "conditional" | "dag"
	Rollback     bool                   // rollback support
	Tasks        []TaskSpec             // subtask list
	Dependencies map[string][]string    // task dependency graph
	Metadata     map[string]interface{} // metadata
}
type DAGNode struct {
	TaskID       string        // task ID
	TaskSpec     *TaskSpec     // task specification
	Dependencies []string      // predecessor dependencies
	Successors   []string      // successor tasks
	Status       DAGNodeStatus // node status
	Priority     int           // priority (upward rank)
	EstCost      time.Duration // estimated execution time
}
type DAGManager struct {
	Nodes       map[string]*DAGNode // all nodes
	AdjList     map[string][]string // adjacency list
	InDegree    map[string]int      // in-degree counts
	Version     int64               // current version
	IsImmutable bool                // whether the graph is immutable
}
Preferably, the implementation code of the topological sorting based on the Kahn algorithm is as follows:
type TopologyManager struct {
	dagManager *DAGManager
	readyQueue chan *DAGNode
	logger     logx.Logger
}
// ExecuteTopologicalSort implements the Kahn algorithm.
func (tm *TopologyManager) ExecuteTopologicalSort(dag *DAG) error {
	// Compute the in-degree of every node.
	inDegree := make(map[string]int)
	for nodeID, node := range dag.nodes {
		inDegree[nodeID] = len(node.Dependencies)
	}
	// Nodes with in-degree 0 go into the ready queue.
	readyNodes := make([]*DAGNode, 0)
	for nodeID, degree := range inDegree {
		if degree == 0 {
			readyNodes = append(readyNodes, dag.nodes[nodeID])
		}
	}
	// Start the topological sorting process.
	processedCount := 0
	totalNodes := len(dag.nodes)
	for len(readyNodes) > 0 {
		// Take all nodes that are currently ready as one parallel batch.
		currentBatch := readyNodes
		readyNodes = make([]*DAGNode, 0)
		// Order the tasks of the current batch by priority.
		tm.sortByPriority(currentBatch)
		// Execute the tasks of the current batch in parallel.
		err := tm.executeBatchTasks(currentBatch)
		if err != nil {
			return fmt.Errorf("batch execution failed: %w", err)
		}
		processedCount += len(currentBatch)
		// Update the in-degree of the successor nodes.
		for _, completedNode := range currentBatch {
			for _, successorID := range completedNode.Successors {
				inDegree[successorID]--
				if inDegree[successorID] == 0 {
					readyNodes = append(readyNodes, dag.nodes[successorID])
				}
			}
		}
	}
	// Check that all nodes were processed (detects cyclic dependencies).
	if processedCount != totalNodes {
		return fmt.Errorf("cycle detected in DAG: processed %d of %d nodes",
			processedCount, totalNodes)
	}
	tm.logger.Infof("Successfully executed DAG with %d nodes", totalNodes)
	return nil
}
// sortByPriority orders tasks by priority.
func (tm *TopologyManager) sortByPriority(nodes []*DAGNode) {
	sort.Slice(nodes, func(i, j int) bool {
		return nodes[i].Priority > nodes[j].Priority // higher priority first
	})
}
// executeBatchTasks executes a batch of tasks in parallel.
func (tm *TopologyManager) executeBatchTasks(nodes []*DAGNode) error {
	var wg sync.WaitGroup
	errChan := make(chan error, len(nodes))
	for _, node := range nodes {
		wg.Add(1)
		go func(n *DAGNode) {
			defer wg.Done()
			tm.logger.Infof("Executing task: %s (priority: %d)", n.TaskID, n.Priority)
			if err := tm.executeTask(n); err != nil {
				errChan <- fmt.Errorf("task %s failed: %w", n.TaskID, err)
				return
			}
			tm.logger.Infof("Task completed: %s", n.TaskID)
		}(node)
	}
	wg.Wait()
	close(errChan)
	// Check for errors; the first one found is returned.
	for err := range errChan {
		if err != nil {
			return err
		}
	}
	return nil
}
Preferably, the implementation code of MVCC version control is as follows:
type MVCCVersionManager struct {
	currentVersion atomic.Value   // points to the DAG of the current version
	versionHistory map[int64]*DAG // version history
	mutex          sync.RWMutex
	logger         logx.Logger
}
// UpdateWorkflowCopyOnWrite updates the workflow using copy-on-write.
func (vm *MVCCVersionManager) UpdateWorkflowCopyOnWrite(modifications *WorkflowModification) error {
	vm.mutex.Lock()
	defer vm.mutex.Unlock()
	// Obtain the current version.
	currentDAG := vm.currentVersion.Load().(*DAG)
	// Create a deep copy.
	newDAG := vm.deepCopyDAG(currentDAG)
	newVersion := time.Now().UnixNano()
	newDAG.version = newVersion
	// Apply the modifications.
	if err := vm.applyModifications(newDAG, modifications); err != nil {
		return fmt.Errorf("failed to apply modifications: %w", err)
	}
	// Validate the new version.
	if err := vm.validateDAG(newDAG); err != nil {
		return fmt.Errorf("invalid DAG after modifications: %w", err)
	}
	// Atomically update the version pointer.
	vm.currentVersion.Store(newDAG)
	vm.versionHistory[newVersion] = newDAG
	// Clean up old versions (keep the last 3 versions).
	vm.cleanupOldVersions()
	vm.logger.Infof("Workflow updated to version %d with copy-on-write", newVersion)
	return nil
}
// deepCopyDAG returns a deep copy of the DAG.
func (vm *MVCCVersionManager) deepCopyDAG(originalDAG *DAG) *DAG {
	newDAG := &DAG{
		nodes:    make(map[string]*DAGNode),
		adjList:  make(map[string][]string),
		inDegree: make(map[string]int),
	}
	// Copy all nodes.
	for nodeID, originalNode := range originalDAG.nodes {
		newNode := &DAGNode{
			TaskID:       originalNode.TaskID,
			TaskSpec:     originalNode.TaskSpec, // the task specification can be shared
			Dependencies: make([]string, len(originalNode.Dependencies)),
			Successors:   make([]string, len(originalNode.Successors)),
			Status:       originalNode.Status,
			Priority:     originalNode.Priority,
			EstCost:      originalNode.EstCost,
		}
		copy(newNode.Dependencies, originalNode.Dependencies)
		copy(newNode.Successors, originalNode.Successors)
		newDAG.nodes[nodeID] = newNode
	}
	// Copy the adjacency list and the in-degree information.
	for nodeID, neighbors := range originalDAG.adjList {
		newDAG.adjList[nodeID] = make([]string, len(neighbors))
		copy(newDAG.adjList[nodeID], neighbors)
	}
	for nodeID, degree := range originalDAG.inDegree {
		newDAG.inDegree[nodeID] = degree
	}
	return newDAG
}
The task management layer is used for ensuring, by means of a two-phase commit protocol, the atomic consistency of each operation and maintenance task in the DAG operation and maintenance task graph between volatile memory and persistent storage, and, when a fault causes memory data to be lost, performing fault recovery on unfinished operation and maintenance tasks from the persistent storage based on the atomic consistency;
When the task state of an operation and maintenance task in the DAG operation and maintenance task graph is updated, the task management layer writes a prepare update log into the distributed storage during the prepare phase of the two-phase commit protocol, then updates the task state of the operation and maintenance task in volatile memory and in the persistent storage during the commit phase of the two-phase commit protocol, and deletes the prepare update log after the update succeeds;
When a fault causes memory data to be lost, fault recovery is performed on unfinished operation and maintenance tasks from the persistent storage based on this atomic update property.
The task management layer is further configured to support highly concurrent execution of the operation and maintenance tasks in the DAG operation and maintenance task graph through a task concurrent execution mechanism based on Go coroutines (goroutines).
It should be noted that conventional graph computing systems, such as early Pregel, employ a checkpointing mechanism, but it is generally globally blocking, and recovery may require all nodes to roll back to the last checkpoint, which wastes significant computing resources. Subsequent optimizations, such as incremental checkpoints or lightweight checkpoints (LWCP) that decouple vertex updates, are often directed to specific computational models. The task management layer in the embodiment of the application implements a general atomic state recovery scheme that is combined with complex workflow orchestration and supports mixed tasks.
FIG. 3 is a workflow diagram of the task management layer according to an embodiment of the application. As shown in FIG. 3, the asynchronous task manager (TaskManager) is the cornerstone of stable system operation and is responsible for lifecycle management, state persistence, and failure recovery of tasks. Its technical innovations are as follows: a task concurrent execution mechanism based on Go coroutines supports highly concurrent task processing; a two-phase commit (2PC) protocol ensures atomic consistency between the in-memory state and the persisted state; a coordinated checkpointing mechanism supports distributed failure recovery; the HEFT heuristic scheduling algorithm is combined with data locality awareness to achieve intelligent resource allocation; and a confined recovery mechanism avoids global rollback and improves recovery efficiency.
Preferably, the core architecture code of the task management layer is as follows:
type WorkflowEngine struct {
DAGManager DAGManager // DAG manager
TopologyManager TopologyManager // topological sort manager
VersionManager MVCCVersionManager // version control manager
Scheduler HEFTScheduler // HEFT scheduler
TaskManager TaskManager // task manager
logger logx.Logger
}
type WorkflowSpec struct {
ID string // unique workflow identifier
Version int64 // version number
Type WorkflowType // "serial" | "parallel" | "conditional" | "dag"
Rollback bool // rollback support
Tasks []TaskSpec // subtask list
Dependencies map[string][]string // task dependency graph
Metadata map[string]interface{} // metadata
}
type DAGNode struct {
TaskID string // task ID
TaskSpec *TaskSpec // task specification
Dependencies []string // predecessor dependencies
Successors []string // successor tasks
Status DAGNodeStatus // node status
Priority int // priority (upward rank)
EstCost time.Duration // estimated execution time
}
type DAGManager struct {
Nodes map[string]*DAGNode // all nodes
AdjList map[string][]string // adjacency list
InDegree map[string]int // in-degree counts
Version int64 // current version
IsImmutable bool // whether the graph is immutable
}
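The adjacency list and in-degree counts held by the DAG manager are the two inputs that Kahn's algorithm, which claim 2 names for topological ordering, operates on. A minimal sketch follows; the function name `topoSort` and the task names are hypothetical, and the sorting of the ready queue is added only to make the example deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// topoSort applies Kahn's algorithm. adj maps a task to its successors and
// inDegree holds the number of predecessors of each task.
func topoSort(adj map[string][]string, inDegree map[string]int) ([]string, error) {
	remaining := make(map[string]int, len(inDegree))
	var ready []string
	for id, d := range inDegree {
		remaining[id] = d // work on a copy so the caller's map is untouched
		if d == 0 {
			ready = append(ready, id)
		}
	}
	sort.Strings(ready) // deterministic order for the example only
	var order []string
	for len(ready) > 0 {
		id := ready[0]
		ready = ready[1:]
		order = append(order, id)
		next := append([]string(nil), adj[id]...)
		sort.Strings(next)
		for _, s := range next {
			remaining[s]--
			if remaining[s] == 0 {
				ready = append(ready, s)
			}
		}
	}
	if len(order) != len(inDegree) {
		return nil, errors.New("cycle detected: graph is not a DAG")
	}
	return order, nil
}

func main() {
	adj := map[string][]string{
		"snapshot": {"compact", "balance"},
		"compact":  {"report"},
		"balance":  {"report"},
	}
	inDegree := map[string]int{"snapshot": 0, "compact": 1, "balance": 1, "report": 2}
	order, err := topoSort(adj, inDegree)
	fmt.Println(order, err) // prints "[snapshot balance compact report] <nil>"
}
```

If some tasks never reach in-degree zero, the output is shorter than the node set, which is how the sketch reports a cycle.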
Preferably, the implementation code of the two-phase commit protocol is as follows:
// Two-phase commit implementation of a state update
func (tm *TaskManager) UpdateTaskStateAtomic(taskID string, newStatus TaskStatus, result *TaskResult) error {
// 1. Prepare phase: write the transaction log
commitLog := &CommitLog{
TaskID: taskID,
FinalStatus: newStatus,
Result: result,
Timestamp: time.Now(),
LogID: utils.GenerateUUID(),
}
if err := tm.taskRepo.WriteCommitLog(commitLog); err != nil {
return fmt.Errorf("prepare phase failed: %w", err)
}
// 2. Commit phase: atomically update memory and persistent storage
tm.mutex.Lock()
defer tm.mutex.Unlock()
// Update the in-memory state
if taskState, exists := tm.runningTasks.Load(taskID); exists {
state := taskState.(*TaskState)
state.Status = newStatus
state.UpdateTime = time.Now()
state.Result = result
tm.runningTasks.Store(taskID, state)
}
// Persist the final state
if err := tm.taskRepo.CommitFinalState(commitLog); err != nil {
// Commit failed; handling relies on the recovery mechanism
tm.logger.Errorf("Commit phase failed, will be recovered: %v", err)
return err
}
// Clean up the transaction log
tm.taskRepo.DeleteCommitLog(commitLog.LogID)
return nil
}
// Recovery logic at system startup
func (tm *TaskManager) RecoverFromCommitLogs() error {
unfinishedLogs, err := tm.taskRepo.GetUnfinishedCommitLogs()
if err != nil {
return fmt.Errorf("failed to get unfinished commit logs: %w", err)
}
for _, log := range unfinishedLogs {
tm.logger.Infof("Recovering task %s from commit log", log.TaskID)
// Re-execute the commit phase
if err := tm.taskRepo.CommitFinalState(log); err != nil {
tm.logger.Errorf("Failed to recover task %s: %v", log.TaskID, err)
continue
}
// Clean up the recovered log
tm.taskRepo.DeleteCommitLog(log.LogID)
tm.logger.Infof("Successfully recovered task %s", log.TaskID)
}
return nil
}
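The prepare, commit, and recovery steps can be exercised end to end with an in-memory stand-in for the repository. This is a sketch under stated assumptions: the `repo` type, its `failCommit` switch, and the functions `update` and `recoverAll` are hypothetical and are not the `taskRepo` API of the listing. Unlike the listing, the sketch updates the volatile view only after the durable commit succeeds, which is one possible ordering and not necessarily the one the system uses.

```go
package main

import (
	"errors"
	"fmt"
)

// repo is a hypothetical in-memory stand-in for the persistent task repository.
type repo struct {
	logs       map[string]string // prepare log: task ID -> intended status
	state      map[string]string // durable final state
	failCommit bool              // simulates a crash between prepare and commit
}

// prepare records the intended status before anything else changes.
func (r *repo) prepare(taskID, status string) { r.logs[taskID] = status }

// commit applies the logged status and removes the log only on success.
func (r *repo) commit(taskID string) error {
	if r.failCommit {
		return errors.New("simulated crash during commit")
	}
	r.state[taskID] = r.logs[taskID]
	delete(r.logs, taskID)
	return nil
}

// update runs the prepare phase and then the commit phase for one change.
func update(r *repo, mem map[string]string, taskID, status string) error {
	r.prepare(taskID, status)
	if err := r.commit(taskID); err != nil {
		return err // the prepare log survives for later recovery
	}
	mem[taskID] = status
	return nil
}

// recoverAll replays every leftover prepare log and rebuilds the volatile
// view from durable state, as would happen after memory is lost. A log whose
// commit fails again is simply left in place for the next attempt.
func recoverAll(r *repo) map[string]string {
	for taskID := range r.logs {
		_ = r.commit(taskID)
	}
	mem := make(map[string]string, len(r.state))
	for k, v := range r.state {
		mem[k] = v
	}
	return mem
}

func main() {
	r := &repo{logs: map[string]string{}, state: map[string]string{}}
	mem := map[string]string{}
	_ = update(r, mem, "t1", "SUCCEEDED")

	r.failCommit = true
	err := update(r, mem, "t2", "FAILED") // crash: only the prepare log survives
	fmt.Println(err != nil, len(r.logs))  // prints "true 1"

	r.failCommit = false
	mem = recoverAll(r)                            // restart with empty memory
	fmt.Println(mem["t1"], mem["t2"], len(r.logs)) // prints "SUCCEEDED FAILED 0"
}
```

Replaying a log is safe to repeat because committing the same final status twice leaves the durable state unchanged.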
Preferably, the implementation code of the coordinated checkpoint mechanism is as follows:
type CheckpointManager struct {
nodeManager *NodeManager
logger logx.Logger
checkpointDir string
mutex sync.RWMutex
}
// Initiate a globally consistent checkpoint
func (cm *CheckpointManager) InitiateGlobalCheckpoint(workflowID string) error {
cm.logger.Infof("Initiating global checkpoint for workflow %s", workflowID)
// 1. Send a prepare-checkpoint signal to all participating nodes
nodes := cm.nodeManager.GetActiveNodes()
prepareChan := make(chan error, len(nodes))
for _, node := range nodes {
go func(n *NodeInfo) {
prepareChan <- cm.sendPrepareCheckpoint(n, workflowID)
}(node)
}
// 2. Wait for all nodes to finish preparing
for i := 0; i < len(nodes); i++ {
if err := <-prepareChan; err != nil {
return fmt.Errorf("node prepare failed: %w", err)
}
}
// 3. All nodes are prepared; send commit-checkpoint signals
checkpointID := fmt.Sprintf("cp_%s_%d", workflowID, time.Now().Unix())
commitChan := make(chan error, len(nodes))
for _, node := range nodes {
go func(n *NodeInfo) {
commitChan <- cm.sendCommitCheckpoint(n, checkpointID)
}(node)
}
// 4. Wait for all nodes to finish committing
for i := 0; i < len(nodes); i++ {
if err := <-commitChan; err != nil {
return fmt.Errorf("node commit failed: %w", err)
}
}
// 5. Record the global checkpoint
return cm.recordGlobalCheckpoint(checkpointID, workflowID)
}
// Confined recovery implementation
func (cm *CheckpointManager) PerformConfinedRecovery(failedNodeID string) error {
cm.logger.Infof("Performing confined recovery for node %s", failedNodeID)
// 1. Obtain the latest global checkpoint
latestCheckpoint, err := cm.getLatestGlobalCheckpoint()
if err != nil {
return fmt.Errorf("failed to get latest checkpoint: %w", err)
}
// 2. Restore only the state of the failed node
if err := cm.restoreNodeFromCheckpoint(failedNodeID, latestCheckpoint); err != nil {
return fmt.Errorf("failed to restore node: %w", err)
}
// 3. Reschedule the tasks that were running on the failed node
runningTasks, err := cm.getRunningTasksOnNode(failedNodeID, latestCheckpoint)
if err != nil {
return fmt.Errorf("failed to get running tasks: %w", err)
}
for _, task := range runningTasks {
if err := cm.rescheduleTask(task); err != nil {
cm.logger.Errorf("Failed to reschedule task %s: %v", task.ID, err)
}
}
cm.logger.Infof("Confined recovery completed for node %s", failedNodeID)
return nil
}
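The key property of confined recovery is that only the tasks recorded on the failed node are touched. The following sketch shows that property on a toy checkpoint. The `checkpoint` type, the function `confinedRecovery`, and the round-robin reassignment policy are assumptions made for illustration; the listing delegates placement to `rescheduleTask`, whose policy is not shown.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// checkpoint is a hypothetical snapshot: task ID -> node that was running it.
type checkpoint map[string]string

// confinedRecovery reassigns only the tasks recorded on failedNode, spreading
// them round-robin over the healthy nodes. All other assignments are copied
// unchanged, so no healthy node rolls back.
func confinedRecovery(cp checkpoint, failedNode string, healthy []string) (checkpoint, []string, error) {
	if len(healthy) == 0 {
		return nil, nil, errors.New("no healthy node available")
	}
	next := make(checkpoint, len(cp))
	var moved []string
	for task, node := range cp {
		next[task] = node
		if node == failedNode {
			moved = append(moved, task)
		}
	}
	sort.Strings(moved) // deterministic reassignment order
	for i, task := range moved {
		next[task] = healthy[i%len(healthy)]
	}
	return next, moved, nil
}

func main() {
	cp := checkpoint{"t1": "n1", "t2": "n2", "t3": "n2", "t4": "n3"}
	next, moved, err := confinedRecovery(cp, "n2", []string{"n1", "n3"})
	// prints "[t2 t3] n1 n1 n3 n3 <nil>"
	fmt.Println(moved, next["t1"], next["t2"], next["t3"], next["t4"], err)
}
```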
Preferably, the implementation code of the HEFT scheduling algorithm is as follows:
type HEFTScheduler struct {
nodeManager *NodeManager
graphDB GraphDBInterface
logger logx.Logger
localityWeight float64 // data locality weight
}
// Task prioritization phase of the HEFT algorithm
func (hs *HEFTScheduler) CalculateUpwardRank(dag *DAG) map[string]int {
upwardRank := make(map[string]int)
visited := make(map[string]bool)
// Compute the upward rank by depth-first traversal
var dfs func(nodeID string) int
dfs = func(nodeID string) int {
if visited[nodeID] {
return upwardRank[nodeID]
}
visited[nodeID] = true
node := dag.GetNode(nodeID)
maxSuccessorRank := 0
for _, successorID := range node.Successors {
successorRank := dfs(successorID)
if successorRank > maxSuccessorRank {
maxSuccessorRank = successorRank
}
}
// Upward ranking = current task estimated time + maximum subsequent path weight
upwardRank[nodeID] = int(node.EstCost.Seconds()) + maxSuccessorRank
return upwardRank[nodeID]
}
// Compute the upward rank of all nodes
for nodeID := range dag.nodes {
dfs(nodeID)
}
return upwardRank
}
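A small worked example makes the upward rank concrete. The sketch below mirrors the rule stated in the listing (a task's own estimated cost plus the largest rank among its successors) on a four-task diamond graph. The function name `upwardRank`, the task names, and the costs in seconds are hypothetical. Note that the textbook HEFT definition also adds an average communication cost on each edge, which the listing omits.

```go
package main

import "fmt"

// upwardRank returns, for each task, its own cost plus the largest rank among
// its successors, i.e. the length of the longest cost-weighted path from the
// task to an exit task. The graph is assumed to be acyclic.
func upwardRank(cost map[string]int, succ map[string][]string) map[string]int {
	rank := make(map[string]int, len(cost))
	var visit func(id string) int
	visit = func(id string) int {
		if r, ok := rank[id]; ok {
			return r // already computed
		}
		best := 0
		for _, s := range succ[id] {
			if r := visit(s); r > best {
				best = r
			}
		}
		rank[id] = cost[id] + best
		return rank[id]
	}
	for id := range cost {
		visit(id)
	}
	return rank
}

func main() {
	cost := map[string]int{"snapshot": 30, "compact": 120, "balance": 45, "report": 10}
	succ := map[string][]string{
		"snapshot": {"compact", "balance"},
		"compact":  {"report"},
		"balance":  {"report"},
	}
	r := upwardRank(cost, succ)
	// report = 10, balance = 45+10, compact = 120+10, snapshot = 30+130
	fmt.Println(r["report"], r["balance"], r["compact"], r["snapshot"]) // prints "10 55 130 160"
}
```

Scheduling in decreasing rank order places tasks on the critical path (`snapshot`, then `compact`) ahead of shorter branches.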
// Processor selection phase of the HEFT algorithm
func (hs *HEFTScheduler) SelectOptimalNode(task *TaskSpec, availableNodes []*NodeInfo) (*NodeInfo, error) {
var bestNode *NodeInfo
var minFinishTime time.Duration = time.Duration(math.MaxInt64)
for _, node := range availableNodes {
// Estimate the execution time of the task on this node
execTime := hs.estimateExecutionTime(task, node)
// Calculate the data transfer time
transferTime := hs.calculateDataTransferTime(task, node)
// Calculate the data locality bonus
localityBonus := hs.calculateLocalityBonus(task, node)
// Hybrid cost = execution time + transfer time - locality bonus
totalCost := execTime + transferTime - time.Duration(float64(localityBonus)*hs.localityWeight)
// Calculate the earliest finish time
earliestStartTime := hs.getEarliestStartTime(node)
finishTime := earliestStartTime + totalCost
if finishTime < minFinishTime {
minFinishTime = finishTime
bestNode = node
}
}
if bestNode == nil {
return nil, fmt.Errorf("no suitable node found for task %s", task.ID)
}
hs.logger.Infof("Selected node %s for task %s (finish time: %v)",
bestNode.ID, task.ID, minFinishTime)
return bestNode, nil
}
// Data locality awareness
func (hs *HEFTScheduler) calculateLocalityBonus(task *TaskSpec, node *NodeInfo) time.Duration {
bonus := time.Duration(0)
// Query the graph database metadata to obtain data shard information
shardInfo, err := hs.graphDB.GetShardInfo(task.DataRequirements)
if err != nil {
hs.logger.Warnf("Failed to get shard info: %v", err)
return bonus
}
// Calculate the local data ratio
localDataRatio := hs.calculateLocalDataRatio(shardInfo, node)
// Locality bonus = local data ratio × maximum bonus time
maxBonus := time.Duration(10 * time.Second) // maximum bonus of 10 seconds
bonus = time.Duration(float64(maxBonus) * localDataRatio)
hs.logger.Debugf("Node %s locality bonus for task %s: %v (ratio: %.2f)",
node.ID, task.ID, bonus, localDataRatio)
return bonus
}
The conditional task execution layer is configured to create a security isolation sandbox for executing each operation and maintenance task in the DAG operation and maintenance task graph and, during execution of the current operation and maintenance task, to evaluate the task execution result based on a dynamically updated conditional function so as to determine the execution path of subsequent operation and maintenance tasks.
Specifically, the conditional task execution layer is configured to create a Wasm security isolation sandbox through a dynamically loaded Wasm plug-in, or to create a Cgroups security isolation sandbox through a dynamically loaded Go plug-in, so as to execute each operation and maintenance task in the DAG operation and maintenance task graph.
During execution of the current operation and maintenance task, the task execution result is evaluated based on the dynamically updated conditional function to determine the execution path of subsequent operation and maintenance tasks, wherein the conditional function supports complex logic combination and higher-order function programming, the conditional function is updated by hot update, and the languages in which the conditional function can be written include Rust, C++ and TinyGo.
It should be noted that when conventional graph computing systems introduce conditional branching into a workflow, most implement it through simple if-else logic or dedicated branching operators (e.g., the BranchPythonOperator of Airflow). However, this presents two deep problems. First, if the condition judgment logic is provided by a user, the code may contain malicious operations or defects, and executing it directly on the master node or a worker node brings serious security risks. Second, the traditional approach usually hard-codes the condition logic in the workflow definition and lacks the ability to dynamically load, update and inject new condition logic at runtime. The conditional task execution layer in the embodiment of the application achieves dynamic update and flexible extension of condition logic through a plug-in architecture; sandboxed execution effectively reduces the security risks of user-defined code; and WebAssembly support for multi-language programming provides high flexibility.
FIG. 4 is a schematic workflow diagram of the conditional task execution layer according to an embodiment of the application. As shown in FIG. 4, the conditional task executor (ConditionalTask) in the conditional task execution layer is the core component of the system's intelligent decision-making and supports secure dynamic condition judgment and branch selection. Its technical innovations are as follows: a plug-in architecture that supports runtime dynamic loading of WebAssembly (Wasm) and Go native plug-ins; sandboxed execution in a secure isolated execution environment based on Linux namespaces and Cgroups; a functional programming paradigm that supports complex logic combinations (AND, OR, NOT) and higher-order functions; a hot update mechanism that supports runtime update of condition logic without a system restart; and multi-language support, in that conditional functions can be written in languages such as Rust, C++ and TinyGo.
Further, regarding the sandboxed execution mechanism, when ConditionalTask executes a user-defined conditional function, the function is placed in a strictly restricted sandbox environment: namespace isolation creates independent PID, network, user and mount namespaces; resource restriction limits CPU, memory and I/O usage through Cgroups; system call filtering restricts the executable system calls through seccomp-bpf; and file system isolation mounts the necessary system files read-only and prohibits access to sensitive directories. Regarding dynamic plug-in loading, the WebAssembly runtime uses Wasmtime to provide cross-platform, high-performance sandboxed execution, and Go native plug-ins use the plugin package to dynamically load .so shared library files.
Preferably, the core architecture code of the conditional task execution layer is as follows:
type ConditionalParams struct {
Host string // target host
Condition ConditionFunc // condition evaluation function
CheckTask *TaskSpec // check task
TrueTask *TaskSpec // task executed when the condition is true
FalseTask *TaskSpec // task executed when the condition is false
Description string // task description
PluginPath string // plug-in path (Wasm or .so)
SandboxConfig SandboxConfig // sandbox configuration
ResourceLimits *ResourceLimits // resource limits
}
// Type definition of the condition function
type ConditionFunc func(*agent.ShellResult) bool
// Sandbox configuration
type SandboxConfig struct {
EnableNetworkIsolation bool // network isolation
EnableFileSystemIsolation bool // file system isolation
AllowedSyscalls []string // allowed system calls
TimeoutSeconds int // execution timeout
}
// Sandbox executor interface
type SandboxExecutor interface {
Execute(ctx context.Context, pluginPath string, input []byte) ([]byte, error)
CreateSandbox(config SandboxConfig) (*Sandbox, error)
DestroySandbox(sandbox *Sandbox) error
}
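Because `ConditionFunc` is an ordinary function type, the logic combinations (AND, OR, NOT) and higher-order functions mentioned in the description can be expressed as combinators. The sketch below is illustrative only: `ShellResult` is a hypothetical stand-in for `agent.ShellResult`, whose real fields are not shown in the listing, and the helper names `And`, `Or`, `Not`, `ExitCodeIs` and `StdoutContains` are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// ShellResult is a hypothetical stand-in for agent.ShellResult.
type ShellResult struct {
	ExitCode int
	Stdout   string
}

// ConditionFunc mirrors the type in the listing, over the stand-in result.
type ConditionFunc func(*ShellResult) bool

// And is true only when every condition is true.
func And(cs ...ConditionFunc) ConditionFunc {
	return func(r *ShellResult) bool {
		for _, c := range cs {
			if !c(r) {
				return false
			}
		}
		return true
	}
}

// Or is true when at least one condition is true.
func Or(cs ...ConditionFunc) ConditionFunc {
	return func(r *ShellResult) bool {
		for _, c := range cs {
			if c(r) {
				return true
			}
		}
		return false
	}
}

// Not inverts a condition.
func Not(c ConditionFunc) ConditionFunc {
	return func(r *ShellResult) bool { return !c(r) }
}

// ExitCodeIs and StdoutContains are higher-order constructors that build a
// condition from a parameter.
func ExitCodeIs(code int) ConditionFunc {
	return func(r *ShellResult) bool { return r.ExitCode == code }
}

func StdoutContains(s string) ConditionFunc {
	return func(r *ShellResult) bool { return strings.Contains(r.Stdout, s) }
}

func main() {
	healthy := And(ExitCodeIs(0), Not(StdoutContains("OFFLINE")))
	fmt.Println(healthy(&ShellResult{ExitCode: 0, Stdout: "3 hosts ONLINE"})) // prints "true"
	fmt.Println(healthy(&ShellResult{ExitCode: 0, Stdout: "1 host OFFLINE"})) // prints "false"
}
```

The value returned by `healthy` would decide whether the true-branch or the false-branch task runs next.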
Preferably, the implementation code of the sandboxed execution mechanism of the conditional task is as follows:
// Sandbox implementation based on Cgroups and namespaces
type CgroupSandbox struct {
cgroupPath string
sandboxDir string
config SandboxConfig
logger logx.Logger
}
func (s *CgroupSandbox) Execute(ctx context.Context, pluginPath string, input []byte) ([]byte, error) {
sandboxID := utils.GenerateUUID()
// Create and configure the cgroup
cgroupName := fmt.Sprintf("sandbox-%s", sandboxID)
if err := s.createCgroup(cgroupName); err != nil {
return nil, fmt.Errorf("failed to create cgroup: %w", err)
}
defer s.cleanupCgroup(cgroupName)
// Set resource limits
if err := s.configureCgroupLimits(cgroupName); err != nil {
return nil, fmt.Errorf("failed to configure cgroup limits: %w", err)
}
// Create the sandboxed process
cmd := exec.CommandContext(ctx, pluginPath)
// Set up independent namespaces
cmd.SysProcAttr = &syscall.SysProcAttr{
Cloneflags: syscall.CLONE_NEWPID | // PID namespace
syscall.CLONE_NEWNET | // network namespace
syscall.CLONE_NEWUSER | // user namespace
syscall.CLONE_NEWNS | // mount namespace
syscall.CLONE_NEWUTS | // UTS namespace
syscall.CLONE_NEWIPC, // IPC namespace
// User namespace mapping (run as a non-root user)
UidMappings: []syscall.SysProcIDMap{
{ContainerID: 0, HostID: 65534, Size: 1}, // mapped to the nobody user
},
GidMappings: []syscall.SysProcIDMap{
{ContainerID: 0, HostID: 65534, Size: 1}, // mapped to the nobody group
},
}
// Configure the seccomp system call filter
if err := s.configureSeccomp(cmd); err != nil {
return nil, fmt.Errorf("failed to configure seccomp: %w", err)
}
// Set up input/output
cmd.Stdin = bytes.NewReader(input)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
// Start the process and add it to the cgroup
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start sandbox process: %w", err)
}
if err := s.addProcessToCgroup(cgroupName, cmd.Process.Pid); err != nil {
cmd.Process.Kill()
return nil, fmt.Errorf("failed to add process to cgroup: %w", err)
}
// Wait for execution to complete
if err := cmd.Wait(); err != nil {
s.logger.Errorf("Sandbox execution failed: %v, stderr: %s", err, stderr.String())
return nil, fmt.Errorf("sandbox execution failed: %w", err)
}
return stdout.Bytes(), nil
}
// Create the cgroup
func (s *CgroupSandbox) createCgroup(name string) error {
cgroupPath := filepath.Join("/sys/fs/cgroup", name)
// Create the cgroup directory
if err := os.MkdirAll(cgroupPath, 0755); err != nil {
return fmt.Errorf("failed to create cgroup directory: %w", err)
}
s.cgroupPath = cgroupPath
return nil
}
// Configure cgroup resource limits
func (s *CgroupSandbox) configureCgroupLimits(name string) error {
cgroupPath := s.cgroupPath
// Set the memory limit (default 64 MB)
memoryLimit := s.config.MemoryLimitMB
if memoryLimit == 0 {
memoryLimit = 64
}
if err := s.writeCgroupFile(cgroupPath, "memory.limit_in_bytes",
fmt.Sprintf("%d", memoryLimit*1024*1024)); err != nil {
return err
}
// Set the CPU limit (default 0.1 core)
cpuQuota := s.config.CPUQuotaPercent
if cpuQuota == 0 {
cpuQuota = 10 // 10%
}
if err := s.writeCgroupFile(cgroupPath, "cpu.cfs_quota_us",
fmt.Sprintf("%d", cpuQuota*1000)); err != nil {
return err
}
if err := s.writeCgroupFile(cgroupPath, "cpu.cfs_period_us", "100000"); err != nil {
return err
}
// Set the I/O limit
if err := s.writeCgroupFile(cgroupPath, "blkio.throttle.read_bps_device",
"8:0 10485760"); err != nil { // 10 MB/s read limit
return err
}
return nil
}
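The numeric values written to the cgroup files follow from simple arithmetic, which the sketch below makes explicit. The helper names `cfsQuotaUs` and `memoryLimitBytes` are hypothetical. The point illustrated is that multiplying the percentage by 1000, as the listing does, is equivalent to `percent * period / 100` only because the period is fixed at 100000 microseconds.

```go
package main

import "fmt"

// cfsQuotaUs converts a CPU share, given as a percentage of one core, into a
// CFS quota in microseconds for the given scheduling period.
func cfsQuotaUs(percent, periodUs int64) int64 {
	return percent * periodUs / 100
}

// memoryLimitBytes converts a limit in MB (1 MB = 1024*1024 bytes here, as in
// the listing) into bytes.
func memoryLimitBytes(mb int64) int64 {
	return mb * 1024 * 1024
}

func main() {
	fmt.Println(memoryLimitBytes(64))   // prints "67108864"
	fmt.Println(cfsQuotaUs(10, 100000)) // prints "10000", i.e. 0.1 core
	fmt.Println(memoryLimitBytes(10))   // prints "10485760", the read limit in bytes per second
}
```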
Preferably, the WebAssembly plug-in runtime provides a higher level of sandbox protection, and the implementation code is as follows:
type WasmSandbox struct {
engine *wasmtime.Engine
store *wasmtime.Store
config WasmConfig
logger logx.Logger
}
func (ws *WasmSandbox) Execute(ctx context.Context, wasmPath string, input []byte) ([]byte, error) {
// Configure the Wasm engine
config := wasmtime.NewConfig()
config.SetCraneliftOptLevel(wasmtime.OptLevelNone) // disable optimizations to improve security
config.SetConsumeFuel(true) // enable the fuel consumption mechanism
engine := wasmtime.NewEngineWithConfig(config)
store := wasmtime.NewStore(engine)
// Set resource limits
store.AddFuel(ws.config.MaxInstructions) // maximum instruction count limit
// Load the Wasm module
wasmBytes, err := os.ReadFile(wasmPath)
if err != nil {
return nil, fmt.Errorf("failed to read wasm file: %w", err)
}
module, err := wasmtime.NewModule(engine, wasmBytes)
if err != nil {
return nil, fmt.Errorf("failed to compile wasm module: %w", err)
}
// Create the restricted import functions
imports := ws.createLimitedImports(store)
// Instantiate the module
instance, err := wasmtime.NewInstance(store, module, imports)
if err != nil {
return nil, fmt.Errorf("failed to instantiate wasm module: %w", err)
}
// Obtain the exported condition function
conditionFunc := instance.GetFunc(store, "evaluate_condition")
if conditionFunc == nil {
return nil, fmt.Errorf("condition function not found in wasm module")
}
// Execute with a timeout
resultChan := make(chan []byte, 1)
errorChan := make(chan error, 1)
go func() {
defer func() {
if r := recover(); r != nil {
errorChan <- fmt.Errorf("wasm execution panic: %v", r)
}
}()
// Execute the condition function
result, err := ws.executeConditionFunction(store, conditionFunc, input)
if err != nil {
errorChan <- err
return
}
resultChan <- result
}()
// Wait for the result or a timeout
select {
case result := <-resultChan:
return result, nil
case err := <-errorChan:
return nil, err
case <-ctx.Done():
return nil, fmt.Errorf("wasm execution timeout")
}
}
// Create the restricted import functions
func (ws *WasmSandbox) createLimitedImports(store *wasmtime.Store) []*wasmtime.Extern {
var imports []*wasmtime.Extern
// Provide secure log function only
logFunc := wasmtime.NewFunc(store, wasmtime.NewFuncType(
[]*wasmtime.ValType{wasmtime.NewValType(wasmtime.KindI32)},
[]*wasmtime.ValType{},
), func(caller *wasmtime.Caller, args []wasmtime.Val) ([]wasmtime.Val, *wasmtime.Trap) {
// Secure logging implementation that prevents malicious output
ws.logger.Infof("Wasm log: %d", args[0].I32())
return []wasmtime.Val{}, nil
})
imports = append(imports, logFunc.AsExtern())
// Other secure import functions can be added as needed
return imports
}
Preferably, the implementation code of the dynamic plug-in hot update mechanism is as follows:
type PluginManager struct {
pluginStore map[string]*Plugin
versionStore map[string]int64
updateChan chan PluginUpdateEvent
mutex sync.RWMutex
logger logx.Logger
}
// Hot-update a plug-in
func (pm *PluginManager) UpdatePlugin(pluginID string, newVersion []byte) error {
pm.mutex.Lock()
defer pm.mutex.Unlock()
// 1. Validate the new plug-in
if err := pm.validatePlugin(newVersion); err != nil {
return fmt.Errorf("plugin validation failed: %w", err)
}
// 2. Create the new version
newVersionID := time.Now().Unix()
tempPath := fmt.Sprintf("/tmp/plugin_%s_v%d", pluginID, newVersionID)
if err := os.WriteFile(tempPath, newVersion, 0644); err != nil {
return fmt.Errorf("failed to write plugin file: %w", err)
}
// 3. Atomic update
oldPlugin := pm.pluginStore[pluginID]
newPlugin := &Plugin{
ID: pluginID,
Version: newVersionID,
Path: tempPath,
LoadTime: time.Now(),
}
pm.pluginStore[pluginID] = newPlugin
pm.versionStore[pluginID] = newVersionID
// 4. Clean up the old version
if oldPlugin != nil {
go func() {
time.Sleep(30 * time.Second) // wait for running tasks to complete
os.Remove(oldPlugin.Path)
pm.logger.Infof("Cleaned up old plugin version: %s", oldPlugin.Path)
}()
}
pm.logger.Infof("Plugin %s updated to version %d", pluginID, newVersionID)
return nil
}
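The effect of a hot update on tasks that are already running can be shown with an in-memory registry. This is a sketch under stated assumptions: the `Plugin`, `Registry` and `NewRegistry` names are hypothetical, plug-ins are modeled as Go closures rather than files, and versions are a per-plug-in counter. A counter is used here because a Unix-second timestamp, as in the listing, yields the same version ID for two updates made within one second.

```go
package main

import (
	"fmt"
	"sync"
)

// Plugin is a hypothetical in-memory plug-in: a version and a condition.
type Plugin struct {
	Version int64
	Eval    func(exitCode int) bool
}

// Registry swaps plug-ins under a write lock. Readers obtain a pointer to an
// immutable Plugin and keep using it even after a newer version is installed.
type Registry struct {
	mu      sync.RWMutex
	plugins map[string]*Plugin
}

func NewRegistry() *Registry {
	return &Registry{plugins: make(map[string]*Plugin)}
}

// Get returns the currently installed version of a plug-in.
func (r *Registry) Get(id string) (*Plugin, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	p, ok := r.plugins[id]
	return p, ok
}

// Update installs a new version and returns its version number. The old
// Plugin value is never modified, only replaced in the map.
func (r *Registry) Update(id string, eval func(int) bool) int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	next := &Plugin{Version: 1, Eval: eval}
	if old, ok := r.plugins[id]; ok {
		next.Version = old.Version + 1
	}
	r.plugins[id] = next
	return next.Version
}

func main() {
	reg := NewRegistry()
	reg.Update("disk-check", func(code int) bool { return code == 0 })
	running, _ := reg.Get("disk-check") // an in-flight task pins version 1

	reg.Update("disk-check", func(code int) bool { return code <= 1 })
	latest, _ := reg.Get("disk-check")

	fmt.Println(running.Version, running.Eval(1)) // prints "1 false"
	fmt.Println(latest.Version, latest.Eval(1))   // prints "2 true"
}
```

The in-flight task keeps evaluating with version 1 while new lookups receive version 2, which is why the listing delays deletion of the old plug-in file.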
In some of these embodiments, FIG. 5 is a block diagram of the operation and maintenance task workflow orchestration system according to an embodiment of the present application. As shown in FIG. 5, the system includes a user interface layer, a storage layer, a resource management layer, and a graph database cluster.
The user interface layer is configured to provide multiple interaction modes through which a user inputs the operation and maintenance task workflow specification, the interaction modes including a Web console, a REST API and a command line tool.
The storage layer comprises a persistent distributed storage unit and is configured to persistently store data related to the operation and maintenance tasks and to support the transaction log and checkpoint mechanisms. The resource management layer is configured to intelligently schedule hardware resources for the execution of operation and maintenance tasks so as to balance load.
The graph database cluster is a NebulaGraph distributed graph database cluster comprising a Meta node, a Graph node and a Storage node.
The present embodiment provides an electronic device comprising a memory having a computer program stored therein and a processor arranged to run the computer program to perform the steps of any of the system embodiments described above.
Optionally, the electronic device may further include a transmission device and an input/output device, where the transmission device is connected to the processor, and the input/output device is connected to the processor.
Optionally, the electronic device may further comprise a processor, a memory, a network interface, a display screen and an input device connected by a system bus. Wherein the processor of the electronic device is configured to provide computing and control capabilities. The memory of the electronic device includes a nonvolatile storage medium and an internal memory. The non-volatile storage medium stores an operating system and a computer program. The internal memory provides an environment for the operation of the operating system and computer programs in the non-volatile storage media. The network interface of the electronic device is used for communicating with an external terminal through a network connection. The computer program when executed by a processor implements an operation and maintenance task workflow orchestration system. The display screen of the electronic device can be a liquid crystal display screen or an electronic ink display screen, the input device of the electronic device can be a touch layer covered on the display screen, can also be a key, a track ball or a touch pad arranged on the shell of the electronic device, and can also be an external keyboard, a touch pad or a mouse and the like.
It should be noted that, specific examples in this embodiment may refer to examples described in the foregoing embodiments and alternative implementations, and this embodiment is not repeated herein.
In addition, in combination with the operation and maintenance task workflow orchestration system in the above embodiments, an embodiment of the present application may provide a storage medium for implementation. The storage medium has stored thereon a computer program which, when executed by a processor, implements any of the operation and maintenance task workflow orchestration systems of the above embodiments.
In one embodiment, FIG. 6 is a schematic diagram of the internal structure of an electronic device according to an embodiment of the present application. As shown in FIG. 6, an electronic device is provided, which may be a server. The electronic device includes a processor, a network interface, an internal memory, and a non-volatile memory connected by an internal bus, wherein the non-volatile memory stores an operating system, a computer program, and a database. The processor provides computing and control capabilities; the network interface communicates with an external terminal through a network connection; the internal memory provides an environment for running the operating system and the computer program; the computer program, when executed by the processor, implements the operation and maintenance task workflow orchestration system; and the database stores data.
It will be appreciated by those skilled in the art that the structure shown in fig. 6 is merely a block diagram of a portion of the structure associated with the present inventive arrangements and is not limiting of the electronic device to which the present inventive arrangements are applied, and that a particular electronic device may include more or fewer components than shown, or may combine certain components, or have a different arrangement of components.
Those skilled in the art will appreciate that all or part of the systems of the above embodiments may be implemented by a computer program stored on a non-volatile computer-readable storage medium, and that the program, when executed, may comprise the steps of the embodiments described above. Any reference to memory, storage, database, or other medium used in the embodiments provided herein may include non-volatile and/or volatile memory. Non-volatile memory can include read-only memory (ROM), programmable ROM (PROM), electrically programmable ROM (EPROM), electrically erasable programmable ROM (EEPROM), or flash memory. Volatile memory can include random access memory (RAM) or external cache memory. By way of illustration and not limitation, RAM is available in a variety of forms such as static RAM (SRAM), dynamic RAM (DRAM), synchronous DRAM (SDRAM), double data rate SDRAM (DDR SDRAM), enhanced SDRAM (ESDRAM), Synchlink DRAM (SLDRAM), Rambus direct RAM (RDRAM), direct Rambus dynamic RAM (DRDRAM), and Rambus dynamic RAM (RDRAM), among others.
It should be understood by those skilled in the art that the technical features of the above-described embodiments may be combined in any manner, and for brevity, all of the possible combinations of the technical features of the above-described embodiments are not described, however, they should be considered as being within the scope of the description provided herein, as long as there is no contradiction between the combinations of the technical features.
The above examples illustrate only a few embodiments of the application; they are described in specific detail but are not thereby to be construed as limiting the scope of the application. It should be noted that those skilled in the art can make several variations and modifications without departing from the spirit of the application, all of which fall within the scope of the application. Accordingly, the scope of protection of the present application is to be determined by the appended claims.

Claims (10)

1. An operation and maintenance task workflow orchestration system in a distributed graph database, characterized by comprising a workflow orchestration layer, a task management layer and a conditional task execution layer;
The workflow orchestration layer is used for receiving an operation and maintenance task workflow specification input by a user, and recursively analyzing the workflow specification to generate an executable DAG operation and maintenance task graph;
The task management layer is used for ensuring, by adopting a two-phase commit protocol, the atomic consistency of each operation and maintenance task in the DAG operation and maintenance task graph in volatile memory and persistent storage, and for performing fault recovery on unfinished operation and maintenance tasks from the persistent storage based on the atomic consistency under the condition that memory data is lost due to faults;
The conditional task execution layer is used for creating a security isolation sandbox for executing each operation and maintenance task in the DAG operation and maintenance task graph, and in the process of executing the current operation and maintenance task, the task execution result is evaluated based on the dynamically updated conditional function so as to determine the execution path of the subsequent operation and maintenance task.
2. The system of claim 1, wherein the workflow orchestration layer is configured to receive an operation and maintenance task workflow specification input by a user, to abstract the serial branches, parallel branches and conditional branches in the workflow specification uniformly into a directed acyclic graph, and to topologically sort the directed acyclic graph using Kahn's algorithm, thereby resolving the dependencies and execution order of the operation and maintenance tasks and generating an executable DAG operation and maintenance task graph.
3. The system of claim 2, wherein the workflow orchestration layer is configured to apply multi-version concurrency control (MVCC) so that the workflow specification can be updated while operation and maintenance tasks are executing:
If the workflow specification is updated while an operation and maintenance task is executing, the specification is copied and recursively parsed to generate a new version of the DAG operation and maintenance task graph, and the version is then switched atomically via an atomic SwapPointer operation, so that operation and maintenance tasks already executing continue to use the old version of the DAG operation and maintenance task graph, while operation and maintenance tasks not yet executed use the new version.
4. The system of claim 1, wherein the task management layer is configured, when the task state of an operation and maintenance task in the DAG operation and maintenance task graph is updated, to write a prepare log for the update to distributed storage in the prepare phase of the two-phase commit protocol, to update the task state of the operation and maintenance task in both the volatile memory and the persistent storage in the commit phase of the two-phase commit protocol, and to delete the prepare log after the update succeeds;
And, when a fault causes in-memory data to be lost, to recover unfinished operation and maintenance tasks from the persistent storage on the basis of the atomic update.
5. The system of claim 4, wherein the task management layer is configured to support highly concurrent execution of the operation and maintenance tasks in the DAG operation and maintenance task graph through a concurrent task execution mechanism based on Go coroutines (goroutines).
6. The system of claim 1, wherein the conditional task execution layer is configured to create a Wasm security isolation sandbox via a dynamically loaded Wasm plug-in, or a Cgroups security isolation sandbox via a dynamically loaded Go plug-in, for executing each operation and maintenance task in the DAG operation and maintenance task graph.
7. The system of claim 6, wherein the conditional task execution layer is configured, while the current operation and maintenance task is executing, to evaluate the task execution result using dynamically updated conditional functions so as to determine the execution path of subsequent operation and maintenance tasks, wherein the conditional functions support complex logic combinations and higher-order function programming, are updated by hot update, and are written in programming languages including Rust, C++ and TinyGo.
8. The system of claim 1, wherein the system comprises a user interface layer;
The user interface layer is configured to provide multiple interaction modes through which a user inputs the operation and maintenance task workflow specification, the interaction modes comprising a Web console, a REST API and a command-line tool.
9. The system of claim 1, wherein the system comprises a storage layer and a resource management layer, the storage layer comprising persistent distributed storage units;
the storage layer is configured to persistently store data related to the operation and maintenance tasks and to support a transaction log and a checkpoint mechanism;
and the resource management layer is configured to intelligently schedule the hardware resources used to execute the operation and maintenance tasks so as to balance load.
10. The system of claim 1, wherein the system comprises a graph database cluster, the graph database cluster being a NebulaGraph distributed graph database cluster.
CN202511317559.1A 2025-09-16 2025-09-16 A workflow orchestration system for operation and maintenance tasks in a distributed graph database Active CN120822815B (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN202511317559.1A CN120822815B (en) 2025-09-16 2025-09-16 A workflow orchestration system for operation and maintenance tasks in a distributed graph database

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN202511317559.1A CN120822815B (en) 2025-09-16 2025-09-16 A workflow orchestration system for operation and maintenance tasks in a distributed graph database

Publications (2)

Publication Number Publication Date
CN120822815A true CN120822815A (en) 2025-10-21
CN120822815B CN120822815B (en) 2025-12-16

Family

ID=97371995

Family Applications (1)

Application Number Title Priority Date Filing Date
CN202511317559.1A Active CN120822815B (en) 2025-09-16 2025-09-16 A workflow orchestration system for operation and maintenance tasks in a distributed graph database

Country Status (1)

Country Link
CN (1) CN120822815B (en)

Citations (10)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN112214649A (en) * 2020-10-21 2021-01-12 北京航空航天大学 Distributed transaction solution system of temporal graph database
CN112379995A (en) * 2021-01-11 2021-02-19 北京江融信科技有限公司 DAG-based unitized distributed scheduling system and method
CN113254010A (en) * 2021-07-09 2021-08-13 广州光点信息科技有限公司 Visual DAG workflow task scheduling system and operation method thereof
WO2022056735A1 (en) * 2020-09-16 2022-03-24 深圳晶泰科技有限公司 Cloud high-performance scientific calculation workflow design control system and graphical user interface
CN116302381A (en) * 2022-09-08 2023-06-23 上海数禾信息科技有限公司 Parallel topology scheduling component and method, task scheduling method and task processing method
US20230409386A1 (en) * 2022-06-15 2023-12-21 International Business Machines Corporation Automatically orchestrating a computerized workflow
CN118740588A (en) * 2024-07-24 2024-10-01 中移(苏州)软件技术有限公司 An automated operation and maintenance system, method, device, medium and program product
CN119356886A (en) * 2024-12-25 2025-01-24 吉贝克信息技术(北京)有限公司 Large model workflow arrangement method, device, equipment and storage medium
US20250103990A1 (en) * 2023-09-27 2025-03-27 Disney Enterprises, Inc. Workflow orchestration using a universal state manager
CN120371483A (en) * 2025-06-26 2025-07-25 齐鲁工业大学(山东省科学院) Computing-storage flow joint scheduling optimization method and system based on DDQN and heuristic strategy

Non-Patent Citations (2)

* Cited by examiner, † Cited by third party
Title
QINGYUN MENG ET AL: "A workflow orchestration method for NFV integration deployment", 2023 IEEE 3RD INTERNATIONAL CONFERENCE ON CCAI, 3 August 2023 (2023-08-03), pages 399 - 404 *
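Xiang Zhihua: "Multi-objective workflow scheduling with DAG multi-level correlated node clustering", Control Engineering of China (控制工程), vol. 27, no. 9, 30 September 2020 (2020-09-30), pages 1595 - 1602 *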

Also Published As

Publication number Publication date
CN120822815B (en) 2025-12-16

Legal Events

Date Code Title Description
PB01 Publication
SE01 Entry into force of request for substantive examination
GR01 Patent grant