Compare commits

...

5 Commits

Author SHA1 Message Date
Novice Lee
81375088e9 fix: remove unused fields 2024-12-26 14:51:33 +08:00
Novice Lee
cfade297e8 fix: mypy static type checking issues 2024-12-26 09:04:29 +08:00
Novice Lee
8933dd85bf Merge branch 'main' into feat/retry-single-step-debug 2024-12-26 08:56:24 +08:00
Novice Lee
ae5e8d3160 fix: remove unused import 2024-12-25 10:40:02 +08:00
Novice Lee
fc6c0317a5 feat: add single step retry 2024-12-25 10:38:54 +08:00
3 changed files with 175 additions and 40 deletions

View File

@@ -440,6 +440,31 @@ class WorkflowConfigApi(Resource):
} }
class DraftWorkflowNodeRetriableApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(workflow_run_node_execution_fields)
def post(self, app_model: App, node_id: str):
"""
Run draft workflow node
"""
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
args = parser.parse_args()
workflow_service = WorkflowService()
workflow_node_execution = workflow_service.run_retriable_draft_workflow_node(
app_model=app_model, node_id=node_id, user_inputs=args.get("inputs", {}), account=current_user
)
return workflow_node_execution
api.add_resource(DraftWorkflowApi, "/apps/<uuid:app_id>/workflows/draft") api.add_resource(DraftWorkflowApi, "/apps/<uuid:app_id>/workflows/draft")
api.add_resource(WorkflowConfigApi, "/apps/<uuid:app_id>/workflows/draft/config") api.add_resource(WorkflowConfigApi, "/apps/<uuid:app_id>/workflows/draft/config")
api.add_resource(AdvancedChatDraftWorkflowRunApi, "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run") api.add_resource(AdvancedChatDraftWorkflowRunApi, "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run")
@@ -459,3 +484,4 @@ api.add_resource(
DefaultBlockConfigApi, "/apps/<uuid:app_id>/workflows/default-workflow-block-configs/<string:block_type>" DefaultBlockConfigApi, "/apps/<uuid:app_id>/workflows/default-workflow-block-configs/<string:block_type>"
) )
api.add_resource(ConvertToWorkflowApi, "/apps/<uuid:app_id>/convert-to-workflow") api.add_resource(ConvertToWorkflowApi, "/apps/<uuid:app_id>/convert-to-workflow")
api.add_resource(DraftWorkflowNodeRetriableApi, "/apps/<uuid:app_id>/workflows/draft/retry/nodes/<string:node_id>/run")

View File

