From d53399e5460f05fa97240f1565b1d84812d4f6a7 Mon Sep 17 00:00:00 2001 From: Harry Date: Sat, 11 Oct 2025 12:19:57 +0800 Subject: [PATCH] refactor(trigger): rename trigger-related fields and methods for consistency - Updated the naming convention from 'trigger_name' to 'event_name' across various models and services to align with the new event-driven architecture. - Refactored methods in PluginTriggerManager and PluginTriggerProviderController to use 'invoke_trigger_event' instead of 'invoke_trigger'. - Adjusted database migration scripts to reflect changes in the schema, including the addition of 'event_name' and 'subscription_id' fields in the workflow_plugin_triggers table. - Removed deprecated trigger-related methods in WorkflowPluginTriggerService to streamline the codebase. --- api/controllers/console/app/workflow.py | 3 +- api/core/plugin/entities/request.py | 8 +- api/core/plugin/impl/trigger.py | 161 ++++---- api/core/trigger/entities/entities.py | 19 +- api/core/trigger/provider.py | 23 +- api/core/trigger/trigger_manager.py | 31 +- ..._09_26_1205-132392a2635f_plugin_trigger.py | 20 +- ...06-86f068bf56fb_plugin_trigger_workflow.py | 62 --- ...26_1207-875c659da2f8_plugin_trigger_idx.py | 37 -- api/models/workflow.py | 6 +- api/services/trigger/trigger_service.py | 30 +- .../workflow_plugin_trigger_service.py | 357 +----------------- 12 files changed, 149 insertions(+), 608 deletions(-) delete mode 100644 api/migrations/versions/2025_09_26_1206-86f068bf56fb_plugin_trigger_workflow.py delete mode 100644 api/migrations/versions/2025_09_26_1207-875c659da2f8_plugin_trigger_idx.py diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 0939cf41fe..b5c9fe18da 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -1135,8 +1135,7 @@ class DraftWorkflowTriggerRunApi(Resource): if not subscription_id: return jsonable_encoder({"status": "error", "message": "Subscription ID not found"}), 404 - # TODO Frontend data management is completely messy here - provider_id = TriggerProviderID(node_data.get("provider_name")) + provider_id = TriggerProviderID(node_data.get("provider_id")) pool_key: str = PluginTriggerDebugEvent.build_pool_key( tenant_id=app_model.tenant_id, provider_id=provider_id, diff --git a/api/core/plugin/entities/request.py b/api/core/plugin/entities/request.py index f0c1be6a6b..07752d7841 100644 --- a/api/core/plugin/entities/request.py +++ b/api/core/plugin/entities/request.py @@ -241,12 +241,8 @@ class RequestFetchAppInfo(BaseModel): app_id: str -class Event(BaseModel): - variables: Mapping[str, Any] - - -class TriggerInvokeResponse(BaseModel): - event: Event +class TriggerInvokeEventResponse(BaseModel): + variables: Mapping[str, Any] = Field(default_factory=dict) cancelled: Optional[bool] = False diff --git a/api/core/plugin/impl/trigger.py b/api/core/plugin/impl/trigger.py index 8da132f1b6..147e320f36 100644 --- a/api/core/plugin/impl/trigger.py +++ b/api/core/plugin/impl/trigger.py @@ -1,5 +1,5 @@ import binascii -from collections.abc import Mapping +from collections.abc import Generator, Mapping from typing import Any from flask import Request @@ -8,17 +8,18 @@ from core.plugin.entities.plugin_daemon import CredentialType, PluginTriggerProv from core.plugin.entities.request import ( PluginTriggerDispatchResponse, TriggerDispatchResponse, - TriggerInvokeResponse, + TriggerInvokeEventResponse, TriggerSubscriptionResponse, TriggerValidateProviderCredentialsResponse, ) from core.plugin.impl.base import BasePluginClient from core.plugin.utils.http_parser import deserialize_response, serialize_request from core.trigger.entities.entities import Subscription -from models.provider_ids import GenericProviderID, TriggerProviderID +from models.provider_ids import TriggerProviderID class PluginTriggerManager(BasePluginClient): + def fetch_trigger_providers(self, tenant_id: str) -> list[PluginTriggerProviderEntity]: """ Fetch trigger providers for the given tenant. @@ -33,10 +34,10 @@ class PluginTriggerManager(BasePluginClient): return json_response - response = self._request_with_plugin_daemon_response( - "GET", - f"plugin/{tenant_id}/management/triggers", - list[PluginTriggerProviderEntity], + response: list[PluginTriggerProviderEntity] = self._request_with_plugin_daemon_response( + method="GET", + path=f"plugin/{tenant_id}/management/triggers", + type=list[PluginTriggerProviderEntity], params={"page": 1, "page_size": 256}, transformer=transformer, ) @@ -63,10 +64,10 @@ class PluginTriggerManager(BasePluginClient): return json_response - response = self._request_with_plugin_daemon_response( - "GET", - f"plugin/{tenant_id}/management/trigger", - PluginTriggerProviderEntity, + response: PluginTriggerProviderEntity = self._request_with_plugin_daemon_response( + method="GET", + path=f"plugin/{tenant_id}/management/trigger", + type=PluginTriggerProviderEntity, params={"provider": provider_id.provider_name, "plugin_id": provider_id.plugin_id}, transformer=transformer, ) @@ -79,31 +80,30 @@ class PluginTriggerManager(BasePluginClient): return response - def invoke_trigger( + def invoke_trigger_event( self, tenant_id: str, user_id: str, provider: str, - trigger: str, + event_name: str, credentials: Mapping[str, str], credential_type: CredentialType, request: Request, parameters: Mapping[str, Any], - ) -> TriggerInvokeResponse: + ) -> TriggerInvokeEventResponse: """ Invoke a trigger with the given parameters. """ - trigger_provider_id = GenericProviderID(provider) - - response = self._request_with_plugin_daemon_response_stream( - "POST", - f"plugin/{tenant_id}/dispatch/trigger/invoke", - TriggerInvokeResponse, + provider_id = TriggerProviderID(provider) + response: Generator[TriggerInvokeEventResponse, None, None] = self._request_with_plugin_daemon_response_stream( + method="POST", + path=f"plugin/{tenant_id}/dispatch/trigger/invoke_event", + type=TriggerInvokeEventResponse, data={ "user_id": user_id, "data": { - "provider": trigger_provider_id.provider_name, - "trigger": trigger, + "provider": provider_id.provider_name, + "event": event_name, "credentials": credentials, "credential_type": credential_type, "raw_http_request": binascii.hexlify(serialize_request(request)).decode(), @@ -111,13 +111,13 @@ class PluginTriggerManager(BasePluginClient): }, }, headers={ - "X-Plugin-ID": trigger_provider_id.plugin_id, + "X-Plugin-ID": provider_id.plugin_id, "Content-Type": "application/json", }, ) for resp in response: - return TriggerInvokeResponse(event=resp.event) + return resp raise ValueError("No response received from plugin daemon for invoke trigger") @@ -127,23 +127,24 @@ class PluginTriggerManager(BasePluginClient): """ Validate the credentials of the trigger provider. """ - trigger_provider_id = GenericProviderID(provider) - - response = self._request_with_plugin_daemon_response_stream( - "POST", - f"plugin/{tenant_id}/dispatch/trigger/validate_credentials", - TriggerValidateProviderCredentialsResponse, - data={ - "user_id": user_id, - "data": { - "provider": trigger_provider_id.provider_name, - "credentials": credentials, + provider_id = TriggerProviderID(provider) + response: Generator[TriggerValidateProviderCredentialsResponse, None, None] = ( + self._request_with_plugin_daemon_response_stream( + method="POST", + path=f"plugin/{tenant_id}/dispatch/trigger/validate_credentials", + type=TriggerValidateProviderCredentialsResponse, + data={ + "user_id": user_id, + "data": { + "provider": provider_id.provider_name, + "credentials": credentials, + }, }, - }, - headers={ - "X-Plugin-ID": trigger_provider_id.plugin_id, - "Content-Type": "application/json", - }, + headers={ + "X-Plugin-ID": provider_id.plugin_id, + "Content-Type": "application/json", + }, + ) ) for resp in response: @@ -162,24 +163,25 @@ class PluginTriggerManager(BasePluginClient): """ Dispatch an event to triggers. """ - trigger_provider_id = GenericProviderID(provider) - - response = self._request_with_plugin_daemon_response_stream( - "POST", - f"plugin/{tenant_id}/dispatch/trigger/dispatch_event", - PluginTriggerDispatchResponse, - data={ - "user_id": user_id, - "data": { - "provider": trigger_provider_id.provider_name, - "subscription": subscription, - "raw_http_request": binascii.hexlify(serialize_request(request)).decode(), + provider_id = TriggerProviderID(provider) + response: Generator[PluginTriggerDispatchResponse, None, None] = ( + self._request_with_plugin_daemon_response_stream( + method="POST", + path=f"plugin/{tenant_id}/dispatch/trigger/dispatch_event", + type=PluginTriggerDispatchResponse, + data={ + "user_id": user_id, + "data": { + "provider": provider_id.provider_name, + "subscription": subscription, + "raw_http_request": binascii.hexlify(serialize_request(request)).decode(), + }, }, - }, - headers={ - "X-Plugin-ID": trigger_provider_id.plugin_id, - "Content-Type": "application/json", - }, + headers={ + "X-Plugin-ID": provider_id.plugin_id, + "Content-Type": "application/json", + }, + ) ) for resp in response: @@ -202,23 +204,22 @@ class PluginTriggerManager(BasePluginClient): """ Subscribe to a trigger. """ - trigger_provider_id = GenericProviderID(provider) - - response = self._request_with_plugin_daemon_response_stream( - "POST", - f"plugin/{tenant_id}/dispatch/trigger/subscribe", - TriggerSubscriptionResponse, + provider_id = TriggerProviderID(provider) + response: Generator[TriggerSubscriptionResponse, None, None] = self._request_with_plugin_daemon_response_stream( + method="POST", + path=f"plugin/{tenant_id}/dispatch/trigger/subscribe", + type=TriggerSubscriptionResponse, data={ "user_id": user_id, "data": { - "provider": trigger_provider_id.provider_name, + "provider": provider_id.provider_name, "credentials": credentials, "endpoint": endpoint, "parameters": parameters, }, }, headers={ - "X-Plugin-ID": trigger_provider_id.plugin_id, + "X-Plugin-ID": provider_id.plugin_id, "Content-Type": "application/json", }, ) @@ -239,22 +240,21 @@ class PluginTriggerManager(BasePluginClient): """ Unsubscribe from a trigger. """ - trigger_provider_id = GenericProviderID(provider) - - response = self._request_with_plugin_daemon_response_stream( - "POST", - f"plugin/{tenant_id}/dispatch/trigger/unsubscribe", - TriggerSubscriptionResponse, + provider_id = TriggerProviderID(provider) + response: Generator[TriggerSubscriptionResponse, None, None] = self._request_with_plugin_daemon_response_stream( + method="POST", + path=f"plugin/{tenant_id}/dispatch/trigger/unsubscribe", + type=TriggerSubscriptionResponse, data={ "user_id": user_id, "data": { - "provider": trigger_provider_id.provider_name, + "provider": provider_id.provider_name, "subscription": subscription.model_dump(), "credentials": credentials, }, }, headers={ - "X-Plugin-ID": trigger_provider_id.plugin_id, + "X-Plugin-ID": provider_id.plugin_id, "Content-Type": "application/json", }, ) @@ -275,22 +275,21 @@ class PluginTriggerManager(BasePluginClient): """ Refresh a trigger subscription. """ - trigger_provider_id = GenericProviderID(provider) - - response = self._request_with_plugin_daemon_response_stream( - "POST", - f"plugin/{tenant_id}/dispatch/trigger/refresh", - TriggerSubscriptionResponse, + provider_id = TriggerProviderID(provider) + response: Generator[TriggerSubscriptionResponse, None, None] = self._request_with_plugin_daemon_response_stream( + method="POST", + path=f"plugin/{tenant_id}/dispatch/trigger/refresh", + type=TriggerSubscriptionResponse, data={ "user_id": user_id, "data": { - "provider": trigger_provider_id.provider_name, + "provider": provider_id.provider_name, "subscription": subscription.model_dump(), "credentials": credentials, }, }, headers={ - "X-Plugin-ID": trigger_provider_id.plugin_id, + "X-Plugin-ID": provider_id.plugin_id, "Content-Type": "application/json", }, ) diff --git a/api/core/trigger/entities/entities.py b/api/core/trigger/entities/entities.py index 0044b545d7..1240084fda 100644 --- a/api/core/trigger/entities/entities.py +++ b/api/core/trigger/entities/entities.py @@ -269,22 +269,6 @@ class TriggerEventData(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) -class TriggerInputs(BaseModel): - """Standard inputs for trigger nodes.""" - - request_id: str - trigger_name: str - subscription_id: str - - def to_workflow_args(self) -> dict[str, Any]: - """Convert to workflow arguments format.""" - return {"inputs": self.model_dump(), "files": []} - - def to_dict(self) -> dict[str, Any]: - """Convert to dict (alias for model_dump).""" - return self.model_dump() - - class TriggerCreationMethod(StrEnum): OAUTH = "OAUTH" APIKEY = "APIKEY" @@ -292,7 +276,7 @@ class TriggerCreationMethod(StrEnum): # Export all entities -__all__ = [ +__all__: list[str] = [ "EventDescription", "EventEntity", "EventIdentity", @@ -304,7 +288,6 @@ __all__ = [ "SubscriptionBuilder", "TriggerCreationMethod", "TriggerEventData", - "TriggerInputs", "TriggerProviderEntity", "TriggerProviderIdentity", "Unsubscription", diff --git a/api/core/trigger/provider.py b/api/core/trigger/provider.py index 9918b39f11..f45736cb79 100644 --- a/api/core/trigger/provider.py +++ b/api/core/trigger/provider.py @@ -4,7 +4,7 @@ Trigger Provider Controller for managing trigger providers import logging from collections.abc import Mapping -from typing import Any, Optional +from typing import Any from flask import Request @@ -12,7 +12,7 @@ from core.entities.provider_entities import BasicProviderConfig from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.entities.request import ( TriggerDispatchResponse, - TriggerInvokeResponse, + TriggerInvokeEventResponse, ) from core.plugin.impl.trigger import PluginTriggerManager from core.trigger.entities.api_entities import TriggerApiEntity, TriggerProviderApiEntity @@ -125,7 +125,7 @@ class PluginTriggerProviderController: """ return self.entity.events - def get_event(self, event_name: str) -> Optional[EventEntity]: + def get_event(self, event_name: str) -> EventEntity: """ Get a specific event by name @@ -135,7 +135,7 @@ class PluginTriggerProviderController: for event in self.entity.events: if event.identity.name == event_name: return event - return None + raise ValueError(f"Event {event_name} not found in provider {self.provider_id}") def get_subscription_default_properties(self) -> Mapping[str, Any]: """ @@ -270,20 +270,20 @@ class PluginTriggerProviderController: ) return response - def invoke_trigger( + def invoke_trigger_event( self, user_id: str, - trigger_name: str, + event_name: str, parameters: Mapping[str, Any], credentials: Mapping[str, str], credential_type: CredentialType, request: Request, - ) -> TriggerInvokeResponse: + ) -> TriggerInvokeEventResponse: """ Execute a trigger through plugin runtime :param user_id: User ID - :param trigger_name: Trigger name + :param event_name: Event name :param parameters: Trigger parameters :param credentials: Provider credentials :param credential_type: Credential type @@ -291,13 +291,14 @@ class PluginTriggerProviderController: :return: Trigger execution result """ manager = PluginTriggerManager() - provider_id = self.get_provider_id() + provider_id: TriggerProviderID = self.get_provider_id() + event: EventEntity = self.get_event(event_name=event_name) - return manager.invoke_trigger( + return manager.invoke_trigger_event( tenant_id=self.tenant_id, user_id=user_id, provider=str(provider_id), - trigger=trigger_name, + event_name=event.identity.name, credentials=credentials, credential_type=credential_type, request=request, diff --git a/api/core/trigger/trigger_manager.py b/api/core/trigger/trigger_manager.py index 0732421e52..548543ada7 100644 --- a/api/core/trigger/trigger_manager.py +++ b/api/core/trigger/trigger_manager.py @@ -11,7 +11,7 @@ from flask import Request import contexts from core.plugin.entities.plugin_daemon import CredentialType -from core.plugin.entities.request import Event, TriggerInvokeResponse +from core.plugin.entities.request import TriggerInvokeEventResponse from core.plugin.impl.exc import PluginInvokeError from core.plugin.impl.trigger import PluginTriggerManager from core.trigger.entities.entities import ( @@ -128,41 +128,47 @@ class TriggerManager: return provider.get_events() @classmethod - def invoke_trigger( + def invoke_trigger_event( cls, tenant_id: str, user_id: str, provider_id: TriggerProviderID, - trigger_name: str, + event_name: str, parameters: Mapping[str, Any], credentials: Mapping[str, str], credential_type: CredentialType, request: Request, - ) -> TriggerInvokeResponse: + ) -> TriggerInvokeEventResponse: """ Execute a trigger :param tenant_id: Tenant ID :param user_id: User ID :param provider_id: Provider ID - :param trigger_name: Trigger name + :param event_name: Event name :param parameters: Trigger parameters :param credentials: Provider credentials :param credential_type: Credential type :param request: Request :return: Trigger execution result """ - provider = cls.get_trigger_provider(tenant_id, provider_id) - trigger = provider.get_event(trigger_name) - if not trigger: - raise ValueError(f"Trigger {trigger_name} not found in provider {provider_id}") + provider: PluginTriggerProviderController = cls.get_trigger_provider( + tenant_id=tenant_id, provider_id=provider_id + ) try: - return provider.invoke_trigger(user_id, trigger_name, parameters, credentials, credential_type, request) + return provider.invoke_trigger_event( + user_id=user_id, + event_name=event_name, + parameters=parameters, + credentials=credentials, + credential_type=credential_type, + request=request, + ) except PluginInvokeError as e: if e.get_error_type() == "TriggerIgnoreEventError": - return TriggerInvokeResponse(event=Event(variables={}), cancelled=True) + return TriggerInvokeEventResponse(variables={}, cancelled=True) else: - logger.exception("Failed to invoke trigger") + logger.exception("Failed to invoke trigger event") raise @classmethod @@ -226,7 +232,6 @@ class TriggerManager: :param tenant_id: Tenant ID :param provider_id: Provider ID - :param trigger_name: Trigger name :param subscription: Subscription metadata from subscribe operation :param credentials: Provider credentials :return: Refreshed subscription result diff --git a/api/migrations/versions/2025_09_26_1205-132392a2635f_plugin_trigger.py b/api/migrations/versions/2025_09_26_1205-132392a2635f_plugin_trigger.py index d74f293367..1117444a01 100644 --- a/api/migrations/versions/2025_09_26_1205-132392a2635f_plugin_trigger.py +++ b/api/migrations/versions/2025_09_26_1205-132392a2635f_plugin_trigger.py @@ -1,8 +1,8 @@ """plugin_trigger Revision ID: 132392a2635f -Revises: 9ee7d347f4c1 -Create Date: 2025-09-03 15:00:57.326868 +Revises: c19938f630b6 +Create Date: 2025-09-26 12:05:00.000000 """ from alembic import op @@ -64,23 +64,22 @@ def upgrade(): batch_op.create_index('idx_trigger_providers_tenant_endpoint', ['tenant_id', 'endpoint_id'], unique=False) batch_op.create_index('idx_trigger_providers_tenant_provider', ['tenant_id', 'provider_id'], unique=False) + # Create workflow_plugin_triggers table with final schema (merged from all 4 migrations) op.create_table('workflow_plugin_triggers', sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), sa.Column('app_id', models.types.StringUUID(), nullable=False), sa.Column('node_id', sa.String(length=64), nullable=False), sa.Column('tenant_id', models.types.StringUUID(), nullable=False), - sa.Column('provider_id', sa.String(length=255), nullable=False), - sa.Column('trigger_id', sa.String(length=510), nullable=False), - sa.Column('triggered_by', sa.String(length=16), nullable=False), + sa.Column('provider_id', sa.String(length=512), nullable=False), + sa.Column('subscription_id', sa.String(length=255), nullable=False), + sa.Column('event_name', sa.String(length=255), nullable=False), sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), sa.PrimaryKeyConstraint('id', name='workflow_plugin_trigger_pkey'), - sa.UniqueConstraint('app_id', 'node_id', 'triggered_by', name='uniq_plugin_node'), - sa.UniqueConstraint('trigger_id', 'node_id', name='uniq_trigger_node') + sa.UniqueConstraint('app_id', 'node_id', name='uniq_app_node_subscription') ) with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: - batch_op.create_index('workflow_plugin_trigger_tenant_idx', ['tenant_id'], unique=False) - batch_op.create_index('workflow_plugin_trigger_trigger_idx', ['trigger_id'], unique=False) + batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id', 'event_name'], unique=False) # ### end Alembic commands ### @@ -88,8 +87,7 @@ def upgrade(): def downgrade(): # ### commands auto generated by Alembic - please adjust! ### with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: - batch_op.drop_index('workflow_plugin_trigger_trigger_idx') - batch_op.drop_index('workflow_plugin_trigger_tenant_idx') + batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx') op.drop_table('workflow_plugin_triggers') with op.batch_alter_table('trigger_subscriptions', schema=None) as batch_op: diff --git a/api/migrations/versions/2025_09_26_1206-86f068bf56fb_plugin_trigger_workflow.py b/api/migrations/versions/2025_09_26_1206-86f068bf56fb_plugin_trigger_workflow.py deleted file mode 100644 index 58f6ef07ed..0000000000 --- a/api/migrations/versions/2025_09_26_1206-86f068bf56fb_plugin_trigger_workflow.py +++ /dev/null @@ -1,62 +0,0 @@ -"""plugin_trigger_workflow - -Revision ID: 86f068bf56fb -Revises: 132392a2635f -Create Date: 2025-09-04 12:12:44.661875 - -""" -from alembic import op -import models as models -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '86f068bf56fb' -down_revision = '132392a2635f' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: - batch_op.add_column(sa.Column('subscription_id', sa.String(length=255), nullable=False)) - batch_op.alter_column('provider_id', - existing_type=sa.VARCHAR(length=255), - type_=sa.String(length=512), - existing_nullable=False) - batch_op.alter_column('trigger_id', - existing_type=sa.VARCHAR(length=510), - type_=sa.String(length=255), - existing_nullable=False) - batch_op.drop_constraint(batch_op.f('uniq_plugin_node'), type_='unique') - batch_op.drop_constraint(batch_op.f('uniq_trigger_node'), type_='unique') - batch_op.drop_index(batch_op.f('workflow_plugin_trigger_tenant_idx')) - batch_op.drop_index(batch_op.f('workflow_plugin_trigger_trigger_idx')) - batch_op.create_unique_constraint('uniq_app_node_subscription', ['app_id', 'node_id']) - batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id'], unique=False) - batch_op.drop_column('triggered_by') - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: - batch_op.add_column(sa.Column('triggered_by', sa.VARCHAR(length=16), autoincrement=False, nullable=False)) - batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx') - batch_op.drop_constraint('uniq_app_node_subscription', type_='unique') - batch_op.create_index(batch_op.f('workflow_plugin_trigger_trigger_idx'), ['trigger_id'], unique=False) - batch_op.create_index(batch_op.f('workflow_plugin_trigger_tenant_idx'), ['tenant_id'], unique=False) - batch_op.create_unique_constraint(batch_op.f('uniq_trigger_node'), ['trigger_id', 'node_id'], postgresql_nulls_not_distinct=False) - batch_op.create_unique_constraint(batch_op.f('uniq_plugin_node'), ['app_id', 'node_id', 'triggered_by'], postgresql_nulls_not_distinct=False) - batch_op.alter_column('trigger_id', - existing_type=sa.String(length=255), - type_=sa.VARCHAR(length=510), - existing_nullable=False) - batch_op.alter_column('provider_id', - existing_type=sa.String(length=512), - type_=sa.VARCHAR(length=255), - existing_nullable=False) - batch_op.drop_column('subscription_id') - # ### end Alembic commands ### diff --git a/api/migrations/versions/2025_09_26_1207-875c659da2f8_plugin_trigger_idx.py b/api/migrations/versions/2025_09_26_1207-875c659da2f8_plugin_trigger_idx.py deleted file mode 100644 index ce7985097f..0000000000 --- a/api/migrations/versions/2025_09_26_1207-875c659da2f8_plugin_trigger_idx.py +++ /dev/null @@ -1,37 +0,0 @@ -"""plugin_trigger_idx - -Revision ID: 875c659da2f8 -Revises: 86f068bf56fb -Create Date: 2025-09-05 15:51:08.635283 - -""" -from alembic import op -import models as models -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '875c659da2f8' -down_revision = '86f068bf56fb' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: - batch_op.add_column(sa.Column('trigger_name', sa.String(length=255), nullable=False)) - batch_op.drop_index(batch_op.f('workflow_plugin_trigger_tenant_subscription_idx')) - batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id', 'trigger_name'], unique=False) - batch_op.drop_column('trigger_id') - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: - batch_op.add_column(sa.Column('trigger_id', sa.VARCHAR(length=255), autoincrement=False, nullable=False)) - batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx') - batch_op.create_index(batch_op.f('workflow_plugin_trigger_tenant_subscription_idx'), ['tenant_id', 'subscription_id'], unique=False) - batch_op.drop_column('trigger_name') - # ### end Alembic commands ### diff --git a/api/models/workflow.py b/api/models/workflow.py index a3c7aefa57..1a0866a343 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1804,7 +1804,7 @@ class WorkflowPluginTrigger(Base): - node_id (varchar) Node ID which node in the workflow - tenant_id (uuid) Workspace ID - provider_id (varchar) Plugin provider ID - - trigger_name (varchar) trigger name + - event_name (varchar) trigger name - subscription_id (varchar) Subscription ID - created_at (timestamp) Creation time - updated_at (timestamp) Last update time @@ -1813,7 +1813,7 @@ class WorkflowPluginTrigger(Base): __tablename__ = "workflow_plugin_triggers" __table_args__ = ( sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"), - sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "trigger_name"), + sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "event_name"), sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"), ) @@ -1822,7 +1822,7 @@ class WorkflowPluginTrigger(Base): node_id: Mapped[str] = mapped_column(String(64), nullable=False) tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) provider_id: Mapped[str] = mapped_column(String(512), nullable=False) - trigger_name: Mapped[str] = mapped_column(String(255), nullable=False) + event_name: Mapped[str] = mapped_column(String(255), nullable=False) subscription_id: Mapped[str] = mapped_column(String(255), nullable=False) created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) updated_at: Mapped[datetime] = mapped_column( diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index 2084aea0cc..c384349515 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -87,11 +87,11 @@ class TriggerService: return 0 subscribers: list[WorkflowPluginTrigger] = cls.get_subscriber_triggers( - tenant_id=subscription.tenant_id, subscription_id=subscription.id, trigger_name=event.identity.name + tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event.identity.name ) if not subscribers: logger.warning( - "No workflows found for trigger '%s' in subscription '%s'", + "No workflows found for trigger event '%s' in subscription '%s'", event.identity.name, subscription.id, ) @@ -112,30 +112,30 @@ class TriggerService: continue # Find the trigger node in the workflow - trigger_node = None + event_node = None for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN): if node_id == plugin_trigger.node_id: - trigger_node = node_config + event_node = node_config break - if not trigger_node: - logger.error("Trigger node not found for app %s", plugin_trigger.app_id) + if not event_node: + logger.error("Trigger event node not found for app %s", plugin_trigger.app_id) continue # invoke triger - invoke_response = TriggerManager.invoke_trigger( + invoke_response = TriggerManager.invoke_trigger_event( tenant_id=subscription.tenant_id, user_id=subscription.user_id, provider_id=TriggerProviderID(subscription.provider_id), - trigger_name=event.identity.name, - parameters=trigger_node.get("config", {}), + event_name=event.identity.name, + parameters=event_node.get("config", {}), credentials=subscription.credentials, credential_type=CredentialType.of(subscription.credential_type), request=request, ) if invoke_response.cancelled: logger.info( - "Trigger ignored for app %s with trigger %s", + "Trigger ignored for app %s with trigger event %s", plugin_trigger.app_id, event.identity.name, ) @@ -150,7 +150,7 @@ class TriggerService: trigger_type=WorkflowRunTriggeredFrom.PLUGIN, plugin_id=subscription.provider_id, endpoint_id=subscription.endpoint_id, - inputs=invoke_response.event.variables, + inputs=invoke_response.variables.variables, ) # Trigger async workflow @@ -158,7 +158,7 @@ class TriggerService: AsyncWorkflowService.trigger_workflow_async(session, tenant_owner, trigger_data) dispatched_count += 1 logger.info( - "Triggered workflow for app %s with trigger %s", + "Triggered workflow for app %s with trigger event %s", plugin_trigger.app_id, event.identity.name, ) @@ -224,7 +224,7 @@ class TriggerService: @classmethod def get_subscriber_triggers( - cls, tenant_id: str, subscription_id: str, trigger_name: str + cls, tenant_id: str, subscription_id: str, event_name: str ) -> list[WorkflowPluginTrigger]: """ Get WorkflowPluginTriggers for a subscription and trigger. @@ -232,14 +232,14 @@ class TriggerService: Args: tenant_id: Tenant ID subscription_id: Subscription ID - trigger_name: Trigger name + event_name: Event name """ with Session(db.engine, expire_on_commit=False) as session: subscribers = session.scalars( select(WorkflowPluginTrigger).where( WorkflowPluginTrigger.tenant_id == tenant_id, WorkflowPluginTrigger.subscription_id == subscription_id, - WorkflowPluginTrigger.trigger_name == trigger_name, + WorkflowPluginTrigger.event_name == event_name, ) ).all() return list(subscribers) diff --git a/api/services/trigger/workflow_plugin_trigger_service.py b/api/services/trigger/workflow_plugin_trigger_service.py index e1b1b0a291..54d26fbef6 100644 --- a/api/services/trigger/workflow_plugin_trigger_service.py +++ b/api/services/trigger/workflow_plugin_trigger_service.py @@ -1,15 +1,12 @@ -from typing import Optional from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.orm import Session -from werkzeug.exceptions import NotFound from core.workflow.enums import NodeType from extensions.ext_database import db from extensions.ext_redis import redis_client from models.model import App -from models.trigger import TriggerSubscription from models.workflow import Workflow, WorkflowPluginTrigger @@ -19,313 +16,6 @@ class WorkflowPluginTriggerService: __PLUGIN_TRIGGER_NODE_CACHE_KEY__ = "plugin_trigger_nodes" MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW = 5 # Maximum allowed plugin trigger nodes per workflow - @classmethod - def create_plugin_trigger( - cls, - app_id: str, - tenant_id: str, - node_id: str, - provider_id: str, - trigger_name: str, - subscription_id: str, - ) -> WorkflowPluginTrigger: - """Create a new plugin trigger - - Args: - app_id: The app ID - tenant_id: The tenant ID - node_id: The node ID in the workflow - provider_id: The plugin provider ID - trigger_name: The trigger name - subscription_id: The subscription ID - - Returns: - The created WorkflowPluginTrigger instance - - Raises: - BadRequest: If plugin trigger already exists for this app and node - """ - with Session(db.engine) as session: - # Check if plugin trigger already exists for this app and node - # Based on unique constraint: uniq_app_node - existing_trigger = session.scalar( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.app_id == app_id, - WorkflowPluginTrigger.node_id == node_id, - ) - ) - - if existing_trigger: - raise ValueError("Plugin trigger already exists for this app and node") - - # Check if subscription exists - subscription = session.scalar( - select(TriggerSubscription).where( - TriggerSubscription.id == subscription_id, - ) - ) - - if not subscription: - raise NotFound("Subscription not found") - - # Create new plugin trigger - plugin_trigger = WorkflowPluginTrigger( - app_id=app_id, - node_id=node_id, - tenant_id=tenant_id, - provider_id=provider_id, - trigger_name=trigger_name, - subscription_id=subscription_id, - ) - - session.add(plugin_trigger) - session.commit() - session.refresh(plugin_trigger) - - return plugin_trigger - - @classmethod - def get_plugin_trigger( - cls, - app_id: str, - node_id: str, - ) -> WorkflowPluginTrigger: - """Get a plugin trigger by app_id and node_id - - Args: - app_id: The app ID - node_id: The node ID in the workflow - - Returns: - The WorkflowPluginTrigger instance - - Raises: - NotFound: If plugin trigger not found - """ - with Session(db.engine) as session: - # Find plugin trigger using unique constraint - plugin_trigger = session.scalar( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.app_id == app_id, - WorkflowPluginTrigger.node_id == node_id, - ) - ) - - if not plugin_trigger: - raise NotFound("Plugin trigger not found") - - return plugin_trigger - - @classmethod - def get_plugin_trigger_by_subscription( - cls, - tenant_id: str, - subscription_id: str, - ) -> WorkflowPluginTrigger: - """Get a plugin trigger by tenant_id and subscription_id - This is the primary query pattern, optimized with composite index - - Args: - tenant_id: The tenant ID - subscription_id: The subscription ID - - Returns: - The WorkflowPluginTrigger instance - - Raises: - NotFound: If plugin trigger not found - """ - with Session(db.engine) as session: - # Find plugin trigger using indexed columns - plugin_trigger = session.scalar( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.tenant_id == tenant_id, - WorkflowPluginTrigger.subscription_id == subscription_id, - ) - ) - - if not plugin_trigger: - raise NotFound("Plugin trigger not found") - - return plugin_trigger - - @classmethod - def list_plugin_triggers_by_tenant( - cls, - tenant_id: str, - ) -> list[WorkflowPluginTrigger]: - """List all plugin triggers for a tenant - - Args: - tenant_id: The tenant ID - - Returns: - List of WorkflowPluginTrigger instances - """ - with Session(db.engine) as session: - plugin_triggers = session.scalars( - select(WorkflowPluginTrigger) - .where(WorkflowPluginTrigger.tenant_id == tenant_id) - .order_by(WorkflowPluginTrigger.created_at.desc()) - ).all() - - return list(plugin_triggers) - - @classmethod - def list_plugin_triggers_by_subscription( - cls, - subscription_id: str, - ) -> list[WorkflowPluginTrigger]: - """List all plugin triggers for a subscription - - Args: - subscription_id: The subscription ID - - Returns: - List of WorkflowPluginTrigger instances - """ - with Session(db.engine) as session: - plugin_triggers = session.scalars( - select(WorkflowPluginTrigger) - .where(WorkflowPluginTrigger.subscription_id == subscription_id) - .order_by(WorkflowPluginTrigger.created_at.desc()) - ).all() - - return list(plugin_triggers) - - @classmethod - def update_plugin_trigger( - cls, - app_id: str, - node_id: str, - subscription_id: str, - ) -> WorkflowPluginTrigger: - """Update a plugin trigger - - Args: - app_id: The app ID - node_id: The node ID in the workflow - subscription_id: The new subscription ID (optional) - - Returns: - The updated WorkflowPluginTrigger instance - - Raises: - NotFound: If plugin trigger not found - """ - with Session(db.engine) as session: - # Find plugin trigger using unique constraint - plugin_trigger = session.scalar( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.app_id == app_id, - WorkflowPluginTrigger.node_id == node_id, - ) - ) - - if not plugin_trigger: - raise NotFound("Plugin trigger not found") - - # Check if subscription exists - subscription = session.scalar( - select(TriggerSubscription).where( - TriggerSubscription.id == subscription_id, - ) - ) - - if not subscription: - raise NotFound("Subscription not found") - - # Update subscription ID - plugin_trigger.subscription_id = subscription_id - - session.commit() - session.refresh(plugin_trigger) - - return plugin_trigger - - @classmethod - def update_plugin_trigger_by_subscription( - cls, - tenant_id: str, - subscription_id: str, - provider_id: Optional[str] = None, - trigger_name: Optional[str] = None, - new_subscription_id: Optional[str] = None, - ) -> WorkflowPluginTrigger: - """Update a plugin trigger by tenant_id and subscription_id - - Args: - tenant_id: The tenant ID - subscription_id: The current subscription ID - provider_id: The new provider ID (optional) - trigger_name: The new trigger name (optional) - new_subscription_id: The new subscription ID (optional) - - Returns: - The updated WorkflowPluginTrigger instance - - Raises: - NotFound: If plugin trigger not found - """ - with Session(db.engine) as session: - # Find plugin trigger using indexed columns - plugin_trigger = session.scalar( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.tenant_id == tenant_id, - WorkflowPluginTrigger.subscription_id == subscription_id, - ) - ) - - if not plugin_trigger: - raise NotFound("Plugin trigger not found") - - # Update fields if provided - if provider_id: - plugin_trigger.provider_id = provider_id - - if trigger_name: - # Update trigger_id if provider_id or trigger_name changed - provider_id = provider_id or plugin_trigger.provider_id - plugin_trigger.trigger_name = f"{provider_id}:{trigger_name}" - - if new_subscription_id: - plugin_trigger.subscription_id = new_subscription_id - - session.commit() - session.refresh(plugin_trigger) - - return plugin_trigger - - @classmethod - def delete_plugin_trigger( - cls, - app_id: str, - node_id: str, - ) -> None: - """Delete a plugin trigger by app_id and node_id - - Args: - app_id: The app ID - node_id: The node ID in the workflow - - Raises: - NotFound: If plugin trigger not found - """ - with Session(db.engine) as session: - # Find plugin trigger using unique constraint - plugin_trigger = session.scalar( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.app_id == app_id, - WorkflowPluginTrigger.node_id == node_id, - ) - ) - - if not plugin_trigger: - raise NotFound("Plugin trigger not found") - - session.delete(plugin_trigger) - session.commit() - @classmethod def delete_plugin_trigger_by_subscription( cls, @@ -356,37 +46,6 @@ class WorkflowPluginTriggerService: session.delete(plugin_trigger) - @classmethod - def delete_all_by_subscription( - cls, - subscription_id: str, - ) -> int: - """Delete all plugin triggers for a subscription - Useful when a subscription is cancelled - - Args: - subscription_id: The subscription ID - - Returns: - Number of triggers deleted - """ - with Session(db.engine) as session: - # Find all plugin triggers for this subscription - plugin_triggers = session.scalars( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.subscription_id == subscription_id, - ) - ).all() - - count = len(plugin_triggers) - - for trigger in plugin_triggers: - session.delete(trigger) - - session.commit() - - return count - @classmethod def sync_plugin_trigger_relationships(cls, app: App, workflow: Workflow): """ @@ -412,7 +71,7 @@ class WorkflowPluginTriggerService: record_id: str node_id: str provider_id: str - trigger_name: str + event_name: str subscription_id: str # Walk nodes to find plugin triggers @@ -421,7 +80,7 @@ class WorkflowPluginTriggerService: # Extract plugin trigger configuration from node plugin_id = node_config.get("plugin_id", "") provider_id = node_config.get("provider_id", "") - trigger_name = node_config.get("trigger_name", "") + event_name = node_config.get("event_name", "") subscription_id = node_config.get("subscription_id", "") if not subscription_id: @@ -432,7 +91,7 @@ class WorkflowPluginTriggerService: "node_id": node_id, "plugin_id": plugin_id, "provider_id": provider_id, - "trigger_name": trigger_name, + "event_name": event_name, "subscription_id": subscription_id, } ) @@ -480,7 +139,7 @@ class WorkflowPluginTriggerService: tenant_id=app.tenant_id, node_id=node_info["node_id"], provider_id=node_info["provider_id"], - trigger_name=node_info["trigger_name"], + event_name=node_info["event_name"], subscription_id=node_info["subscription_id"], ) session.add(plugin_trigger) @@ -490,7 +149,7 @@ class WorkflowPluginTriggerService: record_id=plugin_trigger.id, node_id=node_info["node_id"], provider_id=node_info["provider_id"], - trigger_name=node_info["trigger_name"], + event_name=node_info["event_name"], subscription_id=node_info["subscription_id"], ) redis_client.set( @@ -508,11 +167,11 @@ class WorkflowPluginTriggerService: if ( existing_record.subscription_id != node_info["subscription_id"] or existing_record.provider_id != node_info["provider_id"] - or existing_record.trigger_name != node_info["trigger_name"] + or existing_record.event_name != node_info["event_name"] ): existing_record.subscription_id = node_info["subscription_id"] existing_record.provider_id = node_info["provider_id"] - existing_record.trigger_name = node_info["trigger_name"] + existing_record.event_name = node_info["event_name"] session.add(existing_record) # Update cache @@ -520,7 +179,7 @@ class WorkflowPluginTriggerService: record_id=existing_record.id, node_id=node_id, provider_id=node_info["provider_id"], - trigger_name=node_info["trigger_name"], + event_name=node_info["event_name"], subscription_id=node_info["subscription_id"], ) redis_client.set(