CN120407124B - 一种数据链路优化处理方法及数据链路优化处理装置 - Google Patents
一种数据链路优化处理方法及数据链路优化处理装置Info
- Publication number
- CN120407124B CN120407124B CN202510874890.7A CN202510874890A CN120407124B CN 120407124 B CN120407124 B CN 120407124B CN 202510874890 A CN202510874890 A CN 202510874890A CN 120407124 B CN120407124 B CN 120407124B
- Authority
- CN
- China
- Prior art keywords
- node
- data
- acyclic graph
- directed acyclic
- data link
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
Classifications
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/48—Program initiating; Program switching, e.g. by interrupt
- G06F9/4806—Task transfer initiation or dispatching
- G06F9/4812—Task transfer initiation or dispatching by interrupt, e.g. masked
Landscapes
- Engineering & Computer Science (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本申请提供了一种数据链路优化处理方法及数据链路优化处理装置,根据数据链路配置生成有向无环图,基于有向无环图表示数据框架内的数据关系,其数据执行的逻辑和顺序完全可控,且有向无环图的实现支持数据的异步和并行执行。有向无环图生成后,将数据处理业务的待处理数据输入到有向无环图的头节点中,以使每个节点按照流式顺序对接收到的数据执行对应的数据处理。仅当到达节点的数据触发节点的运行条件时,中继单元才控制节点执行数据处理,这样可以实现每个节点在数据处理过程中保持有向无环图的流式顺序,即使在多数据并发的情况下,也能保证数据处理的顺序与规则符合有向无环图配置,最后通过有向无环图的尾节点获取最终的数据处理结果。
Description
技术领域
本申请涉及数据处理技术领域,尤其是涉及一种数据链路优化处理方法及数据链路优化处理装置。
背景技术
有向无环图(Directed Acyclic Graph,DAG)是一种广泛应用于数据处理和任务调度等领域的数据结构。其核心特点是通过节点和边的组合表示任务或数据之间的依赖关系,且图中不存在任何环路,从而确保任务或数据可以按照一定的顺序执行或处理。在分布式计算、数据处理和并行计算等场景中,DAG被广泛用于描述复杂的数据处理流程和任务依赖关系。
在现有技术中,DAG通常用于处理大规模数据流或高频率事件。例如,在流式数据处理系统中,数据被划分为多个事件或任务,并通过DAG中的节点依次处理。每个节点可能依赖于前一个节点的输出结果,以确保数据的正确性、稳定性和一致性。然而,当数据量特别大或处理频率特别高时,尤其是当节点的每个输入数据的获取频率相差较大时,常规配置的有向无环图可能会出现数据乱序问题,而如果数据顺序错误或对应不同输入数据的中间值相互混淆,下游节点可能会接收到错误或不完整的输入数据,从而导致计算结果错误或数据链路崩溃。
发明内容
有鉴于此,本申请的目的在于提供一种数据链路优化处理方法及数据链路优化处理装置,可以实现每个节点在数据处理过程中保持有向无环图的流式顺序,即使在多数据并发的情况下,也能保证数据处理的顺序与规则符合有向无环图配置,最后通过有向无环图的尾节点获取最终的数据处理结果。
第一方面,本申请实施例提供了一种数据链路优化处理方法,所述数据链路优化处理方法基于有向无环图实现,所述数据链路优化处理方法包括:
根据至少一份数据链路配置生成至少一个有向无环图;其中,每个有向无环图中包括至少两个节点,每个节点之间通过有向边连接,每个节点包括至少一个中继单元,所述中继单元用于存储和管理到达该节点的数据,当到达节点的数据触发节点的运行条件时,所述中继单元控制节点执行数据处理,若到达节点的数据不满足节点的运行条件,所述中继单元控制节点不执行数据处理 。
进一步的,每个节点的中继单元中包括至少一个输入数据队列和至少一个输出数据队列,节点的运行条件至少包括:节点对应的所有输入数据队列中均存在预设输入数据 。
进一步的,该节点的输入数据队列的数量根据与该节点相连的输入有向边的数量确定,该节点的输出数据队列的数量根据与该节点相连的输出有向边的数量确定,其中,每个输出数据队列的数据均相同。
进一步的,响应于数据链路配置中存在数据执行条件配置,则相应有向边配置数据处理表达式,所述数据链路优化处理方法还包括:
响应于有传输数据经过第一有向边,则基于所述数据处理表达式对传输数据进行处理,所述第一有向边为存在数据处理表达式的有向边。
进一步的,响应于数据链路配置中存在数据输出条件配置,则相应节点配置数据输出条件,所述数据链路优化处理方法还包括:
响应于第一节点输出数据处理结果,则基于所述数据处理结果和所述数据输出条件,从第一节点的至少一个下游节点中确定目标节点,并通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点 ,所述第一节点为存在数据输出条件的节点。
进一步的,所述数据链路优化处理方法还包括:
通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点,且将第二信号传输至其他下游节点,所述第二信号为空信号 。
进一步的,所述有向无环图中的节点中包括至少一个基础节点;所述数据链路优化处理方法还包括:
响应于所述基础节点出现异常,则向出现异常的基础节点的下游节点传递第一异常信号,下游节点继续按有向边方向传输第一异常信号;所述基础节点表示所述有向无环图实现主要数据处理功能的必要节点,第一异常信号为基础节点出现异常的异常信号。
进一步的,响应于所述数据链路配置中存在数据增强处理步骤,则所述有向无环图中还包括增强节点;所述数据链路优化处理方法还包括:
响应于所述增强节点出现异常,则向出现异常的增强节点的下游增强节点传递第二异常信号,下游增强节点继续按有向边方向传输第二异常信号;所述增强节点表示所述有向无环图实现数据增强处理步骤的节点,所述第二异常信号为增强节点出现异常的异常信号。
进一步的,所述数据链路优化处理方法还包括:
响应于节点接收到第三指示信号时,节点中断运行,并根据所述有向无环图的有向边顺序将所述第三指示信号向下游节点传递;
且,响应于管理单元检测到任一节点接收到第三指示信号,将所述第三指示信号输入头节点,以根据所述有向无环图的有向边将所述第三指示信号传输至所有节点,以中断所述有向无环图的运行;
其中,管理单元用于控制、管理所述有向无环图,第三指示信号用于指示至少一个节点中断运行,头节点表示有向无环图中接收初始输入的节点。
第二方面,本申请实施例还提供了一种数据链路优化处理装置,所述数据链路优化处理装置基于有向无环图实现,所述数据链路优化处理装置包括:
有向无环图生成模块,用于根据至少一份数据链路配置生成至少一个有向无环图;其中,每个有向无环图中包括至少两个节点,每个节点之间通过有向边连接,每个节点包括至少一个中继单元,所述中继单元用于存储和管理到达该节点的数据,当到达节点的数据触发节点的运行条件时,所述中继单元控制节点执行数据处理,若到达节点的数据不满足节点的运行条件,所述中继单元控制节点不执行数据处理。
本申请实施例提供的一种数据链路优化处理方法及数据链路优化处理装置,根据数据链路配置生成有向无环图,基于有向无环图表示数据框架内的数据关系,其数据执行的逻辑和顺序完全可控,且有向无环图的实现支持数据的异步和并行执行。有向无环图生成后,将数据处理业务的待处理数据输入到有向无环图的头节点中,中继单元能够保证每个节点、每条有向边按照流式顺序对接收到的数据稳定执行对应的数据处理,无论待处理数据的形式为流式形式还是非流式形式,仅当到达节点的数据触发节点的运行条件时,中继单元才控制节点执行数据处理,这样可以实现每个节点在数据处理过程中保持有向无环图的流式顺序,即使在多数据并发的情况下,也能保证数据处理的顺序与规则符合有向无环图配置,最后通过有向无环图的尾节点获取最终的数据处理结果。
为使本申请的上述目的、特征和优点能更明显易懂,下文特举较佳实施例,并配合所附附图,作详细说明如下。
附图说明
为了更清楚地说明本申请实施例的技术方案,下面将对实施例中所需要使用的附图作简单地介绍,应当理解,以下附图仅示出了本申请的某些实施例,因此不应被看作是对范围的限定,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他相关的附图。
图1为本申请实施例所提供的两种常见的有向无环图的示意图;
图2为本申请实施例所提供的一种带有中继单元的有向无环图的示意图;
图3为本申请实施例所提供的一种电子设备的结构示意图。
具体实施方式
为使本申请实施例的目的、技术方案和优点更加清楚,下面将结合本申请实施例中附图,对本申请实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例仅仅是本申请一部分实施例,而不是全部的实施例。通常在此处附图中描述和示出的本申请实施例的组件可以以各种不同的配置来布置和设计。因此,以下对在附图中提供的本申请的实施例的详细描述并非旨在限制要求保护的本申请的范围,而是仅仅表示本申请的选定实施例。基于本申请的实施例,本领域技术人员在没有做出创造性劳动的前提下所获得的每个其他实施例,都属于本申请保护的范围。
首先,对本申请可适用的应用场景进行介绍。本申请可应用于数据处理技术领域。
有向无环图(Directed Acyclic Graph,DAG)是一种广泛应用于数据处理和任务调度等领域的数据结构。其核心特点是通过节点和边的组合表示任务或数据之间的依赖关系,且图中不存在任何环路,从而确保任务或数据可以按照一定的顺序执行或处理。在分布式计算、数据处理和并行计算等场景中,DAG被广泛用于描述复杂的数据处理流程和任务依赖关系。
请参阅图1,图1为本申请实施例所提供的两种常见的有向无环图的示意图。如图1所示,有向无环图中包括若干个节点,每个节点之间由有向边连接,节点可以是一个功能或一个函数,节点之间的有向边表示各个节点之间的依赖关系,例如可以是数据传递的顺序或关系,例如步骤B的处理必须要依赖A处理完的数据,则其有向边表示是A→B,若干个节点及节点之间的依赖关系组成了至少一个场景下的数据处理链路。根据业务实际情况,一个节点可以和若干个节点建立依赖关系,对于每一条依赖关系(也就是每一条边),每个节点需配置至少一个输入关系和至少一个输出关系。其中,有向无环图中不存在环路,即不存在从某个节点出发,经过若干个节点后又回到原节点的路径。这保证了图中没有循环,可以按照一定的顺序进行处理或计算。
有向无环图的数据处理形式分为流式形式和非流式形式,假设一个数据逻辑中,各个节点的依赖关系是A→B→C→D→E→F,如果是非流式场景,仅有一个待处理数据,且该待处理数据仅需经过一次从节点A到节点F的流程;如果是流式场景,会有大量的数据并行地重复执行A→B→C→D→E→F的流程,直到所有数据处理完毕,且流式之中,各个数据之间的流程是互相独立、异步执行的,A节点接收到了输入数据1,由A节点处理后形成输出A1,传递到B节点处理,后续A节点又接受了输入数据2,继续执行A→B→C→D→E→F,输入数据2的执行与输入数据1相互独立,即流式场景下每一个输入数据的处理流程均是相互独立的。非流式可以视为只需要运行一次的流式数据,因此支持流式数据处理的有向无环图配置一定支持非流式场景的数据处理。
经研究发现,在现有技术中,DAG通常用于处理大规模数据流或高频率事件。例如,在流式数据处理系统中,数据被划分为多个事件或任务,并通过DAG中的节点依次处理。每个节点可能依赖于前一个节点的输出结果,以确保数据的正确性、稳定性和一致性。然而,当数据量特别大或处理频率特别高时,尤其是当节点的每个输入数据的获取频率相差较大时,常规配置的有向无环图可能会出现数据乱序问题,而如果数据顺序错误或对应不同输入数据的中间值相互混淆,下游节点可能会接收到错误或不完整的输入数据,从而导致计算结果错误或数据链路崩溃。
基于此,本申请实施例提供了一种数据链路优化处理方法,以使得每个输入数据各自的数据处理链路顺序保持稳定,即使在多数据并发的情况下,也能保证数据处理的顺序与规则符合有向无环图配置,最后通过有向无环图的尾节点获取最终的数据处理结果。
本申请实施例所提供的数据链路优化处理方法,基于有向无环图实现,所述数据链路优化处理方法,包括:
根据至少一份数据链路配置生成至少一个有向无环图。
这里,在有向无环图中,每个数据执行步骤即节点,数据链路配置至少包括数据处理流程中包含的各个数据执行步骤,将配置转换为每个节点之间的数据传输关系、每个节点的输入参数类型、输出参数类型,节点信息以及节点类型。具体的,输入参数类型和输出参数类型指的是每个节点输入、输出数值的数据类型,例如int、string、float、bool等常见的数据类型,本申请对数据类型不做限制。节点信息可以包括:节点名称、节点的具体实现算法。根据本申请提供的实施例,节点类型可以包括:头节点、输出节点和尾节点。具体的,头节点表示有向无环图中接收初始输入的节点,是需要获取服务输入的节点。输出节点表示该输出节点的至少一个输出参数向下游的其他节点输出。尾节点表示在有向无环图中最终输出整个数据处理流程的处理结果的节点。在有向无环图中,节点可以分为头节点和非头节点,其中,非头节点必须有一个由其他节点输出的参数作为它的输入参数,因此对于每个非头节点,数据链路配置中必须配置各个非头节点输入数据的数据类型,而头节点的输入可以由数据配置决定,或由实际的数据处理业务确定。在有向无环图中,还包括输出节点和非输出节点,只有配置了输出参数类型的节点才可以作为输出节点,输出节点能向外输出其处理结果。
在本申请提供的实施例中,每个有向无环图中包括至少两个节点,每个节点之间通过有向边连接,每个节点包括至少一个中继单元。中继单元用于存储和管理到达该节点的数据,当到达节点的数据触发节点的运行条件时,所述中继单元控制节点执行数据处理,若到达节点的数据不满足节点的运行条件,所述中继单元控制节点不执行数据处理。
这样,根据本申请实施例所提供的数据链路优化处理方法,首先根据数据链路配置生成有向无环图。这里,根据数据链路配置生成有向无环图的方法在现有技术中有详细说明,在此不再赘述。有向无环图生成后,将数据处理业务的待处理数据输入到有向无环图的头节点中,以使每个节点按照流式顺序对接收到的数据执行对应的数据处理。当到达节点的数据触发节点的运行条件时,中继单元才控制节点执行数据处理,这样可以实现每个节点在数据处理过程中保持有向无环图的流式顺序,即使在多数据并发的情况下,也能保证数据处理的顺序符合有向无环图。最后通过有向无环图的尾节点获取最终的数据处理结果。
进一步的,每个节点的中继单元中包括至少一个输入数据队列和至少一个输出数据队列,节点的运行条件至少包括:节点对应的所有输入数据队列中均存在预设输入数据。这里,预设输入数据指的是节点进行数据处理所需要的输入数据,例如,节点A预设的输入数据为X、Y、Z,则需要当输入的X、Y、Z均满足预设条件,例如预设条件可以是每个输入数据队列中的第一个数据均已通过完整性验证,才能够视为是预设输入数据X、Y、Z。节点的输入数据队列用来存储传输到该节点,执行该节点功能所需的预设输入数据,输出数据队列用来存储需要向下游节点继续传输的数据。作为示例,当某个中间节点包括3个输入数据队列,说明该节点需要接收3个上游节点传输过来的三种数据,当这3个输入数据队列中均存在上游节点传输的预设输入数据,且均符合预设条件,则认为该节点的数据满足运行条件,此时中继单元可以控制该节点执行数据处理。
请参阅图2,图2为本申请实施例所提供的一种带有中继单元的有向无环图的示意图。如图2所示,图2中的每个节点均配置了中继单元,中继单元内包括一个ingress协程和一个egress协程,其中ingress协程表示输入协程,egress协程表示输出协程。ingress协程中包括至少一个输入数据队列,节点的输入数据队列的数量根据与该节点相连的输入有向边的数量确定,egress协程中包括至少一个输出数据队列,节点的输出数据队列的数量根据与该节点相连的输出有向边的数量确定,每个输出数据队列的数据通常相同。即便节点每个输入数据的计算频率不同,配置了ingress协程的节点也能保证按照顺序执行计算。例如,图2中的节点G的输入数据源于节点A1输出、节点C输出和节点D输出,因此节点G的中继单元中包括三个数据输入队列。在实际业务之中,每个节点的输出频率很可能不同,且相差较大,假设A1的输出频率是每秒一次,节点C的输出频率是5s一次,节点D的输出频率是30s一次,那么在等待节点D的输出时,节点A1的输出数据和节点C的输出数据需要进入节点G的中继单元中节点A1对应的输入数据队列和节点C对应的输入数据队列中进行存储,中继单元用来保证节点G仅有在节点A1的输出数据、节点C的输出数据和节点D的输出数据均已经进入了各自的队列中,中继单元才会控制节点G执行数据处理。这时节点G响应于中继单元发出的可执行信号,调用对应的处理函数,执行各个队列的第一个数据(每个队列内部是先入先出模式,第一个数据也就是最先进入该队列的数据)对应的数据处理步骤,以此实现数据的保序。若未设置中继单元、数据队列与节点运行条件,可能导致节点在还未获取到输出频率较低的数据时就开始执行函数而导致报错,或导致节点G可能基于节点D的第1个数据和节点A1的第30个数据、节点C的第6个数据执行计算,从而导致后续数据错误。
作为一种可选的实施例,响应于数据链路配置中存在数据执行条件配置,则相应有向边配置数据处理表达式,所述数据链路优化处理方法还包括:
响应于有传输数据经过第一有向边,则基于所述数据处理表达式对传输数据进行处理。
这里,第一有向边为存在数据处理表达式的有向边。作为示例,数据处理可以包括对数据进行计算、对数据加前缀和/或后缀、对数据进行明文隐藏以及对数据进行格式转换等,对此本申请不做具体限定。
针对上述步骤,在具体实施时,当数据链路配置中存在数据执行条件配置时,则相应的有向边配置有数据处理表达式。当有传输数据经过存在数据处理表达式的第一有向边时,则基于该数据处理表达式对传输数据进行处理。这里,作为示例,当第一有向边配置的数据处理表达式为data * 2,则表示将与第一有向边相连的输出节点的传输数据乘2之后再传输到与第一有向边相连的输入节点。
作为一种可选的实施例,响应于数据链路配置中存在数据输出条件配置,则相应节点配置数据输出条件,所述数据链路优化处理方法还包括:
响应于第一节点输出数据处理结果,则基于所述数据处理结果和所述数据输出条件,从第一节点的至少一个下游节点中确定目标节点,并通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点。
这里,第一节点为存在数据输出条件的节点。数据输出条件可以是对数据处理结果的传输路径进行选择的条件。下游节点指的是一个节点的输出边直接连接的N个节点的输入端,则N个节点为该节点的下游节点。例如,延续图2中的示例,在节点A2可以配置数据输出条件:当节点A2输出的数据处理结果为奇数时,则将数据处理结果传输到节点C,当节点A2输出的数据处理结果为偶数时,则将数据处理结果传输到节点D。数据输出条件也可以是对数据处理结果进行过滤的条件。例如,在节点A2可以配置数据输出条件:当数据处理结果大于1才向下传输。
针对上述步骤,在具体实施时,响应于数据链路配置中存在数据输出条件配置,则相应的节点配置有数据输出条件。响应于第一节点输出数据处理结果,则基于数据处理结果和第一节点配置的数据输出条件,从第一节点的至少一个下游节点中确定目标节点,并通过输出数据队列仅将处理后的数据处理结果传输至目标节点。这里,延续上述示例,当节点A2输出的数据处理结果为奇数时,则基于该数据处理结果和上述示例中的数据输出条件,将节点C确定为目标节点,并通过输出数据队列仅将数据处理结果传输至节点C。
进一步的,所述数据链路优化处理方法还包括:
通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点,且将第二信号传输至其他下游节点,所述第二信号为空信号。
这里,空信号是用来指示取消节点在当前轮次的数据处理的信号。
针对上述步骤,在具体实施时,第一节点通过输出数据队列仅将处理后的数据处理结果传输至目标节点,而对于第一节点的至少一个下游节点中出目标节点外的其他下游节点,第一节点传输空信号至其他下游节点。这里,延续上述示例,当节点A2将节点C确定为目标节点时,节点A2需将第二信号传输至节点D,节点D接收到第二信号时则停止执行当前轮次的数据处理,这样可以保证节点D的有序性。
进一步的,所述有向无环图中的节点中包括至少一个基础节点;所述数据链路优化处理方法还包括:
响应于所述基础节点出现异常,则向出现异常的基础节点的下游节点传递第一异常信号,下游节点继续按有向边方向传输第一异常信号。
这里,基础节点表示有向无环图中实现主要数据处理功能的必要节点,例如在有向无环图中的必要节点。第一异常信号为基础节点出现异常的异常信号。出现异常可以包括节点执行失败、节点执行不符合预期、节点执行超时等情况。这里,作为一种可选的实施例,节点执行超时通常是网络波动原因,需要重复尝试,如果重试成功了,则忽略该超时问题继续正常运行,若重试依旧超时,则继续重试,直到重试次数超过预设的重试阈值,还是出现超时问题,此时将超时错误转化为该节点的第一异常信号。
针对于上述步骤,在具体实施时,当有向无环图中的基础节点出现异常时,向出现异常的基础节点的下游节点传递第一异常信号,下游节点继续按有向边方向传输第一异常信号。这样,当基础节点出现异常时,从当前数据开始停止该基础节点及该基础节点后续的运行,不影响先前已传输数据的处理。
进一步的,响应于所述数据链路配置中存在数据增强处理步骤,则所述有向无环图中还包括增强节点;所述数据链路优化处理方法还包括:
响应于所述增强节点出现异常,则向出现异常的增强节点的下游增强节点传递第二异常信号,下游增强节点继续按有向边方向传输第二异常信号。
这里,增强节点表示有向无环图中实现数据增强处理步骤的节点,增强节点是有向无环图中的非必要节点,例如在有向无环图中依赖于其他基础节点或者其他增强节点的输出,并在此基础上进行数据增强处理的节点。其中,增强节点的下游节点也必然是增强节点,增强节点的上游节点可以是基础节点,也可以是增强节点。第二异常信号为增强节点出现异常的异常信号。
针对上述步骤,在具体实施时,当数据链路配置中存在数据增强处理步骤时,则有向无环图中包括增强节点。当增强节点出现异常时,则向出现异常的增强节点的下游节点传递第二异常信号,下游节点继续按有向边方向进一步传输第二异常信号。这样,当增强节点出现异常时,将第二异常信号根据有向边的顺序向下游增强节点传递,能够实现从当前数据开始停止增强节点和下游增强节点的运行,不影响之前已经运行完毕的数据,也不影响必要节点及其他增强功能对应的增强节点的数据处理。
其中,必要节点对应的数据处理步骤用于实现必要功能,而增强节点对应的数据处理步骤用于实现非必要的增强功能,且,一份数据链路配置中可以包括一个或多个增强功能,即对应于一个或多个增强功能分支。这里,根据上述两个步骤,在面向异常处理时,基础节点和增强节点的区别在于,本申请设置了中继节点,通过设置节点运行条件实现了数据保序设计,而如果基础节点在某次数据处理或数据传递期间出现异常,即,如果第N次数据传递到增强节点时出现异常,则停止第N次及之后的数据执行,但不会影响已经运行过的第0次~第N-1次的数据;而对于增强节点,若某一增强节点出现异常,则在当前及之后的处理中跳过出现异常的增强功能分支以及下游分支,并不会影响其他的必要节点和其他增强功能对应的节点。
进一步的,所述数据链路优化处理方法还包括:
响应于节点接收到第三指示信号时,节点中断运行,并根据所述有向无环图的有向边顺序将所述第三指示信号向下游节点传递;且,响应于管理单元检测到任一节点接收到第三指示信号,将所述第三指示信号输入头节点,以根据所述有向无环图的有向边将所述第三指示信号传输至所有节点,以中断所述有向无环图的运行。
这里,管理单元用于控制、管理有向无环图,第三指示信号用于指示至少一个节点中断运行,头节点表示有向无环图中接收有向无环图外部输入数据的节点。
针对上述两个步骤,在具体实施时,当有向无环图中的节点接收到指示该节点中断运行的第三指示信号时,该节点中断运行,并根据根有向无环图的有向边顺序将第三指示信号向下游节点传递。这里,第三指示信号的传输和输入数据、输出数据的传输一样,第三指示信号也通过有向无环图中的有向边顺序向下游传递。管理单元监听所有节点,当管理单元检测到任一节点接收到第三指示信号时,管理单元将第三指示信号输入头节点,头节点根据有向无环图的有向边将第三指示信号传输至有向无环图中的所有节点,保证依序中断有向无环图中所有节点的运行。
这里,作为一种可选的实施例,节点接收到第三指示信号后,当已中断节点的中继单元若检测到已中断节点满足运行条件时,即已中断节点对应的所有输入数据队列中均存在预设输入数据时,该中继单元可重启已中断节点的数据处理工作。
本申请实施例提供的数据链路优化处理方法,根据数据链路配置生成有向无环图,基于有向无环图表示数据框架内的数据关系,其数据执行的逻辑和顺序完全可控,且有向无环图的实现支持数据的异步和并行执行。有向无环图生成后,将数据处理业务的待处理数据输入到有向无环图的头节点中,中继单元能够保证每个节点、每条有向边按照流式顺序对接收到的数据稳定执行对应的数据处理,无论待处理数据的形式为流式形式还是非流式形式,仅当到达节点的数据触发节点的运行条件时,中继单元才控制节点执行数据处理,这样可以实现每个节点在数据处理过程中保持有向无环图的流式顺序,即使在多数据并发的情况下,也能保证数据处理的顺序与规则符合有向无环图配置,最后通过有向无环图的尾节点获取最终的数据处理结果。
基于相同的发明构思,本申请实施例还提供了一种数据链路优化处理装置,所述数据链路优化处理装置基于有向无环图实现,所述数据链路优化处理装置包括:
有向无环图生成模块,用于根据至少一份数据链路配置生成至少一个有向无环图;其中,每个有向无环图中包括至少两个节点,每个节点之间通过有向边连接,每个节点包括至少一个中继单元,所述中继单元用于存储和管理到达该节点的数据,当到达节点的数据触发节点的运行条件时,所述中继单元控制节点执行数据处理,若到达节点的数据不满足节点的运行条件,所述中继单元控制节点不执行数据处理。
进一步的,每个节点的中继单元中包括至少一个输入数据队列和至少一个输出数据队列,节点的运行条件至少包括:节点对应的所有输入数据队列中均存在预设输入数据 。
进一步的,该节点的输入数据队列的数量根据与该节点相连的输入有向边的数量确定,该节点的输出数据队列的数量根据与该节点相连的输出有向边的数量确定,其中,每个输出数据队列的数据均相同。
进一步的,响应于数据链路配置中存在数据执行条件配置,则相应有向边配置数据处理表达式,所述数据链路优化处理装置还包括第一处理模块,所述第一处理模块用于:
响应于有传输数据经过第一有向边,则基于所述数据处理表达式对传输数据进行处理,所述第一有向边为存在数据处理表达式的有向边。
进一步的,响应于数据链路配置中存在数据输出条件配置,则相应节点配置数据输出条件,所述数据链路优化处理装置还包括第一传输模块,所述第一传输模块用于:
响应于第一节点输出数据处理结果,则基于所述数据处理结果和所述数据输出条件,从第一节点的至少一个下游节点中确定目标节点,并通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点 ,所述第一节点为存在数据输出条件的节点。
进一步的,所述数据链路优化处理装置还包括第二传输模块,所述第二传输模块用于:
通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点,且将第二信号传输至其他下游节点,所述第二信号为空信号 。
进一步的,所述有向无环图中的节点中包括至少一个基础节点;所述数据链路优化处理装置还包括第三传输模块,所述第三传输模块用于:
响应于所述基础节点出现异常,则向出现异常的基础节点的下游节点传递第一异常信号,下游节点继续按有向边方向传输第一异常信号;所述基础节点表示所述有向无环图实现主要数据处理功能的必要节点,第一异常信号为基础节点出现异常的异常信号。
进一步的,响应于所述数据链路配置中存在数据增强处理步骤,则所述有向无环图中还包括增强节点;所述数据链路优化处理装置还包括第四传输模块,所述第四传输模块用于:
响应于所述增强节点出现异常,则向出现异常的增强节点的下游增强节点传递第二异常信号,下游增强节点继续按有向边方向传输第二异常信号;所述增强节点表示所述有向无环图实现数据增强处理步骤的节点,所述第二异常信号为增强节点出现异常的异常信号。
进一步的,所述数据链路优化处理装置还包括第五传输模块,所述第五传输模块用于:
响应于节点接收到第三指示信号时,节点中断运行,并根据所述有向无环图的有向边顺序将所述第三指示信号向下游节点传递;
且,响应于管理单元检测到任一节点接收到第三指示信号,将所述第三指示信号输入头节点,以根据所述有向无环图的有向边将所述第三指示信号传输至所有节点,以中断所述有向无环图的运行;
其中,管理单元用于控制、管理所述有向无环图,第三指示信号用于指示至少一个节点中断运行,头节点表示有向无环图中接收初始输入的节点。
请参阅图3,图3为本申请实施例所提供的一种电子设备的结构示意图。如图3中所示,所述电子设备300包括处理器310、存储器320和总线330。
所述存储器320存储有所述处理器310可执行的机器可读指令,当电子设备300运行时,所述处理器310与所述存储器320之间通过总线330通信,所述机器可读指令被所述处理器310执行时,可以执行方法实施例中的数据链路优化处理方法的步骤,具体实现方式可参见方法实施例,在此不再赘述。
本申请实施例还提供一种计算机可读存储介质,该计算机可读存储介质上存储有计算机程序,该计算机程序被处理器运行时可以执行方法实施例中的数据链路优化处理方法的步骤,具体实现方式可参见方法实施例,在此不再赘述。
所属领域的技术人员可以清楚地了解到,为描述的方便和简洁,上述描述的系统、装置和单元的具体工作过程,可以参考前述方法实施例中的对应过程,在此不再赘述。
在本申请所提供的几个实施例中,应该理解到,所揭露的系统、装置和方法,可以通过其它的方式实现。以上所描述的装置实施例仅仅是示意性的,例如,所述单元的划分,仅仅为一种逻辑功能划分,实际实现时可以有另外的划分方式,又例如,多个单元或组件可以结合或者可以集成到另一个系统,或一些特征可以忽略,或不执行。另一点,所显示或讨论的相互之间的耦合或直接耦合或通信连接可以是通过一些通信接口,装置或单元的间接耦合或通信连接,可以是电性,机械或其它的形式。
所述作为分离部件说明的单元可以是或者也可以不是物理上分开的,作为单元显示的部件可以是或者也可以不是物理单元,即可以位于一个地方,或者也可以分布到多个网络单元上。可以根据实际的需要选择其中的部分或者全部单元来实现本实施例方案的目的。
另外,在本申请各个实施例中的各功能单元可以集成在一个处理单元中,也可以是各个单元单独物理存在,也可以两个或两个以上单元集成在一个单元中。
所述功能如果以软件功能单元的形式实现并作为独立的产品销售或使用时,可以存储在一个处理器可执行的非易失的计算机可读取存储介质中。基于这样的理解,本申请的技术方案本质上或者说对现有技术做出贡献的部分或者该技术方案的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质中,包括若干指令用以使得一台计算机设备(可以是个人计算机,服务器,或者网络设备等)执行本申请各个实施例所述方法的全部或部分步骤。而前述的存储介质包括:U盘、移动硬盘、只读存储器(Read-OnlyMemory,ROM)、随机存取存储器(Random Access Memory,RAM)、磁碟或者光盘等各种可以存储程序代码的介质。
最后应说明的是:以上所述实施例,仅为本申请的具体实施方式,用以说明本申请的技术方案,而非对其限制,本申请的保护范围并不局限于此,尽管参照前述实施例对本申请进行了详细的说明,本领域的普通技术人员应当理解:任何熟悉本技术领域的技术人员在本申请揭露的技术范围内,其依然可以对前述实施例所记载的技术方案进行修改或可轻易想到变化,或者对其中部分技术特征进行等同替换;而这些修改、变化或者替换,并不使相应技术方案的本质脱离本申请实施例技术方案的精神和范围,都应涵盖在本申请的保护范围之内。因此,本申请的保护范围应以权利要求的保护范围为准。
Claims (6)
1.一种数据链路优化处理方法,所述数据链路优化处理方法基于有向无环图实现,其特征在于,所述数据链路优化处理方法包括:
根据至少一份数据链路配置生成至少一个有向无环图;其中,每个有向无环图中包括至少两个节点,每个节点之间通过有向边连接,每个节点包括至少一个中继单元,所述中继单元用于存储和管理到达该节点的数据,当到达节点的数据触发节点的运行条件时,所述中继单元控制节点执行数据处理,若到达节点的数据不满足节点的运行条件,所述中继单元控制节点不执行数据处理;
每个节点的中继单元中包括至少一个输入数据队列和至少一个输出数据队列,节点的运行条件至少包括:节点对应的所有输入数据队列中均存在预设输入数据;
该节点的输入数据队列的数量根据与该节点相连的输入有向边的数量确定,该节点的输出数据队列的数量根据与该节点相连的输出有向边的数量确定,其中,每个输出数据队列的数据均相同;
响应于数据链路配置中存在数据输出条件配置,则相应节点配置数据输出条件,所述数据链路优化处理方法还包括:
响应于第一节点输出数据处理结果,则基于所述数据处理结果和所述数据输出条件,从第一节点的至少一个下游节点中确定目标节点,并通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点,所述第一节点为存在数据输出条件的节点;
所述数据链路优化处理方法还包括:
通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点,且将第二信号传输至其他下游节点,所述第二信号为空信号。
2.根据权利要求1所述的数据链路优化处理方法,其特征在于,响应于数据链路配置中存在数据执行条件配置,则相应有向边配置数据处理表达式,所述数据链路优化处理方法还包括:
响应于有传输数据经过第一有向边,则基于所述数据处理表达式对传输数据进行处理,所述第一有向边为存在数据处理表达式的有向边。
3.根据权利要求1所述的数据链路优化处理方法,其特征在于,所述有向无环图中的节点中包括至少一个基础节点;所述数据链路优化处理方法还包括:
响应于所述基础节点出现异常,则向出现异常的基础节点的下游节点传递第一异常信号,下游节点继续按有向边方向传输第一异常信号;所述基础节点表示所述有向无环图实现数据处理功能的必要节点,第一异常信号为基础节点出现异常的异常信号。
4.根据权利要求3所述的数据链路优化处理方法,其特征在于,响应于所述数据链路配置中存在数据增强处理步骤,则所述有向无环图中还包括增强节点;所述数据链路优化处理方法还包括:
响应于所述增强节点出现异常,则向出现异常的增强节点的下游增强节点传递第二异常信号,下游增强节点继续按有向边方向传输第二异常信号;所述增强节点表示所述有向无环图实现数据增强处理步骤的节点,所述第二异常信号为增强节点出现异常的异常信号。
5.根据权利要求1所述的数据链路优化处理方法,其特征在于,所述数据链路优化处理方法还包括:
响应于节点接收到第三指示信号时,节点中断运行,并根据所述有向无环图的有向边顺序将所述第三指示信号向下游节点传递;
且,响应于管理单元检测到任一节点接收到第三指示信号,将所述第三指示信号输入头节点,以根据所述有向无环图的有向边将所述第三指示信号传输至所有节点,以中断所述有向无环图的运行;
其中,管理单元用于控制、管理所述有向无环图,第三指示信号用于指示至少一个节点中断运行,头节点表示有向无环图中接收初始输入的节点。
6.一种数据链路优化处理装置,所述数据链路优化处理装置基于有向无环图实现,其特征在于,所述数据链路优化处理装置包括:
有向无环图生成模块,用于根据至少一份数据链路配置生成至少一个有向无环图;其中,每个有向无环图中包括至少两个节点,每个节点之间通过有向边连接,每个节点包括至少一个中继单元,所述中继单元用于存储和管理到达该节点的数据,当到达节点的数据触发节点的运行条件时,所述中继单元控制节点执行数据处理,若到达节点的数据不满足节点的运行条件,所述中继单元控制节点不执行数据处理;
每个节点的中继单元中包括至少一个输入数据队列和至少一个输出数据队列,节点的运行条件至少包括:节点对应的所有输入数据队列中均存在预设输入数据;
该节点的输入数据队列的数量根据与该节点相连的输入有向边的数量确定,该节点的输出数据队列的数量根据与该节点相连的输出有向边的数量确定,其中,每个输出数据队列的数据均相同;
响应于数据链路配置中存在数据输出条件配置,则相应节点配置数据输出条件,所述数据链路优化处理装置还包括第一传输模块,所述第一传输模块用于:
响应于第一节点输出数据处理结果,则基于所述数据处理结果和所述数据输出条件,从第一节点的至少一个下游节点中确定目标节点,并通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点 ,所述第一节点为存在数据输出条件的节点;
所述数据链路优化处理装置还包括第二传输模块,所述第二传输模块用于:
通过输出数据队列仅将处理后的数据处理结果传输至所述目标节点,且将第二信号传输至其他下游节点,所述第二信号为空信号。
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202510874890.7A CN120407124B (zh) | 2025-06-27 | 2025-06-27 | 一种数据链路优化处理方法及数据链路优化处理装置 |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202510874890.7A CN120407124B (zh) | 2025-06-27 | 2025-06-27 | 一种数据链路优化处理方法及数据链路优化处理装置 |
Publications (2)
| Publication Number | Publication Date |
|---|---|
| CN120407124A CN120407124A (zh) | 2025-08-01 |
| CN120407124B true CN120407124B (zh) | 2025-10-10 |
Family
ID=96525161
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202510874890.7A Active CN120407124B (zh) | 2025-06-27 | 2025-06-27 | 一种数据链路优化处理方法及数据链路优化处理装置 |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN120407124B (zh) |
Citations (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN109033109A (zh) * | 2017-06-09 | 2018-12-18 | 杭州海康威视数字技术股份有限公司 | 数据处理方法及系统 |
| CN115309531A (zh) * | 2022-08-18 | 2022-11-08 | 北京智慧星光信息技术有限公司 | 基于实时任务队列的高并发多任务调度管理方法 |
Family Cites Families (5)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| EP3651020A1 (en) * | 2017-11-20 | 2020-05-13 | Shanghai Cambricon Information Technology Co., Ltd | Computer equipment, data processing method, and storage medium |
| CN110190991B (zh) * | 2019-05-21 | 2020-06-02 | 华中科技大学 | 一种多应用场景下的分布式流处理系统的容错方法 |
| CN114510338B (zh) * | 2022-04-19 | 2022-09-06 | 浙江大华技术股份有限公司 | 一种任务调度方法、任务调度设备和计算机可读存储介质 |
| CN115794393A (zh) * | 2022-11-28 | 2023-03-14 | 北京锐安科技有限公司 | 业务模型的执行方法、装置、服务器及存储介质 |
| CN119987971A (zh) * | 2025-01-27 | 2025-05-13 | 上海哔哩哔哩科技有限公司 | 执行任务的方法、相关装置及计算机程序产品 |
-
2025
- 2025-06-27 CN CN202510874890.7A patent/CN120407124B/zh active Active
Patent Citations (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN109033109A (zh) * | 2017-06-09 | 2018-12-18 | 杭州海康威视数字技术股份有限公司 | 数据处理方法及系统 |
| CN115309531A (zh) * | 2022-08-18 | 2022-11-08 | 北京智慧星光信息技术有限公司 | 基于实时任务队列的高并发多任务调度管理方法 |
Also Published As
| Publication number | Publication date |
|---|---|
| CN120407124A (zh) | 2025-08-01 |
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| CN113220542B (zh) | 一种计算任务的预警方法、装置、计算机设备及存储介质 | |
| CN107092521B (zh) | 一种分布式任务调度方法、装置及系统 | |
| KR20110116178A (ko) | 태스크 실행 관리 | |
| CN112860342B (zh) | 微服务配置的方法、装置、设备、系统以及存储介质 | |
| CN111080449A (zh) | 区块链的跨链交易方法、管理节点、区块链网络 | |
| CN108089915B (zh) | 基于消息队列的业务控件化处理的方法及系统 | |
| KR20210096077A (ko) | 로봇 에이전트 관리를 위한 시스템 및 방법 | |
| CN113127182A (zh) | 深度学习调度配置系统及方法 | |
| CN120407124B (zh) | 一种数据链路优化处理方法及数据链路优化处理装置 | |
| JP6613315B2 (ja) | トランザクション処理システムおよびトランザクション制御方法 | |
| CN114237910A (zh) | 客户端负载均衡实现方法及装置 | |
| JP7507250B2 (ja) | 分散コンピューティングネットワークにおける通信の最適化 | |
| JP2006209593A (ja) | 情報処理装置および情報処理方法 | |
| Nakayama et al. | A p2p model of publish/subscribe systems | |
| CN113328917A (zh) | 用于epa链路诊断的方法、epa设备和计算机存储介质 | |
| US9003226B2 (en) | Core file limiter for abnormally terminating processes | |
| CN116016368B (zh) | 多方安全计算中的网络传输方法和执行该方法的调度器 | |
| CN119203164B (zh) | 一种支持编排的分布式扫描器系统实现方法和实现装置 | |
| CN117909907B (zh) | 高通量计算平台及其异常排除方法、装置及存储介质 | |
| Li et al. | Orchestrating safe streaming computations with precise control | |
| JP2001297016A (ja) | 命令実行システム | |
| Kruk | Open Problem—Protocols for Resource-Sharing Networks with Locally Edge-Minimal Fluid Models | |
| Yamada | Standardization of spacecraft and ground systems based on a spacecraft functional model | |
| Albassam et al. | Variable recovery and adaptation connectors for dynamic software product lines | |
| JP2018147510A (ja) | サーバ装置およびサーバシステム |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| PB01 | Publication | ||
| PB01 | Publication | ||
| SE01 | Entry into force of request for substantive examination | ||
| SE01 | Entry into force of request for substantive examination | ||
| GR01 | Patent grant | ||
| GR01 | Patent grant |