From 45552fc099ea8af0d45ddc6c60e2db28de3de329 Mon Sep 17 00:00:00 2001 From: Shekhar Patnaik Date: Tue, 7 Oct 2025 14:43:04 +0100 Subject: [PATCH] Support Third Party catalog items in triggers This MR adds support for third party catalog items in flow triggers Changelog: changed EE: true --- ee/app/models/ai/catalog/item.rb | 2 +- ee/app/models/ai/flow_trigger.rb | 2 +- .../services/ai/flow_triggers/run_service.rb | 36 ++++++++++++- ee/spec/models/ai/catalog/item_spec.rb | 24 +++++++++ ee/spec/models/ai/flow_trigger_spec.rb | 15 ++++++ .../ai/flow_triggers/run_service_spec.rb | 53 +++++++++++++++++++ 6 files changed, 129 insertions(+), 3 deletions(-) diff --git a/ee/app/models/ai/catalog/item.rb b/ee/app/models/ai/catalog/item.rb index 01399bf7052dfb..e5ece1f1a58777 100644 --- a/ee/app/models/ai/catalog/item.rb +++ b/ee/app/models/ai/catalog/item.rb @@ -113,7 +113,7 @@ def definition(pinned_version_prefix = nil, pinned_version_id = nil) case item_type.to_sym when AGENT_TYPE AgentDefinition.new(self, version) - when FLOW_TYPE + when FLOW_TYPE, THIRD_PARTY_FLOW_TYPE raise ArgumentError, "pinned_version_id is not supported for flows" if pinned_version_id FlowDefinition.new(self, version) diff --git a/ee/app/models/ai/flow_trigger.rb b/ee/app/models/ai/flow_trigger.rb index cdb860f14451bb..6e1fca86bd0c57 100644 --- a/ee/app/models/ai/flow_trigger.rb +++ b/ee/app/models/ai/flow_trigger.rb @@ -60,7 +60,7 @@ def catalog_item_valid errors.add(:base, 'ai_catalog_item_consumer project does not match project') end - return if ai_catalog_item_consumer.item.flow? + return if ai_catalog_item_consumer.item.flow? || ai_catalog_item_consumer.item.third_party_flow? errors.add(:base, 'ai_catalog_item_consumer is not a flow') end diff --git a/ee/app/services/ai/flow_triggers/run_service.rb b/ee/app/services/ai/flow_triggers/run_service.rb index 254feda11b197a..62165f0afcbb27 100644 --- a/ee/app/services/ai/flow_triggers/run_service.rb +++ b/ee/app/services/ai/flow_triggers/run_service.rb @@ -19,7 +19,7 @@ def execute(params) ) response, workflow = note_service.execute(params) do |updated_params| - if flow_trigger.ai_catalog_item_consumer.present? + if should_use_catalog_execution? start_catalog_workflow(params) else run_workload(updated_params) @@ -40,6 +40,13 @@ def execute(params) attr_reader :project, :current_user, :resource, :flow_trigger, :flow_trigger_user + def should_use_catalog_execution? + return false unless flow_trigger.ai_catalog_item_consumer.present? + + catalog_item = flow_trigger.ai_catalog_item_consumer.item + catalog_item.item_type.to_sym != ::Ai::Catalog::Item::THIRD_PARTY_FLOW_TYPE + end + def run_workload(params) workflow_params = { workflow_definition: "Trigger - #{flow_trigger.description}", @@ -111,6 +118,33 @@ def start_catalog_workflow(params) end def fetch_flow_definition + if third_party_catalog_flow? + fetch_catalog_flow_definition + else + fetch_yaml_flow_definition + end + end + + def third_party_catalog_flow? + return false unless flow_trigger.ai_catalog_item_consumer.present? + + catalog_item = flow_trigger.ai_catalog_item_consumer.item + catalog_item.item_type.to_sym == ::Ai::Catalog::Item::THIRD_PARTY_FLOW_TYPE + end + + def fetch_catalog_flow_definition + item_consumer = flow_trigger.ai_catalog_item_consumer + version = item_consumer.pinned_version_prefix + item_version = item_consumer.item.resolve_version(version) + return unless item_version + + definition = item_version.definition + return unless definition.is_a?(Hash) + + definition.stringify_keys + end + + def fetch_yaml_flow_definition root_ref = project.repository.root_ref flow_definition = project.repository.blob_data_at(root_ref, flow_trigger.config_path) return unless flow_definition diff --git a/ee/spec/models/ai/catalog/item_spec.rb b/ee/spec/models/ai/catalog/item_spec.rb index c522836764bf8a..0c02b205e88cba 100644 --- a/ee/spec/models/ai/catalog/item_spec.rb +++ b/ee/spec/models/ai/catalog/item_spec.rb @@ -430,6 +430,30 @@ end end + context 'when item_type is third_party_flow' do + let(:item) { create(:ai_catalog_third_party_flow) } + + it 'returns a FlowDefinition instance' do + result = item.definition(version.version) + + expect(result).to be_an_instance_of(Ai::Catalog::FlowDefinition) + end + + it 'passes the item and version to FlowDefinition' do + expect(Ai::Catalog::FlowDefinition).to receive(:new).with(item, version) + + item.definition(version.version) + end + + context 'when pinned_version_id is provided' do + it 'raises an ArgumentError' do + expect { item.definition(nil, item.versions.first.id) }.to raise_error( + ArgumentError, 'pinned_version_id is not supported for flows' + ) + end + end + end + describe 'version resolution' do let_it_be(:item) { create(:ai_catalog_agent) } let_it_be(:v1_1) { create(:ai_catalog_agent_version, item: item, version: '1.1.0') } diff --git a/ee/spec/models/ai/flow_trigger_spec.rb b/ee/spec/models/ai/flow_trigger_spec.rb index 99112fab97268a..873dcdd665d371 100644 --- a/ee/spec/models/ai/flow_trigger_spec.rb +++ b/ee/spec/models/ai/flow_trigger_spec.rb @@ -124,6 +124,21 @@ expect(flow_trigger.errors[:base]).to include('ai_catalog_item_consumer is not a flow') end end + + context 'when item is a third_party_flow' do + let(:item) { create(:ai_catalog_third_party_flow) } + + it 'is valid' do + flow_trigger = build( + :ai_flow_trigger, + project: project, + config_path: nil, + ai_catalog_item_consumer: item_consumer + ) + + expect(flow_trigger).to be_valid + end + end end end diff --git a/ee/spec/services/ai/flow_triggers/run_service_spec.rb b/ee/spec/services/ai/flow_triggers/run_service_spec.rb index de684556520177..49550a6a81d61c 100644 --- a/ee/spec/services/ai/flow_triggers/run_service_spec.rb +++ b/ee/spec/services/ai/flow_triggers/run_service_spec.rb @@ -761,5 +761,58 @@ end end end + + context 'when flow trigger has ai_catalog_item_consumer with third_party_flow' do + let_it_be(:third_party_catalog_item) do + create(:ai_catalog_item, :third_party_flow, project: project) + end + + let_it_be(:third_party_consumer) do + create(:ai_catalog_item_consumer, + item: third_party_catalog_item, project: project, pinned_version_prefix: nil) + end + + let_it_be(:flow_trigger_with_third_party) do + create(:ai_flow_trigger, + project: project, + user: service_account, + config_path: nil, + ai_catalog_item_consumer: third_party_consumer) + end + + subject(:service) do + described_class.new( + project: project, + current_user: current_user, + resource: resource, + flow_trigger: flow_trigger_with_third_party + ) + end + + it 'uses run_workload instead of catalog execution' do + expect(::Ai::Catalog::Flows::ExecuteService).not_to receive(:new) + expect(::Ci::Workloads::RunWorkloadService).to receive(:new).and_call_original + + service.execute(params) + end + + it 'fetches definition from catalog item version' do + expect { service.execute(params) }.to change { ::Ci::Workloads::Workload.count }.by(1) + + response = service.execute(params) + expect(response).to be_success + + workload = response.payload + expect(workload).to be_persisted + end + + it 'creates workflow and workload association' do + expect { service.execute(params) }.to change { ::Ai::DuoWorkflows::Workflow.count }.by(1) + expect { service.execute(params) }.to change { ::Ai::DuoWorkflows::WorkflowsWorkload.count }.by(1) + + response = service.execute(params) + expect(response).to be_success + end + end end end -- GitLab