refactor(trigger): rename trigger-related fields and methods for consistency

- Updated the naming convention from 'trigger_name' to 'event_name' across various models and services to align with the new event-driven architecture.
- Refactored methods in PluginTriggerManager and PluginTriggerProviderController to use 'invoke_trigger_event' instead of 'invoke_trigger'.
- Adjusted database migration scripts to reflect changes in the schema, including the addition of 'event_name' and 'subscription_id' fields in the workflow_plugin_triggers table.
- Removed deprecated trigger-related methods in WorkflowPluginTriggerService to streamline the codebase.
This commit is contained in:
Harry
2025-10-11 12:19:57 +08:00
parent 622d12137a
commit d53399e546
12 changed files with 149 additions and 608 deletions

View File

@@ -1135,8 +1135,7 @@ class DraftWorkflowTriggerRunApi(Resource):
if not subscription_id:
return jsonable_encoder({"status": "error", "message": "Subscription ID not found"}), 404
# TODO Frontend data management is completely messy here
provider_id = TriggerProviderID(node_data.get("provider_name"))
provider_id = TriggerProviderID(node_data.get("provider_id"))
pool_key: str = PluginTriggerDebugEvent.build_pool_key(
tenant_id=app_model.tenant_id,
provider_id=provider_id,

View File

@@ -241,12 +241,8 @@ class RequestFetchAppInfo(BaseModel):
app_id: str
class Event(BaseModel):
variables: Mapping[str, Any]
class TriggerInvokeResponse(BaseModel):
event: Event
class TriggerInvokeEventResponse(BaseModel):
variables: Mapping[str, Any] = Field(default_factory=dict)
cancelled: Optional[bool] = False

View File

@@ -1,5 +1,5 @@
import binascii
from collections.abc import Mapping
from collections.abc import Generator, Mapping
from typing import Any
from flask import Request
@@ -8,17 +8,18 @@ from core.plugin.entities.plugin_daemon import CredentialType, PluginTriggerProv
from core.plugin.entities.request import (
PluginTriggerDispatchResponse,
TriggerDispatchResponse,
TriggerInvokeResponse,
TriggerInvokeEventResponse,
TriggerSubscriptionResponse,
TriggerValidateProviderCredentialsResponse,
)
from core.plugin.impl.base import BasePluginClient
from core.plugin.utils.http_parser import deserialize_response, serialize_request
from core.trigger.entities.entities import Subscription
from models.provider_ids import GenericProviderID, TriggerProviderID
from models.provider_ids import TriggerProviderID
class PluginTriggerManager(BasePluginClient):
def fetch_trigger_providers(self, tenant_id: str) -> list[PluginTriggerProviderEntity]:
"""
Fetch trigger providers for the given tenant.
@@ -33,10 +34,10 @@ class PluginTriggerManager(BasePluginClient):
return json_response
response = self._request_with_plugin_daemon_response(
"GET",
f"plugin/{tenant_id}/management/triggers",
list[PluginTriggerProviderEntity],
response: list[PluginTriggerProviderEntity] = self._request_with_plugin_daemon_response(
method="GET",
path=f"plugin/{tenant_id}/management/triggers",
type=list[PluginTriggerProviderEntity],
params={"page": 1, "page_size": 256},
transformer=transformer,
)
@@ -63,10 +64,10 @@ class PluginTriggerManager(BasePluginClient):
return json_response
response = self._request_with_plugin_daemon_response(
"GET",
f"plugin/{tenant_id}/management/trigger",
PluginTriggerProviderEntity,
response: PluginTriggerProviderEntity = self._request_with_plugin_daemon_response(
method="GET",
path=f"plugin/{tenant_id}/management/trigger",
type=PluginTriggerProviderEntity,
params={"provider": provider_id.provider_name, "plugin_id": provider_id.plugin_id},
transformer=transformer,
)
@@ -79,31 +80,30 @@ class PluginTriggerManager(BasePluginClient):
return response
def invoke_trigger(
def invoke_trigger_event(
self,
tenant_id: str,
user_id: str,
provider: str,
trigger: str,
event_name: str,
credentials: Mapping[str, str],
credential_type: CredentialType,
request: Request,
parameters: Mapping[str, Any],
) -> TriggerInvokeResponse:
) -> TriggerInvokeEventResponse:
"""
Invoke a trigger with the given parameters.
"""
trigger_provider_id = GenericProviderID(provider)
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/trigger/invoke",
TriggerInvokeResponse,
provider_id = TriggerProviderID(provider)
response: Generator[TriggerInvokeEventResponse, None, None] = self._request_with_plugin_daemon_response_stream(
method="POST",
path=f"plugin/{tenant_id}/dispatch/trigger/invoke_event",
type=TriggerInvokeEventResponse,
data={
"user_id": user_id,
"data": {
"provider": trigger_provider_id.provider_name,
"trigger": trigger,
"provider": provider_id.provider_name,
"event": event_name,
"credentials": credentials,
"credential_type": credential_type,
"raw_http_request": binascii.hexlify(serialize_request(request)).decode(),
@@ -111,13 +111,13 @@ class PluginTriggerManager(BasePluginClient):
},
},
headers={
"X-Plugin-ID": trigger_provider_id.plugin_id,
"X-Plugin-ID": provider_id.plugin_id,
"Content-Type": "application/json",
},
)
for resp in response:
return TriggerInvokeResponse(event=resp.event)
return resp
raise ValueError("No response received from plugin daemon for invoke trigger")
@@ -127,23 +127,24 @@ class PluginTriggerManager(BasePluginClient):
"""
Validate the credentials of the trigger provider.
"""
trigger_provider_id = GenericProviderID(provider)
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/trigger/validate_credentials",
TriggerValidateProviderCredentialsResponse,
data={
"user_id": user_id,
"data": {
"provider": trigger_provider_id.provider_name,
"credentials": credentials,
provider_id = TriggerProviderID(provider)
response: Generator[TriggerValidateProviderCredentialsResponse, None, None] = (
self._request_with_plugin_daemon_response_stream(
method="POST",
path=f"plugin/{tenant_id}/dispatch/trigger/validate_credentials",
type=TriggerValidateProviderCredentialsResponse,
data={
"user_id": user_id,
"data": {
"provider": provider_id.provider_name,
"credentials": credentials,
},
},
},
headers={
"X-Plugin-ID": trigger_provider_id.plugin_id,
"Content-Type": "application/json",
},
headers={
"X-Plugin-ID": provider_id.plugin_id,
"Content-Type": "application/json",
},
)
)
for resp in response:
@@ -162,24 +163,25 @@ class PluginTriggerManager(BasePluginClient):
"""
Dispatch an event to triggers.
"""
trigger_provider_id = GenericProviderID(provider)
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/trigger/dispatch_event",
PluginTriggerDispatchResponse,
data={
"user_id": user_id,
"data": {
"provider": trigger_provider_id.provider_name,
"subscription": subscription,
"raw_http_request": binascii.hexlify(serialize_request(request)).decode(),
provider_id = TriggerProviderID(provider)
response: Generator[PluginTriggerDispatchResponse, None, None] = (
self._request_with_plugin_daemon_response_stream(
method="POST",
path=f"plugin/{tenant_id}/dispatch/trigger/dispatch_event",
type=PluginTriggerDispatchResponse,
data={
"user_id": user_id,
"data": {
"provider": provider_id.provider_name,
"subscription": subscription,
"raw_http_request": binascii.hexlify(serialize_request(request)).decode(),
},
},
},
headers={
"X-Plugin-ID": trigger_provider_id.plugin_id,
"Content-Type": "application/json",
},
headers={
"X-Plugin-ID": provider_id.plugin_id,
"Content-Type": "application/json",
},
)
)
for resp in response:
@@ -202,23 +204,22 @@ class PluginTriggerManager(BasePluginClient):
"""
Subscribe to a trigger.
"""
trigger_provider_id = GenericProviderID(provider)
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/trigger/subscribe",
TriggerSubscriptionResponse,
provider_id = TriggerProviderID(provider)
response: Generator[TriggerSubscriptionResponse, None, None] = self._request_with_plugin_daemon_response_stream(
method="POST",
path=f"plugin/{tenant_id}/dispatch/trigger/subscribe",
type=TriggerSubscriptionResponse,
data={
"user_id": user_id,
"data": {
"provider": trigger_provider_id.provider_name,
"provider": provider_id.provider_name,
"credentials": credentials,
"endpoint": endpoint,
"parameters": parameters,
},
},
headers={
"X-Plugin-ID": trigger_provider_id.plugin_id,
"X-Plugin-ID": provider_id.plugin_id,
"Content-Type": "application/json",
},
)
@@ -239,22 +240,21 @@ class PluginTriggerManager(BasePluginClient):
"""
Unsubscribe from a trigger.
"""
trigger_provider_id = GenericProviderID(provider)
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/trigger/unsubscribe",
TriggerSubscriptionResponse,
provider_id = TriggerProviderID(provider)
response: Generator[TriggerSubscriptionResponse, None, None] = self._request_with_plugin_daemon_response_stream(
method="POST",
path=f"plugin/{tenant_id}/dispatch/trigger/unsubscribe",
type=TriggerSubscriptionResponse,
data={
"user_id": user_id,
"data": {
"provider": trigger_provider_id.provider_name,
"provider": provider_id.provider_name,
"subscription": subscription.model_dump(),
"credentials": credentials,
},
},
headers={
"X-Plugin-ID": trigger_provider_id.plugin_id,
"X-Plugin-ID": provider_id.plugin_id,
"Content-Type": "application/json",
},
)
@@ -275,22 +275,21 @@ class PluginTriggerManager(BasePluginClient):
"""
Refresh a trigger subscription.
"""
trigger_provider_id = GenericProviderID(provider)
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/trigger/refresh",
TriggerSubscriptionResponse,
provider_id = TriggerProviderID(provider)
response: Generator[TriggerSubscriptionResponse, None, None] = self._request_with_plugin_daemon_response_stream(
method="POST",
path=f"plugin/{tenant_id}/dispatch/trigger/refresh",
type=TriggerSubscriptionResponse,
data={
"user_id": user_id,
"data": {
"provider": trigger_provider_id.provider_name,
"provider": provider_id.provider_name,
"subscription": subscription.model_dump(),
"credentials": credentials,
},
},
headers={
"X-Plugin-ID": trigger_provider_id.plugin_id,
"X-Plugin-ID": provider_id.plugin_id,
"Content-Type": "application/json",
},
)

View File

@@ -269,22 +269,6 @@ class TriggerEventData(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
class TriggerInputs(BaseModel):
"""Standard inputs for trigger nodes."""
request_id: str
trigger_name: str
subscription_id: str
def to_workflow_args(self) -> dict[str, Any]:
"""Convert to workflow arguments format."""
return {"inputs": self.model_dump(), "files": []}
def to_dict(self) -> dict[str, Any]:
"""Convert to dict (alias for model_dump)."""
return self.model_dump()
class TriggerCreationMethod(StrEnum):
OAUTH = "OAUTH"
APIKEY = "APIKEY"
@@ -292,7 +276,7 @@ class TriggerCreationMethod(StrEnum):
# Export all entities
__all__ = [
__all__: list[str] = [
"EventDescription",
"EventEntity",
"EventIdentity",
@@ -304,7 +288,6 @@ __all__ = [
"SubscriptionBuilder",
"TriggerCreationMethod",
"TriggerEventData",
"TriggerInputs",
"TriggerProviderEntity",
"TriggerProviderIdentity",
"Unsubscription",

View File

@@ -4,7 +4,7 @@ Trigger Provider Controller for managing trigger providers
import logging
from collections.abc import Mapping
from typing import Any, Optional
from typing import Any
from flask import Request
@@ -12,7 +12,7 @@ from core.entities.provider_entities import BasicProviderConfig
from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.entities.request import (
TriggerDispatchResponse,
TriggerInvokeResponse,
TriggerInvokeEventResponse,
)
from core.plugin.impl.trigger import PluginTriggerManager
from core.trigger.entities.api_entities import TriggerApiEntity, TriggerProviderApiEntity
@@ -125,7 +125,7 @@ class PluginTriggerProviderController:
"""
return self.entity.events
def get_event(self, event_name: str) -> Optional[EventEntity]:
def get_event(self, event_name: str) -> EventEntity:
"""
Get a specific event by name
@@ -135,7 +135,7 @@ class PluginTriggerProviderController:
for event in self.entity.events:
if event.identity.name == event_name:
return event
return None
raise ValueError(f"Event {event_name} not found in provider {self.provider_id}")
def get_subscription_default_properties(self) -> Mapping[str, Any]:
"""
@@ -270,20 +270,20 @@ class PluginTriggerProviderController:
)
return response
def invoke_trigger(
def invoke_trigger_event(
self,
user_id: str,
trigger_name: str,
event_name: str,
parameters: Mapping[str, Any],
credentials: Mapping[str, str],
credential_type: CredentialType,
request: Request,
) -> TriggerInvokeResponse:
) -> TriggerInvokeEventResponse:
"""
Execute a trigger through plugin runtime
:param user_id: User ID
:param trigger_name: Trigger name
:param event_name: Event name
:param parameters: Trigger parameters
:param credentials: Provider credentials
:param credential_type: Credential type
@@ -291,13 +291,14 @@ class PluginTriggerProviderController:
:return: Trigger execution result
"""
manager = PluginTriggerManager()
provider_id = self.get_provider_id()
provider_id: TriggerProviderID = self.get_provider_id()
event: EventEntity = self.get_event(event_name=event_name)
return manager.invoke_trigger(
return manager.invoke_trigger_event(
tenant_id=self.tenant_id,
user_id=user_id,
provider=str(provider_id),
trigger=trigger_name,
event_name=event.identity.name,
credentials=credentials,
credential_type=credential_type,
request=request,

View File

@@ -11,7 +11,7 @@ from flask import Request
import contexts
from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.entities.request import Event, TriggerInvokeResponse
from core.plugin.entities.request import TriggerInvokeEventResponse
from core.plugin.impl.exc import PluginInvokeError
from core.plugin.impl.trigger import PluginTriggerManager
from core.trigger.entities.entities import (
@@ -128,41 +128,47 @@ class TriggerManager:
return provider.get_events()
@classmethod
def invoke_trigger(
def invoke_trigger_event(
cls,
tenant_id: str,
user_id: str,
provider_id: TriggerProviderID,
trigger_name: str,
event_name: str,
parameters: Mapping[str, Any],
credentials: Mapping[str, str],
credential_type: CredentialType,
request: Request,
) -> TriggerInvokeResponse:
) -> TriggerInvokeEventResponse:
"""
Execute a trigger
:param tenant_id: Tenant ID
:param user_id: User ID
:param provider_id: Provider ID
:param trigger_name: Trigger name
:param event_name: Event name
:param parameters: Trigger parameters
:param credentials: Provider credentials
:param credential_type: Credential type
:param request: Request
:return: Trigger execution result
"""
provider = cls.get_trigger_provider(tenant_id, provider_id)
trigger = provider.get_event(trigger_name)
if not trigger:
raise ValueError(f"Trigger {trigger_name} not found in provider {provider_id}")
provider: PluginTriggerProviderController = cls.get_trigger_provider(
tenant_id=tenant_id, provider_id=provider_id
)
try:
return provider.invoke_trigger(user_id, trigger_name, parameters, credentials, credential_type, request)
return provider.invoke_trigger_event(
user_id=user_id,
event_name=event_name,
parameters=parameters,
credentials=credentials,
credential_type=credential_type,
request=request,
)
except PluginInvokeError as e:
if e.get_error_type() == "TriggerIgnoreEventError":
return TriggerInvokeResponse(event=Event(variables={}), cancelled=True)
return TriggerInvokeEventResponse(variables={}, cancelled=True)
else:
logger.exception("Failed to invoke trigger")
logger.exception("Failed to invoke trigger event")
raise
@classmethod
@@ -226,7 +232,6 @@ class TriggerManager:
:param tenant_id: Tenant ID
:param provider_id: Provider ID
:param trigger_name: Trigger name
:param subscription: Subscription metadata from subscribe operation
:param credentials: Provider credentials
:return: Refreshed subscription result

View File

@@ -1,8 +1,8 @@
"""plugin_trigger
Revision ID: 132392a2635f
Revises: 9ee7d347f4c1
Create Date: 2025-09-03 15:00:57.326868
Revises: c19938f630b6
Create Date: 2025-09-26 12:05:00.000000
"""
from alembic import op
@@ -64,23 +64,22 @@ def upgrade():
batch_op.create_index('idx_trigger_providers_tenant_endpoint', ['tenant_id', 'endpoint_id'], unique=False)
batch_op.create_index('idx_trigger_providers_tenant_provider', ['tenant_id', 'provider_id'], unique=False)
# Create workflow_plugin_triggers table with final schema (merged from all 4 migrations)
op.create_table('workflow_plugin_triggers',
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('node_id', sa.String(length=64), nullable=False),
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
sa.Column('provider_id', sa.String(length=255), nullable=False),
sa.Column('trigger_id', sa.String(length=510), nullable=False),
sa.Column('triggered_by', sa.String(length=16), nullable=False),
sa.Column('provider_id', sa.String(length=512), nullable=False),
sa.Column('subscription_id', sa.String(length=255), nullable=False),
sa.Column('event_name', sa.String(length=255), nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.PrimaryKeyConstraint('id', name='workflow_plugin_trigger_pkey'),
sa.UniqueConstraint('app_id', 'node_id', 'triggered_by', name='uniq_plugin_node'),
sa.UniqueConstraint('trigger_id', 'node_id', name='uniq_trigger_node')
sa.UniqueConstraint('app_id', 'node_id', name='uniq_app_node_subscription')
)
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
batch_op.create_index('workflow_plugin_trigger_tenant_idx', ['tenant_id'], unique=False)
batch_op.create_index('workflow_plugin_trigger_trigger_idx', ['trigger_id'], unique=False)
batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id', 'event_name'], unique=False)
# ### end Alembic commands ###
@@ -88,8 +87,7 @@ def upgrade():
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
batch_op.drop_index('workflow_plugin_trigger_trigger_idx')
batch_op.drop_index('workflow_plugin_trigger_tenant_idx')
batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx')
op.drop_table('workflow_plugin_triggers')
with op.batch_alter_table('trigger_subscriptions', schema=None) as batch_op:

View File

@@ -1,62 +0,0 @@
"""plugin_trigger_workflow
Revision ID: 86f068bf56fb
Revises: 132392a2635f
Create Date: 2025-09-04 12:12:44.661875
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '86f068bf56fb'
down_revision = '132392a2635f'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
batch_op.add_column(sa.Column('subscription_id', sa.String(length=255), nullable=False))
batch_op.alter_column('provider_id',
existing_type=sa.VARCHAR(length=255),
type_=sa.String(length=512),
existing_nullable=False)
batch_op.alter_column('trigger_id',
existing_type=sa.VARCHAR(length=510),
type_=sa.String(length=255),
existing_nullable=False)
batch_op.drop_constraint(batch_op.f('uniq_plugin_node'), type_='unique')
batch_op.drop_constraint(batch_op.f('uniq_trigger_node'), type_='unique')
batch_op.drop_index(batch_op.f('workflow_plugin_trigger_tenant_idx'))
batch_op.drop_index(batch_op.f('workflow_plugin_trigger_trigger_idx'))
batch_op.create_unique_constraint('uniq_app_node_subscription', ['app_id', 'node_id'])
batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id'], unique=False)
batch_op.drop_column('triggered_by')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
batch_op.add_column(sa.Column('triggered_by', sa.VARCHAR(length=16), autoincrement=False, nullable=False))
batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx')
batch_op.drop_constraint('uniq_app_node_subscription', type_='unique')
batch_op.create_index(batch_op.f('workflow_plugin_trigger_trigger_idx'), ['trigger_id'], unique=False)
batch_op.create_index(batch_op.f('workflow_plugin_trigger_tenant_idx'), ['tenant_id'], unique=False)
batch_op.create_unique_constraint(batch_op.f('uniq_trigger_node'), ['trigger_id', 'node_id'], postgresql_nulls_not_distinct=False)
batch_op.create_unique_constraint(batch_op.f('uniq_plugin_node'), ['app_id', 'node_id', 'triggered_by'], postgresql_nulls_not_distinct=False)
batch_op.alter_column('trigger_id',
existing_type=sa.String(length=255),
type_=sa.VARCHAR(length=510),
existing_nullable=False)
batch_op.alter_column('provider_id',
existing_type=sa.String(length=512),
type_=sa.VARCHAR(length=255),
existing_nullable=False)
batch_op.drop_column('subscription_id')
# ### end Alembic commands ###

View File

@@ -1,37 +0,0 @@
"""plugin_trigger_idx
Revision ID: 875c659da2f8
Revises: 86f068bf56fb
Create Date: 2025-09-05 15:51:08.635283
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '875c659da2f8'
down_revision = '86f068bf56fb'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
batch_op.add_column(sa.Column('trigger_name', sa.String(length=255), nullable=False))
batch_op.drop_index(batch_op.f('workflow_plugin_trigger_tenant_subscription_idx'))
batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id', 'trigger_name'], unique=False)
batch_op.drop_column('trigger_id')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
batch_op.add_column(sa.Column('trigger_id', sa.VARCHAR(length=255), autoincrement=False, nullable=False))
batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx')
batch_op.create_index(batch_op.f('workflow_plugin_trigger_tenant_subscription_idx'), ['tenant_id', 'subscription_id'], unique=False)
batch_op.drop_column('trigger_name')
# ### end Alembic commands ###

View File

@@ -1804,7 +1804,7 @@ class WorkflowPluginTrigger(Base):
- node_id (varchar) Node ID which node in the workflow
- tenant_id (uuid) Workspace ID
- provider_id (varchar) Plugin provider ID
- trigger_name (varchar) trigger name
- event_name (varchar) trigger name
- subscription_id (varchar) Subscription ID
- created_at (timestamp) Creation time
- updated_at (timestamp) Last update time
@@ -1813,7 +1813,7 @@ class WorkflowPluginTrigger(Base):
__tablename__ = "workflow_plugin_triggers"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"),
sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "trigger_name"),
sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "event_name"),
sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"),
)
@@ -1822,7 +1822,7 @@ class WorkflowPluginTrigger(Base):
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
provider_id: Mapped[str] = mapped_column(String(512), nullable=False)
trigger_name: Mapped[str] = mapped_column(String(255), nullable=False)
event_name: Mapped[str] = mapped_column(String(255), nullable=False)
subscription_id: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(

View File

@@ -87,11 +87,11 @@ class TriggerService:
return 0
subscribers: list[WorkflowPluginTrigger] = cls.get_subscriber_triggers(
tenant_id=subscription.tenant_id, subscription_id=subscription.id, trigger_name=event.identity.name
tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event.identity.name
)
if not subscribers:
logger.warning(
"No workflows found for trigger '%s' in subscription '%s'",
"No workflows found for trigger event '%s' in subscription '%s'",
event.identity.name,
subscription.id,
)
@@ -112,30 +112,30 @@ class TriggerService:
continue
# Find the trigger node in the workflow
trigger_node = None
event_node = None
for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN):
if node_id == plugin_trigger.node_id:
trigger_node = node_config
event_node = node_config
break
if not trigger_node:
logger.error("Trigger node not found for app %s", plugin_trigger.app_id)
if not event_node:
logger.error("Trigger event node not found for app %s", plugin_trigger.app_id)
continue
# invoke triger
invoke_response = TriggerManager.invoke_trigger(
invoke_response = TriggerManager.invoke_trigger_event(
tenant_id=subscription.tenant_id,
user_id=subscription.user_id,
provider_id=TriggerProviderID(subscription.provider_id),
trigger_name=event.identity.name,
parameters=trigger_node.get("config", {}),
event_name=event.identity.name,
parameters=event_node.get("config", {}),
credentials=subscription.credentials,
credential_type=CredentialType.of(subscription.credential_type),
request=request,
)
if invoke_response.cancelled:
logger.info(
"Trigger ignored for app %s with trigger %s",
"Trigger ignored for app %s with trigger event %s",
plugin_trigger.app_id,
event.identity.name,
)
@@ -150,7 +150,7 @@ class TriggerService:
trigger_type=WorkflowRunTriggeredFrom.PLUGIN,
plugin_id=subscription.provider_id,
endpoint_id=subscription.endpoint_id,
inputs=invoke_response.event.variables,
inputs=invoke_response.variables.variables,
)
# Trigger async workflow
@@ -158,7 +158,7 @@ class TriggerService:
AsyncWorkflowService.trigger_workflow_async(session, tenant_owner, trigger_data)
dispatched_count += 1
logger.info(
"Triggered workflow for app %s with trigger %s",
"Triggered workflow for app %s with trigger event %s",
plugin_trigger.app_id,
event.identity.name,
)
@@ -224,7 +224,7 @@ class TriggerService:
@classmethod
def get_subscriber_triggers(
cls, tenant_id: str, subscription_id: str, trigger_name: str
cls, tenant_id: str, subscription_id: str, event_name: str
) -> list[WorkflowPluginTrigger]:
"""
Get WorkflowPluginTriggers for a subscription and trigger.
@@ -232,14 +232,14 @@ class TriggerService:
Args:
tenant_id: Tenant ID
subscription_id: Subscription ID
trigger_name: Trigger name
event_name: Event name
"""
with Session(db.engine, expire_on_commit=False) as session:
subscribers = session.scalars(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.tenant_id == tenant_id,
WorkflowPluginTrigger.subscription_id == subscription_id,
WorkflowPluginTrigger.trigger_name == trigger_name,
WorkflowPluginTrigger.event_name == event_name,
)
).all()
return list(subscribers)