@@ -29,7 +29,6 @@ workflow_run_for_list_fields = {
"created_at": TimestampField, "created_at": TimestampField,
"finished_at": TimestampField, "finished_at": TimestampField,
"exceptions_count": fields.Integer, "exceptions_count": fields.Integer,
"retry_index": fields.Integer,
} }
advanced_chat_workflow_run_for_list_fields = { advanced_chat_workflow_run_for_list_fields = {
@@ -46,7 +45,6 @@ advanced_chat_workflow_run_for_list_fields = {
"created_at": TimestampField, "created_at": TimestampField,
"finished_at": TimestampField, "finished_at": TimestampField,
"exceptions_count": fields.Integer, "exceptions_count": fields.Integer,
"retry_index": fields.Integer,
} }
advanced_chat_workflow_run_pagination_fields = { advanced_chat_workflow_run_pagination_fields = {
@@ -81,19 +79,6 @@ workflow_run_detail_fields = {
"exceptions_count": fields.Integer, "exceptions_count": fields.Integer,
} }
retry_event_field = {
"elapsed_time": fields.Float,
"status": fields.String,
"inputs": fields.Raw(attribute="inputs"),
"process_data": fields.Raw(attribute="process_data"),
"outputs": fields.Raw(attribute="outputs"),
"metadata": fields.Raw(attribute="metadata"),
"llm_usage": fields.Raw(attribute="llm_usage"),
"error": fields.String,
"retry_index": fields.Integer,
}
workflow_run_node_execution_fields = { workflow_run_node_execution_fields = {
"id": fields.String, "id": fields.String,
"index": fields.Integer, "index": fields.Integer,

View File

@@ -2,14 +2,14 @@ import json
import time import time
from collections.abc import Sequence from collections.abc import Sequence
from datetime import UTC, datetime from datetime import UTC, datetime
from typing import Any, Optional, cast from typing import Optional, cast
from uuid import uuid4 from uuid import uuid4
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.model_runtime.utils.encoders import jsonable_encoder from core.model_runtime.utils.encoders import jsonable_encoder
from core.variables import Variable from core.variables import Variable
from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult
from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.nodes.base.entities import BaseNodeData from core.workflow.nodes.base.entities import BaseNodeData
@@ -243,29 +243,7 @@ class WorkflowService:
raise ValueError("Node run failed with no run result") raise ValueError("Node run failed with no run result")
# single step debug mode error handling return # single step debug mode error handling return
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error: if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
node_error_args: dict[str, Any] = { node_run_result = self._handle_continue_on_error(node_instance, node_run_result)
"status": WorkflowNodeExecutionStatus.EXCEPTION,
"error": node_run_result.error,
"inputs": node_run_result.inputs,
"metadata": {"error_strategy": node_instance.node_data.error_strategy},
}
if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
**node_instance.node_data.default_value_dict,
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
else:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
run_succeeded = node_run_result.status in ( run_succeeded = node_run_result.status in (
WorkflowNodeExecutionStatus.SUCCEEDED, WorkflowNodeExecutionStatus.SUCCEEDED,
WorkflowNodeExecutionStatus.EXCEPTION, WorkflowNodeExecutionStatus.EXCEPTION,
@@ -362,3 +340,149 @@ class WorkflowService:
) )
else: else:
raise ValueError(f"Invalid app mode: {app_model.mode}") raise ValueError(f"Invalid app mode: {app_model.mode}")
def run_retriable_draft_workflow_node(
self, app_model: App, node_id: str, user_inputs: dict, account: Account
) -> list[WorkflowNodeExecution]:
"""
Run draft retry workflow node
"""
# fetch draft workflow by app_model
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError("Workflow not initialized")
# init retry variables
start_at = time.perf_counter()
should_retry = True
retries = 0
max_retries = 0
retry_interval = 0.0
list_node_executions = []
while retries <= max_retries and should_retry:
reties_start_at = time.perf_counter()
should_retry = False
try:
# run draft workflow node
node_instance, generator = WorkflowEntry.single_step_run(
workflow=draft_workflow,
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
)
node_instance = cast(BaseNode[BaseNodeData], node_instance)
max_retries = node_instance.node_data.retry_config.max_retries
retry_interval = node_instance.node_data.retry_config.retry_interval_seconds
node_run_result: NodeRunResult | None = None
for event in generator:
if isinstance(event, RunCompletedEvent):
node_run_result = event.run_result
# sign output files
node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
break
if not node_run_result:
raise ValueError("Node run failed with no run result")
# single step debug mode error handling return
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED:
if node_instance.should_retry and retries < max_retries:
retries += 1
should_retry = True
node_run_result.status = WorkflowNodeExecutionStatus.RETRY
elif node_instance.should_continue_on_error:
node_run_result = self._handle_continue_on_error(node_instance, node_run_result)
elif node_instance.node_type == NodeType.HTTP_REQUEST and node_run_result.outputs:
node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
error = node_run_result.error or None
except WorkflowNodeRunFailedError as e:
node_instance = e.node_instance
node_run_result = None
error = e.error
start_at = (
reties_start_at
if not node_run_result or node_run_result.status == WorkflowNodeExecutionStatus.RETRY
else start_at
)
workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.tenant_id = app_model.tenant_id
workflow_node_execution.app_id = app_model.id
workflow_node_execution.workflow_id = draft_workflow.id
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
workflow_node_execution.index = 1
workflow_node_execution.node_id = node_id
workflow_node_execution.node_type = node_instance.node_type
workflow_node_execution.title = node_instance.node_data.title
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
workflow_node_execution.created_by = account.id
workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
workflow_node_execution.error = error or None
if node_run_result:
# create workflow node execution
inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
process_data = (
WorkflowEntry.handle_special_values(node_run_result.process_data)
if node_run_result.process_data
else None
)
outputs = (
WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
)
workflow_node_execution.status = node_run_result.status.value
workflow_node_execution.inputs = json.dumps(inputs)
workflow_node_execution.process_data = json.dumps(process_data)
workflow_node_execution.outputs = json.dumps(outputs)
workflow_node_execution.execution_metadata = (
json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
)
db.session.add(workflow_node_execution)
list_node_executions.append(workflow_node_execution)
db.session.commit()
if should_retry and retry_interval:
time.sleep(retry_interval)
return list_node_executions
def _handle_continue_on_error(
self, node_instance: BaseNode[BaseNodeData], node_run_result: NodeRunResult
) -> NodeRunResult:
node_error_args = {
"status": WorkflowNodeExecutionStatus.EXCEPTION,
"error": node_run_result.error,
"inputs": node_run_result.inputs,
"metadata": {NodeRunMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy},
}
if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
node_run_result = NodeRunResult(
status=WorkflowNodeExecutionStatus.EXCEPTION,
error=node_run_result.error,
inputs=node_run_result.inputs,
metadata={NodeRunMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy},
outputs={
**node_instance.node_data.default_value_dict,
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
else:
node_run_result = NodeRunResult(
status=WorkflowNodeExecutionStatus.EXCEPTION,
error=node_run_result.error,
inputs=node_run_result.inputs,
metadata={NodeRunMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy},
outputs={
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
return node_run_result