refactor(trigger): Reinstate DraftWorkflowTriggerNodeApi with improved structure

- Restored the `DraftWorkflowTriggerNodeApi` class to handle polling for trigger events in draft workflows.
- Enhanced the implementation to utilize `TriggerDebugEvent` and `TriggerDebugEventPoller` for better event management.
- Improved error handling and response structure for node execution, ensuring clarity in API responses.
- Updated API documentation to reflect the restored functionality and parameters.
This commit is contained in:
Harry
2025-10-15 14:45:00 +08:00
parent 8f7bef9509
commit bd31c6f90b

View File

@@ -1003,80 +1003,6 @@ class DraftWorkflowNodeLastRunApi(Resource):
return node_exec
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger/run")
class DraftWorkflowTriggerNodeApi(Resource):
"""
Single node debug - Polling API for trigger events
Path: /apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger/run
"""
@api.doc("poll_draft_workflow_trigger_node")
@api.doc(description="Poll for trigger events and execute single node when event arrives")
@api.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
@api.response(200, "Trigger event received and node executed successfully")
@api.response(403, "Permission denied")
@api.response(500, "Internal server error")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
"""
Poll for trigger events and execute single node when event arrives
"""
if not isinstance(current_user, Account) or not current_user.has_edit_permission:
raise Forbidden()
workflow_service = WorkflowService()
draft_workflow = workflow_service.get_draft_workflow(app_model)
if not draft_workflow:
raise ValueError("Workflow not found")
node_config = draft_workflow.get_node_config_by_id(node_id=node_id)
if not node_config:
raise ValueError("Node data not found for node %s", node_id)
node_type: NodeType = draft_workflow.get_node_type_from_node_config(node_config)
event: TriggerDebugEvent | None = None
# for schedule trigger, when run single node, just execute directly
if node_type == NodeType.TRIGGER_SCHEDULE:
event = TriggerDebugEvent(
workflow_args={},
node_id=node_id,
)
# for other trigger types, poll for the event
else:
poller: TriggerDebugEventPoller = create_event_poller(
draft_workflow=draft_workflow,
tenant_id=app_model.tenant_id,
user_id=current_user.id,
app_id=app_model.id,
node_id=node_id,
)
event = poller.poll()
if not event:
return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN})
try:
node_execution = workflow_service.run_draft_workflow_node(
app_model=app_model,
draft_workflow=draft_workflow,
node_id=node_id,
user_inputs=event.workflow_args,
account=current_user,
query="",
files=[],
)
return jsonable_encoder(node_execution)
except Exception as e:
logger.exception("Error running draft workflow trigger node")
return jsonable_encoder(
{
"status": "error",
"error": str(e),
}
), 500
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/trigger/run")
class DraftWorkflowTriggerRunApi(Resource):
"""
@@ -1160,6 +1086,80 @@ class DraftWorkflowTriggerRunApi(Resource):
), 500
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger/run")
class DraftWorkflowTriggerNodeApi(Resource):
"""
Single node debug - Polling API for trigger events
Path: /apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger/run
"""
@api.doc("poll_draft_workflow_trigger_node")
@api.doc(description="Poll for trigger events and execute single node when event arrives")
@api.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
@api.response(200, "Trigger event received and node executed successfully")
@api.response(403, "Permission denied")
@api.response(500, "Internal server error")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
"""
Poll for trigger events and execute single node when event arrives
"""
if not isinstance(current_user, Account) or not current_user.has_edit_permission:
raise Forbidden()
workflow_service = WorkflowService()
draft_workflow = workflow_service.get_draft_workflow(app_model)
if not draft_workflow:
raise ValueError("Workflow not found")
node_config = draft_workflow.get_node_config_by_id(node_id=node_id)
if not node_config:
raise ValueError("Node data not found for node %s", node_id)
node_type: NodeType = draft_workflow.get_node_type_from_node_config(node_config)
event: TriggerDebugEvent | None = None
# for schedule trigger, when run single node, just execute directly
if node_type == NodeType.TRIGGER_SCHEDULE:
event = TriggerDebugEvent(
workflow_args={},
node_id=node_id,
)
# for other trigger types, poll for the event
else:
poller: TriggerDebugEventPoller = create_event_poller(
draft_workflow=draft_workflow,
tenant_id=app_model.tenant_id,
user_id=current_user.id,
app_id=app_model.id,
node_id=node_id,
)
event = poller.poll()
if not event:
return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN})
try:
node_execution = workflow_service.run_draft_workflow_node(
app_model=app_model,
draft_workflow=draft_workflow,
node_id=node_id,
user_inputs=event.workflow_args,
account=current_user,
query="",
files=[],
)
return jsonable_encoder(node_execution)
except Exception as e:
logger.exception("Error running draft workflow trigger node")
return jsonable_encoder(
{
"status": "error",
"error": str(e),
}
), 500
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/trigger/run-all")
class DraftWorkflowTriggerRunAllApi(Resource):
"""