View File

@@ -1,15 +1,12 @@
from typing import Optional
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import Session
from werkzeug.exceptions import NotFound
from core.workflow.enums import NodeType
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.model import App
from models.trigger import TriggerSubscription
from models.workflow import Workflow, WorkflowPluginTrigger
@@ -19,313 +16,6 @@ class WorkflowPluginTriggerService:
__PLUGIN_TRIGGER_NODE_CACHE_KEY__ = "plugin_trigger_nodes"
MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW = 5 # Maximum allowed plugin trigger nodes per workflow
@classmethod
def create_plugin_trigger(
cls,
app_id: str,
tenant_id: str,
node_id: str,
provider_id: str,
trigger_name: str,
subscription_id: str,
) -> WorkflowPluginTrigger:
"""Create a new plugin trigger
Args:
app_id: The app ID
tenant_id: The tenant ID
node_id: The node ID in the workflow
provider_id: The plugin provider ID
trigger_name: The trigger name
subscription_id: The subscription ID
Returns:
The created WorkflowPluginTrigger instance
Raises:
BadRequest: If plugin trigger already exists for this app and node
"""
with Session(db.engine) as session:
# Check if plugin trigger already exists for this app and node
# Based on unique constraint: uniq_app_node
existing_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_id,
WorkflowPluginTrigger.node_id == node_id,
)
)
if existing_trigger:
raise ValueError("Plugin trigger already exists for this app and node")
# Check if subscription exists
subscription = session.scalar(
select(TriggerSubscription).where(
TriggerSubscription.id == subscription_id,
)
)
if not subscription:
raise NotFound("Subscription not found")
# Create new plugin trigger
plugin_trigger = WorkflowPluginTrigger(
app_id=app_id,
node_id=node_id,
tenant_id=tenant_id,
provider_id=provider_id,
trigger_name=trigger_name,
subscription_id=subscription_id,
)
session.add(plugin_trigger)
session.commit()
session.refresh(plugin_trigger)
return plugin_trigger
@classmethod
def get_plugin_trigger(
cls,
app_id: str,
node_id: str,
) -> WorkflowPluginTrigger:
"""Get a plugin trigger by app_id and node_id
Args:
app_id: The app ID
node_id: The node ID in the workflow
Returns:
The WorkflowPluginTrigger instance
Raises:
NotFound: If plugin trigger not found
"""
with Session(db.engine) as session:
# Find plugin trigger using unique constraint
plugin_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_id,
WorkflowPluginTrigger.node_id == node_id,
)
)
if not plugin_trigger:
raise NotFound("Plugin trigger not found")
return plugin_trigger
@classmethod
def get_plugin_trigger_by_subscription(
cls,
tenant_id: str,
subscription_id: str,
) -> WorkflowPluginTrigger:
"""Get a plugin trigger by tenant_id and subscription_id
This is the primary query pattern, optimized with composite index
Args:
tenant_id: The tenant ID
subscription_id: The subscription ID
Returns:
The WorkflowPluginTrigger instance
Raises:
NotFound: If plugin trigger not found
"""
with Session(db.engine) as session:
# Find plugin trigger using indexed columns
plugin_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.tenant_id == tenant_id,
WorkflowPluginTrigger.subscription_id == subscription_id,
)
)
if not plugin_trigger:
raise NotFound("Plugin trigger not found")
return plugin_trigger
@classmethod
def list_plugin_triggers_by_tenant(
cls,
tenant_id: str,
) -> list[WorkflowPluginTrigger]:
"""List all plugin triggers for a tenant
Args:
tenant_id: The tenant ID
Returns:
List of WorkflowPluginTrigger instances
"""
with Session(db.engine) as session:
plugin_triggers = session.scalars(
select(WorkflowPluginTrigger)
.where(WorkflowPluginTrigger.tenant_id == tenant_id)
.order_by(WorkflowPluginTrigger.created_at.desc())
).all()
return list(plugin_triggers)
@classmethod
def list_plugin_triggers_by_subscription(
cls,
subscription_id: str,
) -> list[WorkflowPluginTrigger]:
"""List all plugin triggers for a subscription
Args:
subscription_id: The subscription ID
Returns:
List of WorkflowPluginTrigger instances
"""
with Session(db.engine) as session:
plugin_triggers = session.scalars(
select(WorkflowPluginTrigger)
.where(WorkflowPluginTrigger.subscription_id == subscription_id)
.order_by(WorkflowPluginTrigger.created_at.desc())
).all()
return list(plugin_triggers)
@classmethod
def update_plugin_trigger(
cls,
app_id: str,
node_id: str,
subscription_id: str,
) -> WorkflowPluginTrigger:
"""Update a plugin trigger
Args:
app_id: The app ID
node_id: The node ID in the workflow
subscription_id: The new subscription ID (optional)
Returns:
The updated WorkflowPluginTrigger instance
Raises:
NotFound: If plugin trigger not found
"""
with Session(db.engine) as session:
# Find plugin trigger using unique constraint
plugin_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_id,
WorkflowPluginTrigger.node_id == node_id,
)
)
if not plugin_trigger:
raise NotFound("Plugin trigger not found")
# Check if subscription exists
subscription = session.scalar(
select(TriggerSubscription).where(
TriggerSubscription.id == subscription_id,
)
)
if not subscription:
raise NotFound("Subscription not found")
# Update subscription ID
plugin_trigger.subscription_id = subscription_id
session.commit()
session.refresh(plugin_trigger)
return plugin_trigger
@classmethod
def update_plugin_trigger_by_subscription(
cls,
tenant_id: str,
subscription_id: str,
provider_id: Optional[str] = None,
trigger_name: Optional[str] = None,
new_subscription_id: Optional[str] = None,
) -> WorkflowPluginTrigger:
"""Update a plugin trigger by tenant_id and subscription_id
Args:
tenant_id: The tenant ID
subscription_id: The current subscription ID
provider_id: The new provider ID (optional)
trigger_name: The new trigger name (optional)
new_subscription_id: The new subscription ID (optional)
Returns:
The updated WorkflowPluginTrigger instance
Raises:
NotFound: If plugin trigger not found
"""
with Session(db.engine) as session:
# Find plugin trigger using indexed columns
plugin_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.tenant_id == tenant_id,
WorkflowPluginTrigger.subscription_id == subscription_id,
)
)
if not plugin_trigger:
raise NotFound("Plugin trigger not found")
# Update fields if provided
if provider_id:
plugin_trigger.provider_id = provider_id
if trigger_name:
# Update trigger_id if provider_id or trigger_name changed
provider_id = provider_id or plugin_trigger.provider_id
plugin_trigger.trigger_name = f"{provider_id}:{trigger_name}"
if new_subscription_id:
plugin_trigger.subscription_id = new_subscription_id
session.commit()
session.refresh(plugin_trigger)
return plugin_trigger
@classmethod
def delete_plugin_trigger(
cls,
app_id: str,
node_id: str,
) -> None:
"""Delete a plugin trigger by app_id and node_id
Args:
app_id: The app ID
node_id: The node ID in the workflow
Raises:
NotFound: If plugin trigger not found
"""
with Session(db.engine) as session:
# Find plugin trigger using unique constraint
plugin_trigger = session.scalar(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_id,
WorkflowPluginTrigger.node_id == node_id,
)
)
if not plugin_trigger:
raise NotFound("Plugin trigger not found")
session.delete(plugin_trigger)
session.commit()
@classmethod
def delete_plugin_trigger_by_subscription(
cls,
@@ -356,37 +46,6 @@ class WorkflowPluginTriggerService:
session.delete(plugin_trigger)
@classmethod
def delete_all_by_subscription(
cls,
subscription_id: str,
) -> int:
"""Delete all plugin triggers for a subscription
Useful when a subscription is cancelled
Args:
subscription_id: The subscription ID
Returns:
Number of triggers deleted
"""
with Session(db.engine) as session:
# Find all plugin triggers for this subscription
plugin_triggers = session.scalars(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.subscription_id == subscription_id,
)
).all()
count = len(plugin_triggers)
for trigger in plugin_triggers:
session.delete(trigger)
session.commit()
return count
@classmethod
def sync_plugin_trigger_relationships(cls, app: App, workflow: Workflow):
"""
@@ -412,7 +71,7 @@ class WorkflowPluginTriggerService:
record_id: str
node_id: str
provider_id: str
trigger_name: str
event_name: str
subscription_id: str
# Walk nodes to find plugin triggers
@@ -421,7 +80,7 @@ class WorkflowPluginTriggerService:
# Extract plugin trigger configuration from node
plugin_id = node_config.get("plugin_id", "")
provider_id = node_config.get("provider_id", "")
trigger_name = node_config.get("trigger_name", "")
event_name = node_config.get("event_name", "")
subscription_id = node_config.get("subscription_id", "")
if not subscription_id:
@@ -432,7 +91,7 @@ class WorkflowPluginTriggerService:
"node_id": node_id,
"plugin_id": plugin_id,
"provider_id": provider_id,
"trigger_name": trigger_name,
"event_name": event_name,
"subscription_id": subscription_id,
}
)
@@ -480,7 +139,7 @@ class WorkflowPluginTriggerService:
tenant_id=app.tenant_id,
node_id=node_info["node_id"],
provider_id=node_info["provider_id"],
trigger_name=node_info["trigger_name"],
event_name=node_info["event_name"],
subscription_id=node_info["subscription_id"],
)
session.add(plugin_trigger)
@@ -490,7 +149,7 @@ class WorkflowPluginTriggerService:
record_id=plugin_trigger.id,
node_id=node_info["node_id"],
provider_id=node_info["provider_id"],
trigger_name=node_info["trigger_name"],
event_name=node_info["event_name"],
subscription_id=node_info["subscription_id"],
)
redis_client.set(
@@ -508,11 +167,11 @@ class WorkflowPluginTriggerService:
if (
existing_record.subscription_id != node_info["subscription_id"]
or existing_record.provider_id != node_info["provider_id"]
or existing_record.trigger_name != node_info["trigger_name"]
or existing_record.event_name != node_info["event_name"]
):
existing_record.subscription_id = node_info["subscription_id"]
existing_record.provider_id = node_info["provider_id"]
existing_record.trigger_name = node_info["trigger_name"]
existing_record.event_name = node_info["event_name"]
session.add(existing_record)
# Update cache
@@ -520,7 +179,7 @@ class WorkflowPluginTriggerService:
record_id=existing_record.id,
node_id=node_id,
provider_id=node_info["provider_id"],
trigger_name=node_info["trigger_name"],
event_name=node_info["event_name"],
subscription_id=node_info["subscription_id"],
)
redis_client.set(