mirror of
https://github.com/langgenius/dify.git
synced 2026-03-25 17:56:52 +00:00
Compare commits
26 Commits
1.13.3
...
yanli/fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d69e88ca29 | ||
|
|
6b09ecc25e | ||
|
|
c446dd6164 | ||
|
|
ead33f2914 | ||
|
|
226cf788d1 | ||
|
|
f659eb48c6 | ||
|
|
a17f6f62bf | ||
|
|
7b8c57d95b | ||
|
|
ceccc70d15 | ||
|
|
3193d7c9a5 | ||
|
|
cd9306d4f9 | ||
|
|
84d1b05501 | ||
|
|
f81e0c7c8d | ||
|
|
dea90b0ccd | ||
|
|
bdd4542759 | ||
|
|
5e22818296 | ||
|
|
64308c3d0d | ||
|
|
37df3899ff | ||
|
|
9100190a68 | ||
|
|
344f6be7cd | ||
|
|
f169cf8654 | ||
|
|
e76fbcb045 | ||
|
|
e6f00a2bf9 | ||
|
|
715f3affe5 | ||
|
|
4f73766a21 | ||
|
|
fe90453eed |
@@ -16,14 +16,12 @@ api = ExternalApi(
|
||||
inner_api_ns = Namespace("inner_api", description="Internal API operations", path="/")
|
||||
|
||||
from . import mail as _mail
|
||||
from .app import dsl as _app_dsl
|
||||
from .plugin import plugin as _plugin
|
||||
from .workspace import workspace as _workspace
|
||||
|
||||
api.add_namespace(inner_api_ns)
|
||||
|
||||
__all__ = [
|
||||
"_app_dsl",
|
||||
"_mail",
|
||||
"_plugin",
|
||||
"_workspace",
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -1,110 +0,0 @@
|
||||
"""Inner API endpoints for app DSL import/export.
|
||||
|
||||
Called by the enterprise admin-api service. Import requires ``creator_email``
|
||||
to attribute the created app; workspace/membership validation is done by the
|
||||
Go admin-api caller.
|
||||
"""
|
||||
|
||||
from flask import request
|
||||
from flask_restx import Resource
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from controllers.common.schema import register_schema_model
|
||||
from controllers.console.wraps import setup_required
|
||||
from controllers.inner_api import inner_api_ns
|
||||
from controllers.inner_api.wraps import enterprise_inner_api_only
|
||||
from extensions.ext_database import db
|
||||
from models import Account, App
|
||||
from models.account import AccountStatus
|
||||
from services.app_dsl_service import AppDslService, ImportMode, ImportStatus
|
||||
|
||||
|
||||
class InnerAppDSLImportPayload(BaseModel):
|
||||
yaml_content: str = Field(description="YAML DSL content")
|
||||
creator_email: str = Field(description="Email of the workspace member who will own the imported app")
|
||||
name: str | None = Field(default=None, description="Override app name from DSL")
|
||||
description: str | None = Field(default=None, description="Override app description from DSL")
|
||||
|
||||
|
||||
register_schema_model(inner_api_ns, InnerAppDSLImportPayload)
|
||||
|
||||
|
||||
@inner_api_ns.route("/enterprise/workspaces/<string:workspace_id>/dsl/import")
|
||||
class EnterpriseAppDSLImport(Resource):
|
||||
@setup_required
|
||||
@enterprise_inner_api_only
|
||||
@inner_api_ns.doc("enterprise_app_dsl_import")
|
||||
@inner_api_ns.expect(inner_api_ns.models[InnerAppDSLImportPayload.__name__])
|
||||
@inner_api_ns.doc(
|
||||
responses={
|
||||
200: "Import completed",
|
||||
202: "Import pending (DSL version mismatch requires confirmation)",
|
||||
400: "Import failed (business error)",
|
||||
404: "Creator account not found or inactive",
|
||||
}
|
||||
)
|
||||
def post(self, workspace_id: str):
|
||||
"""Import a DSL into a workspace on behalf of a specified creator."""
|
||||
args = InnerAppDSLImportPayload.model_validate(inner_api_ns.payload or {})
|
||||
|
||||
account = _get_active_account(args.creator_email)
|
||||
if account is None:
|
||||
return {"message": f"account '{args.creator_email}' not found or inactive"}, 404
|
||||
|
||||
account.set_tenant_id(workspace_id)
|
||||
|
||||
with Session(db.engine) as session:
|
||||
dsl_service = AppDslService(session)
|
||||
result = dsl_service.import_app(
|
||||
account=account,
|
||||
import_mode=ImportMode.YAML_CONTENT,
|
||||
yaml_content=args.yaml_content,
|
||||
name=args.name,
|
||||
description=args.description,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if result.status == ImportStatus.FAILED:
|
||||
return result.model_dump(mode="json"), 400
|
||||
if result.status == ImportStatus.PENDING:
|
||||
return result.model_dump(mode="json"), 202
|
||||
return result.model_dump(mode="json"), 200
|
||||
|
||||
|
||||
@inner_api_ns.route("/enterprise/apps/<string:app_id>/dsl")
|
||||
class EnterpriseAppDSLExport(Resource):
|
||||
@setup_required
|
||||
@enterprise_inner_api_only
|
||||
@inner_api_ns.doc(
|
||||
"enterprise_app_dsl_export",
|
||||
responses={
|
||||
200: "Export successful",
|
||||
404: "App not found",
|
||||
},
|
||||
)
|
||||
def get(self, app_id: str):
|
||||
"""Export an app's DSL as YAML."""
|
||||
include_secret = request.args.get("include_secret", "false").lower() == "true"
|
||||
|
||||
app_model = db.session.query(App).filter_by(id=app_id).first()
|
||||
if not app_model:
|
||||
return {"message": "app not found"}, 404
|
||||
|
||||
data = AppDslService.export_dsl(
|
||||
app_model=app_model,
|
||||
include_secret=include_secret,
|
||||
)
|
||||
|
||||
return {"data": data}, 200
|
||||
|
||||
|
||||
def _get_active_account(email: str) -> Account | None:
|
||||
"""Look up an active account by email.
|
||||
|
||||
Workspace membership is already validated by the Go admin-api caller.
|
||||
"""
|
||||
account = db.session.query(Account).filter_by(email=email).first()
|
||||
if account is None or account.status != AccountStatus.ACTIVE:
|
||||
return None
|
||||
return account
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "dify-api"
|
||||
version = "1.13.3"
|
||||
version = "1.13.2"
|
||||
requires-python = ">=3.11,<3.13"
|
||||
|
||||
dependencies = [
|
||||
|
||||
@@ -163,9 +163,11 @@ class DifyTestContainers:
|
||||
wait_for_logs(self.redis, "Ready to accept connections", timeout=30)
|
||||
logger.info("Redis container is ready and accepting connections")
|
||||
|
||||
# Start Dify Sandbox container for code execution environment.
|
||||
# Start Dify Sandbox container for code execution environment
|
||||
# Dify Sandbox provides a secure environment for executing user code
|
||||
# Use pinned version 0.2.12 to match production docker-compose configuration
|
||||
logger.info("Initializing Dify Sandbox container...")
|
||||
self.dify_sandbox = DockerContainer(image="langgenius/dify-sandbox:0.2.14").with_network(self.network)
|
||||
self.dify_sandbox = DockerContainer(image="langgenius/dify-sandbox:0.2.12").with_network(self.network)
|
||||
self.dify_sandbox.with_exposed_ports(8194)
|
||||
self.dify_sandbox.env = {
|
||||
"API_KEY": "test_api_key",
|
||||
@@ -185,7 +187,7 @@ class DifyTestContainers:
|
||||
# Start Dify Plugin Daemon container for plugin management
|
||||
# Dify Plugin Daemon provides plugin lifecycle management and execution
|
||||
logger.info("Initializing Dify Plugin Daemon container...")
|
||||
self.dify_plugin_daemon = DockerContainer(image="langgenius/dify-plugin-daemon:0.5.3-local").with_network(
|
||||
self.dify_plugin_daemon = DockerContainer(image="langgenius/dify-plugin-daemon:0.5.4-local").with_network(
|
||||
self.network
|
||||
)
|
||||
self.dify_plugin_daemon.with_exposed_ports(5002)
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -1,245 +0,0 @@
|
||||
"""Unit tests for inner_api app DSL import/export endpoints.
|
||||
|
||||
Tests Pydantic model validation, endpoint handler logic, and the
|
||||
_get_active_account helper. Auth/setup decorators are tested separately
|
||||
in test_auth_wraps.py; handler tests use inspect.unwrap() to bypass them.
|
||||
"""
|
||||
|
||||
import inspect
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from pydantic import ValidationError
|
||||
|
||||
from controllers.inner_api.app.dsl import (
|
||||
EnterpriseAppDSLExport,
|
||||
EnterpriseAppDSLImport,
|
||||
InnerAppDSLImportPayload,
|
||||
_get_active_account,
|
||||
)
|
||||
from services.app_dsl_service import ImportStatus
|
||||
|
||||
|
||||
class TestInnerAppDSLImportPayload:
|
||||
"""Test InnerAppDSLImportPayload Pydantic model validation."""
|
||||
|
||||
def test_valid_payload_all_fields(self):
|
||||
data = {
|
||||
"yaml_content": "version: 0.6.0\nkind: app\n",
|
||||
"creator_email": "user@example.com",
|
||||
"name": "My App",
|
||||
"description": "A test app",
|
||||
}
|
||||
payload = InnerAppDSLImportPayload.model_validate(data)
|
||||
assert payload.yaml_content == data["yaml_content"]
|
||||
assert payload.creator_email == "user@example.com"
|
||||
assert payload.name == "My App"
|
||||
assert payload.description == "A test app"
|
||||
|
||||
def test_valid_payload_optional_fields_omitted(self):
|
||||
data = {
|
||||
"yaml_content": "version: 0.6.0\n",
|
||||
"creator_email": "user@example.com",
|
||||
}
|
||||
payload = InnerAppDSLImportPayload.model_validate(data)
|
||||
assert payload.name is None
|
||||
assert payload.description is None
|
||||
|
||||
def test_missing_yaml_content_fails(self):
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
InnerAppDSLImportPayload.model_validate({"creator_email": "a@b.com"})
|
||||
assert "yaml_content" in str(exc_info.value)
|
||||
|
||||
def test_missing_creator_email_fails(self):
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
InnerAppDSLImportPayload.model_validate({"yaml_content": "test"})
|
||||
assert "creator_email" in str(exc_info.value)
|
||||
|
||||
|
||||
class TestGetActiveAccount:
|
||||
"""Test the _get_active_account helper function."""
|
||||
|
||||
@patch("controllers.inner_api.app.dsl.db")
|
||||
def test_returns_active_account(self, mock_db):
|
||||
mock_account = MagicMock()
|
||||
mock_account.status = "active"
|
||||
mock_db.session.query.return_value.filter_by.return_value.first.return_value = mock_account
|
||||
|
||||
result = _get_active_account("user@example.com")
|
||||
|
||||
assert result is mock_account
|
||||
mock_db.session.query.return_value.filter_by.assert_called_once_with(email="user@example.com")
|
||||
|
||||
@patch("controllers.inner_api.app.dsl.db")
|
||||
def test_returns_none_for_inactive_account(self, mock_db):
|
||||
mock_account = MagicMock()
|
||||
mock_account.status = "banned"
|
||||
mock_db.session.query.return_value.filter_by.return_value.first.return_value = mock_account
|
||||
|
||||
result = _get_active_account("banned@example.com")
|
||||
|
||||
assert result is None
|
||||
|
||||
@patch("controllers.inner_api.app.dsl.db")
|
||||
def test_returns_none_for_nonexistent_email(self, mock_db):
|
||||
mock_db.session.query.return_value.filter_by.return_value.first.return_value = None
|
||||
|
||||
result = _get_active_account("missing@example.com")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestEnterpriseAppDSLImport:
|
||||
"""Test EnterpriseAppDSLImport endpoint handler logic.
|
||||
|
||||
Uses inspect.unwrap() to bypass auth/setup decorators.
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def api_instance(self):
|
||||
return EnterpriseAppDSLImport()
|
||||
|
||||
@pytest.fixture
|
||||
def _mock_import_deps(self):
|
||||
"""Patch db, Session, and AppDslService for import handler tests."""
|
||||
with (
|
||||
patch("controllers.inner_api.app.dsl.db"),
|
||||
patch("controllers.inner_api.app.dsl.Session") as mock_session,
|
||||
patch("controllers.inner_api.app.dsl.AppDslService") as mock_dsl_cls,
|
||||
):
|
||||
mock_session.return_value.__enter__ = MagicMock(return_value=MagicMock())
|
||||
mock_session.return_value.__exit__ = MagicMock(return_value=False)
|
||||
self._mock_dsl = MagicMock()
|
||||
mock_dsl_cls.return_value = self._mock_dsl
|
||||
yield
|
||||
|
||||
def _make_import_result(self, status: ImportStatus, **kwargs) -> "Import":
|
||||
from services.app_dsl_service import Import
|
||||
|
||||
result = Import(
|
||||
id="import-id",
|
||||
status=status,
|
||||
app_id=kwargs.get("app_id", "app-123"),
|
||||
app_mode=kwargs.get("app_mode", "workflow"),
|
||||
)
|
||||
return result
|
||||
|
||||
@pytest.mark.usefixtures("_mock_import_deps")
|
||||
@patch("controllers.inner_api.app.dsl._get_active_account")
|
||||
def test_import_success_returns_200(self, mock_get_account, api_instance, app: Flask):
|
||||
mock_account = MagicMock()
|
||||
mock_get_account.return_value = mock_account
|
||||
self._mock_dsl.import_app.return_value = self._make_import_result(ImportStatus.COMPLETED)
|
||||
|
||||
unwrapped = inspect.unwrap(api_instance.post)
|
||||
with app.test_request_context():
|
||||
with patch("controllers.inner_api.app.dsl.inner_api_ns") as mock_ns:
|
||||
mock_ns.payload = {
|
||||
"yaml_content": "version: 0.6.0\n",
|
||||
"creator_email": "user@example.com",
|
||||
}
|
||||
result = unwrapped(api_instance, workspace_id="ws-123")
|
||||
|
||||
body, status_code = result
|
||||
assert status_code == 200
|
||||
assert body["status"] == "completed"
|
||||
mock_account.set_tenant_id.assert_called_once_with("ws-123")
|
||||
|
||||
@pytest.mark.usefixtures("_mock_import_deps")
|
||||
@patch("controllers.inner_api.app.dsl._get_active_account")
|
||||
def test_import_pending_returns_202(self, mock_get_account, api_instance, app: Flask):
|
||||
mock_get_account.return_value = MagicMock()
|
||||
self._mock_dsl.import_app.return_value = self._make_import_result(ImportStatus.PENDING)
|
||||
|
||||
unwrapped = inspect.unwrap(api_instance.post)
|
||||
with app.test_request_context():
|
||||
with patch("controllers.inner_api.app.dsl.inner_api_ns") as mock_ns:
|
||||
mock_ns.payload = {"yaml_content": "test", "creator_email": "u@e.com"}
|
||||
body, status_code = unwrapped(api_instance, workspace_id="ws-123")
|
||||
|
||||
assert status_code == 202
|
||||
assert body["status"] == "pending"
|
||||
|
||||
@pytest.mark.usefixtures("_mock_import_deps")
|
||||
@patch("controllers.inner_api.app.dsl._get_active_account")
|
||||
def test_import_failed_returns_400(self, mock_get_account, api_instance, app: Flask):
|
||||
mock_get_account.return_value = MagicMock()
|
||||
self._mock_dsl.import_app.return_value = self._make_import_result(ImportStatus.FAILED)
|
||||
|
||||
unwrapped = inspect.unwrap(api_instance.post)
|
||||
with app.test_request_context():
|
||||
with patch("controllers.inner_api.app.dsl.inner_api_ns") as mock_ns:
|
||||
mock_ns.payload = {"yaml_content": "test", "creator_email": "u@e.com"}
|
||||
body, status_code = unwrapped(api_instance, workspace_id="ws-123")
|
||||
|
||||
assert status_code == 400
|
||||
assert body["status"] == "failed"
|
||||
|
||||
@patch("controllers.inner_api.app.dsl._get_active_account")
|
||||
def test_import_account_not_found_returns_404(self, mock_get_account, api_instance, app: Flask):
|
||||
mock_get_account.return_value = None
|
||||
|
||||
unwrapped = inspect.unwrap(api_instance.post)
|
||||
with app.test_request_context():
|
||||
with patch("controllers.inner_api.app.dsl.inner_api_ns") as mock_ns:
|
||||
mock_ns.payload = {"yaml_content": "test", "creator_email": "missing@e.com"}
|
||||
result = unwrapped(api_instance, workspace_id="ws-123")
|
||||
|
||||
body, status_code = result
|
||||
assert status_code == 404
|
||||
assert "missing@e.com" in body["message"]
|
||||
|
||||
|
||||
class TestEnterpriseAppDSLExport:
|
||||
"""Test EnterpriseAppDSLExport endpoint handler logic.
|
||||
|
||||
Uses inspect.unwrap() to bypass auth/setup decorators.
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def api_instance(self):
|
||||
return EnterpriseAppDSLExport()
|
||||
|
||||
@patch("controllers.inner_api.app.dsl.AppDslService")
|
||||
@patch("controllers.inner_api.app.dsl.db")
|
||||
def test_export_success_returns_200(self, mock_db, mock_dsl_cls, api_instance, app: Flask):
|
||||
mock_app = MagicMock()
|
||||
mock_db.session.query.return_value.filter_by.return_value.first.return_value = mock_app
|
||||
mock_dsl_cls.export_dsl.return_value = "version: 0.6.0\nkind: app\n"
|
||||
|
||||
unwrapped = inspect.unwrap(api_instance.get)
|
||||
with app.test_request_context("?include_secret=false"):
|
||||
result = unwrapped(api_instance, app_id="app-123")
|
||||
|
||||
body, status_code = result
|
||||
assert status_code == 200
|
||||
assert body["data"] == "version: 0.6.0\nkind: app\n"
|
||||
mock_dsl_cls.export_dsl.assert_called_once_with(app_model=mock_app, include_secret=False)
|
||||
|
||||
@patch("controllers.inner_api.app.dsl.AppDslService")
|
||||
@patch("controllers.inner_api.app.dsl.db")
|
||||
def test_export_with_secret(self, mock_db, mock_dsl_cls, api_instance, app: Flask):
|
||||
mock_app = MagicMock()
|
||||
mock_db.session.query.return_value.filter_by.return_value.first.return_value = mock_app
|
||||
mock_dsl_cls.export_dsl.return_value = "yaml-data"
|
||||
|
||||
unwrapped = inspect.unwrap(api_instance.get)
|
||||
with app.test_request_context("?include_secret=true"):
|
||||
result = unwrapped(api_instance, app_id="app-123")
|
||||
|
||||
body, status_code = result
|
||||
assert status_code == 200
|
||||
mock_dsl_cls.export_dsl.assert_called_once_with(app_model=mock_app, include_secret=True)
|
||||
|
||||
@patch("controllers.inner_api.app.dsl.db")
|
||||
def test_export_app_not_found_returns_404(self, mock_db, api_instance, app: Flask):
|
||||
mock_db.session.query.return_value.filter_by.return_value.first.return_value = None
|
||||
|
||||
unwrapped = inspect.unwrap(api_instance.get)
|
||||
with app.test_request_context("?include_secret=false"):
|
||||
result = unwrapped(api_instance, app_id="nonexistent")
|
||||
|
||||
body, status_code = result
|
||||
assert status_code == 404
|
||||
assert "app not found" in body["message"]
|
||||
2
api/uv.lock
generated
2
api/uv.lock
generated
@@ -1457,7 +1457,7 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "dify-api"
|
||||
version = "1.13.3"
|
||||
version = "1.13.2"
|
||||
source = { virtual = "." }
|
||||
dependencies = [
|
||||
{ name = "aliyun-log-python-sdk" },
|
||||
|
||||
@@ -21,7 +21,7 @@ services:
|
||||
|
||||
# API service
|
||||
api:
|
||||
image: langgenius/dify-api:1.13.3
|
||||
image: langgenius/dify-api:1.13.2
|
||||
restart: always
|
||||
environment:
|
||||
# Use the shared environment variables.
|
||||
@@ -63,7 +63,7 @@ services:
|
||||
# worker service
|
||||
# The Celery worker for processing all queues (dataset, workflow, mail, etc.)
|
||||
worker:
|
||||
image: langgenius/dify-api:1.13.3
|
||||
image: langgenius/dify-api:1.13.2
|
||||
restart: always
|
||||
environment:
|
||||
# Use the shared environment variables.
|
||||
@@ -102,7 +102,7 @@ services:
|
||||
# worker_beat service
|
||||
# Celery beat for scheduling periodic tasks.
|
||||
worker_beat:
|
||||
image: langgenius/dify-api:1.13.3
|
||||
image: langgenius/dify-api:1.13.2
|
||||
restart: always
|
||||
environment:
|
||||
# Use the shared environment variables.
|
||||
@@ -132,7 +132,7 @@ services:
|
||||
|
||||
# Frontend web application.
|
||||
web:
|
||||
image: langgenius/dify-web:1.13.3
|
||||
image: langgenius/dify-web:1.13.2
|
||||
restart: always
|
||||
environment:
|
||||
CONSOLE_API_URL: ${CONSOLE_API_URL:-}
|
||||
@@ -245,7 +245,7 @@ services:
|
||||
|
||||
# The DifySandbox
|
||||
sandbox:
|
||||
image: langgenius/dify-sandbox:0.2.14
|
||||
image: langgenius/dify-sandbox:0.2.12
|
||||
restart: always
|
||||
environment:
|
||||
# The DifySandbox configurations
|
||||
@@ -269,7 +269,7 @@ services:
|
||||
|
||||
# plugin daemon
|
||||
plugin_daemon:
|
||||
image: langgenius/dify-plugin-daemon:0.5.3-local
|
||||
image: langgenius/dify-plugin-daemon:0.5.4-local
|
||||
restart: always
|
||||
environment:
|
||||
# Use the shared environment variables.
|
||||
|
||||
@@ -97,7 +97,7 @@ services:
|
||||
|
||||
# The DifySandbox
|
||||
sandbox:
|
||||
image: langgenius/dify-sandbox:0.2.14
|
||||
image: langgenius/dify-sandbox:0.2.12
|
||||
restart: always
|
||||
env_file:
|
||||
- ./middleware.env
|
||||
@@ -123,7 +123,7 @@ services:
|
||||
|
||||
# plugin daemon
|
||||
plugin_daemon:
|
||||
image: langgenius/dify-plugin-daemon:0.5.3-local
|
||||
image: langgenius/dify-plugin-daemon:0.5.4-local
|
||||
restart: always
|
||||
env_file:
|
||||
- ./middleware.env
|
||||
|
||||
@@ -731,7 +731,7 @@ services:
|
||||
|
||||
# API service
|
||||
api:
|
||||
image: langgenius/dify-api:1.13.3
|
||||
image: langgenius/dify-api:1.13.2
|
||||
restart: always
|
||||
environment:
|
||||
# Use the shared environment variables.
|
||||
@@ -773,7 +773,7 @@ services:
|
||||
# worker service
|
||||
# The Celery worker for processing all queues (dataset, workflow, mail, etc.)
|
||||
worker:
|
||||
image: langgenius/dify-api:1.13.3
|
||||
image: langgenius/dify-api:1.13.2
|
||||
restart: always
|
||||
environment:
|
||||
# Use the shared environment variables.
|
||||
@@ -812,7 +812,7 @@ services:
|
||||
# worker_beat service
|
||||
# Celery beat for scheduling periodic tasks.
|
||||
worker_beat:
|
||||
image: langgenius/dify-api:1.13.3
|
||||
image: langgenius/dify-api:1.13.2
|
||||
restart: always
|
||||
environment:
|
||||
# Use the shared environment variables.
|
||||
@@ -842,7 +842,7 @@ services:
|
||||
|
||||
# Frontend web application.
|
||||
web:
|
||||
image: langgenius/dify-web:1.13.3
|
||||
image: langgenius/dify-web:1.13.2
|
||||
restart: always
|
||||
environment:
|
||||
CONSOLE_API_URL: ${CONSOLE_API_URL:-}
|
||||
@@ -955,7 +955,7 @@ services:
|
||||
|
||||
# The DifySandbox
|
||||
sandbox:
|
||||
image: langgenius/dify-sandbox:0.2.14
|
||||
image: langgenius/dify-sandbox:0.2.12
|
||||
restart: always
|
||||
environment:
|
||||
# The DifySandbox configurations
|
||||
@@ -979,7 +979,7 @@ services:
|
||||
|
||||
# plugin daemon
|
||||
plugin_daemon:
|
||||
image: langgenius/dify-plugin-daemon:0.5.3-local
|
||||
image: langgenius/dify-plugin-daemon:0.5.4-local
|
||||
restart: always
|
||||
environment:
|
||||
# Use the shared environment variables.
|
||||
|
||||
@@ -5,8 +5,7 @@ app:
|
||||
max_workers: 4
|
||||
max_requests: 50
|
||||
worker_timeout: 5
|
||||
python_path: /opt/python/bin/python3
|
||||
nodejs_path: /usr/local/bin/node
|
||||
python_path: /usr/local/bin/python3
|
||||
enable_network: True # please make sure there is no network risk in your environment
|
||||
allowed_syscalls: # please leave it empty if you have no idea how seccomp works
|
||||
proxy:
|
||||
|
||||
@@ -5,7 +5,7 @@ app:
|
||||
max_workers: 4
|
||||
max_requests: 50
|
||||
worker_timeout: 5
|
||||
python_path: /opt/python/bin/python3
|
||||
python_path: /usr/local/bin/python3
|
||||
python_lib_path:
|
||||
- /usr/local/lib/python3.10
|
||||
- /usr/lib/python3.10
|
||||
|
||||
@@ -588,6 +588,66 @@ describe('useChat', () => {
|
||||
expect(lastResponse.workflowProcess?.status).toBe('failed')
|
||||
})
|
||||
|
||||
it('should keep separate iteration traces for repeated executions of the same iteration node', async () => {
|
||||
let callbacks: HookCallbacks
|
||||
|
||||
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
|
||||
callbacks = options as HookCallbacks
|
||||
})
|
||||
|
||||
const { result } = renderHook(() => useChat())
|
||||
|
||||
act(() => {
|
||||
result.current.handleSend('test-url', { query: 'iteration trace test' }, {})
|
||||
})
|
||||
|
||||
act(() => {
|
||||
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-1', task_id: 't-1' })
|
||||
callbacks.onIterationStart({ data: { id: 'iter-run-1', node_id: 'iter-1' } })
|
||||
callbacks.onIterationStart({ data: { id: 'iter-run-2', node_id: 'iter-1' } })
|
||||
callbacks.onIterationFinish({ data: { id: 'iter-run-1', node_id: 'iter-1', status: 'succeeded' } })
|
||||
callbacks.onIterationFinish({ data: { id: 'iter-run-2', node_id: 'iter-1', status: 'succeeded' } })
|
||||
})
|
||||
|
||||
const tracing = result.current.chatList[1].workflowProcess?.tracing ?? []
|
||||
|
||||
expect(tracing).toHaveLength(2)
|
||||
expect(tracing).toEqual(expect.arrayContaining([
|
||||
expect.objectContaining({ id: 'iter-run-1', status: 'succeeded' }),
|
||||
expect.objectContaining({ id: 'iter-run-2', status: 'succeeded' }),
|
||||
]))
|
||||
})
|
||||
|
||||
it('should keep separate top-level traces for repeated executions of the same node', async () => {
|
||||
let callbacks: HookCallbacks
|
||||
|
||||
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
|
||||
callbacks = options as HookCallbacks
|
||||
})
|
||||
|
||||
const { result } = renderHook(() => useChat())
|
||||
|
||||
act(() => {
|
||||
result.current.handleSend('test-url', { query: 'top-level trace test' }, {})
|
||||
})
|
||||
|
||||
act(() => {
|
||||
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-1', task_id: 't-1' })
|
||||
callbacks.onNodeStarted({ data: { id: 'node-run-1', node_id: 'node-1', title: 'Node 1' } })
|
||||
callbacks.onNodeStarted({ data: { id: 'node-run-2', node_id: 'node-1', title: 'Node 1 retry' } })
|
||||
callbacks.onNodeFinished({ data: { id: 'node-run-1', node_id: 'node-1', status: 'succeeded' } })
|
||||
callbacks.onNodeFinished({ data: { id: 'node-run-2', node_id: 'node-1', status: 'succeeded' } })
|
||||
})
|
||||
|
||||
const tracing = result.current.chatList[1].workflowProcess?.tracing ?? []
|
||||
|
||||
expect(tracing).toHaveLength(2)
|
||||
expect(tracing).toEqual(expect.arrayContaining([
|
||||
expect.objectContaining({ id: 'node-run-1', status: 'succeeded' }),
|
||||
expect.objectContaining({ id: 'node-run-2', status: 'succeeded' }),
|
||||
]))
|
||||
})
|
||||
|
||||
it('should handle early exits in tracing events during iteration or loop', async () => {
|
||||
let callbacks: HookCallbacks
|
||||
|
||||
@@ -623,7 +683,7 @@ describe('useChat', () => {
|
||||
callbacks.onNodeFinished({ data: { id: 'n-1', iteration_id: 'iter-1' } })
|
||||
})
|
||||
|
||||
const traceLen1 = result.current.chatList[result.current.chatList.length - 1].workflowProcess?.tracing?.length
|
||||
const traceLen1 = result.current.chatList.at(-1)!.workflowProcess?.tracing?.length
|
||||
expect(traceLen1).toBe(0) // None added due to iteration early hits
|
||||
})
|
||||
|
||||
@@ -707,7 +767,7 @@ describe('useChat', () => {
|
||||
|
||||
expect(result.current.chatList.some(item => item.id === 'question-m-child')).toBe(true)
|
||||
expect(result.current.chatList.some(item => item.id === 'm-child')).toBe(true)
|
||||
expect(result.current.chatList[result.current.chatList.length - 1].content).toBe('child answer')
|
||||
expect(result.current.chatList.at(-1)!.content).toBe('child answer')
|
||||
})
|
||||
|
||||
it('should strip local file urls before sending payload', () => {
|
||||
@@ -805,7 +865,7 @@ describe('useChat', () => {
|
||||
})
|
||||
|
||||
expect(onGetConversationMessages).toHaveBeenCalled()
|
||||
expect(result.current.chatList[result.current.chatList.length - 1].content).toBe('streamed content')
|
||||
expect(result.current.chatList.at(-1)!.content).toBe('streamed content')
|
||||
})
|
||||
|
||||
it('should clear suggested questions when suggestion fetch fails after completion', async () => {
|
||||
@@ -851,7 +911,7 @@ describe('useChat', () => {
|
||||
callbacks.onNodeFinished({ data: { node_id: 'n-loop', id: 'n-loop' } })
|
||||
})
|
||||
|
||||
const latestResponse = result.current.chatList[result.current.chatList.length - 1]
|
||||
const latestResponse = result.current.chatList.at(-1)!
|
||||
expect(latestResponse.workflowProcess?.tracing).toHaveLength(0)
|
||||
})
|
||||
|
||||
@@ -878,7 +938,7 @@ describe('useChat', () => {
|
||||
callbacks.onTTSChunk('m-th-bind', '')
|
||||
})
|
||||
|
||||
const latestResponse = result.current.chatList[result.current.chatList.length - 1]
|
||||
const latestResponse = result.current.chatList.at(-1)!
|
||||
expect(latestResponse.id).toBe('m-th-bind')
|
||||
expect(latestResponse.conversationId).toBe('c-th-bind')
|
||||
expect(latestResponse.workflowProcess?.status).toBe('succeeded')
|
||||
@@ -971,7 +1031,7 @@ describe('useChat', () => {
|
||||
callbacks.onCompleted()
|
||||
})
|
||||
|
||||
const lastResponse = result.current.chatList[result.current.chatList.length - 1]
|
||||
const lastResponse = result.current.chatList.at(-1)!
|
||||
expect(lastResponse.agent_thoughts![0].thought).toContain('resumed')
|
||||
|
||||
expect(lastResponse.workflowProcess?.tracing?.length).toBeGreaterThan(0)
|
||||
|
||||
@@ -12,6 +12,7 @@ import type {
|
||||
IOnDataMoreInfo,
|
||||
IOtherOptions,
|
||||
} from '@/service/base'
|
||||
import type { NodeTracing } from '@/types/workflow'
|
||||
import { uniqBy } from 'es-toolkit/compat'
|
||||
import { noop } from 'es-toolkit/function'
|
||||
import { produce, setAutoFreeze } from 'immer'
|
||||
@@ -31,6 +32,8 @@ import {
|
||||
} from '@/app/components/base/file-uploader/utils'
|
||||
import { useToastContext } from '@/app/components/base/toast/context'
|
||||
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from '@/app/components/workflow/utils/top-level-tracing'
|
||||
import { findTracingIndexByExecutionOrUniqueNodeId } from '@/app/components/workflow/utils/tracing-execution'
|
||||
import useTimestamp from '@/hooks/use-timestamp'
|
||||
import { useParams, usePathname } from '@/next/navigation'
|
||||
import {
|
||||
@@ -52,6 +55,19 @@ type SendCallback = {
|
||||
isPublicAPI?: boolean
|
||||
}
|
||||
|
||||
type ParallelTraceLike = Pick<NodeTracing, 'id' | 'node_id' | 'parallel_id' | 'execution_metadata'>
|
||||
|
||||
const findParallelTraceIndex = (
|
||||
tracing: ParallelTraceLike[],
|
||||
data: Partial<ParallelTraceLike>,
|
||||
) => {
|
||||
return findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: data.id,
|
||||
nodeId: data.node_id,
|
||||
parallelId: data.execution_metadata?.parallel_id ?? data.parallel_id,
|
||||
})
|
||||
}
|
||||
|
||||
export const useChat = (
|
||||
config?: ChatConfig,
|
||||
formSettings?: {
|
||||
@@ -419,8 +435,7 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
||||
const iterationIndex = findParallelTraceIndex(tracing, iterationFinishedData)
|
||||
if (iterationIndex > -1) {
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
@@ -432,38 +447,34 @@ export const useChat = (
|
||||
},
|
||||
onNodeStarted: ({ data: nodeStartedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (params.loop_id)
|
||||
return
|
||||
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
||||
// if the node is already started, update the node
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (nodeStartedData.iteration_id)
|
||||
return
|
||||
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...nodeStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess.tracing, {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (params.loop_id)
|
||||
return
|
||||
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
|
||||
if (nodeFinishedData.iteration_id)
|
||||
return
|
||||
|
||||
if (nodeFinishedData.loop_id)
|
||||
return
|
||||
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
||||
if (!item.execution_metadata?.parallel_id)
|
||||
return item.id === nodeFinishedData.id
|
||||
@@ -505,8 +516,7 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
||||
const loopIndex = findParallelTraceIndex(tracing, loopFinishedData)
|
||||
if (loopIndex > -1) {
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
@@ -582,7 +592,7 @@ export const useChat = (
|
||||
{},
|
||||
otherOptions,
|
||||
)
|
||||
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
|
||||
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer, params.loop_id])
|
||||
|
||||
const updateCurrentQAOnTree = useCallback(({
|
||||
parentId,
|
||||
@@ -972,12 +982,13 @@ export const useChat = (
|
||||
},
|
||||
onIterationFinish: ({ data: iterationFinishedData }) => {
|
||||
const tracing = responseItem.workflowProcess!.tracing!
|
||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
...iterationFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
const iterationIndex = findParallelTraceIndex(tracing, iterationFinishedData)
|
||||
if (iterationIndex > -1) {
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
...iterationFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
|
||||
updateCurrentQAOnTree({
|
||||
@@ -988,30 +999,19 @@ export const useChat = (
|
||||
})
|
||||
},
|
||||
onNodeStarted: ({ data: nodeStartedData }) => {
|
||||
// `data` is the outer send payload for this request; loop child runs should not emit top-level node traces here.
|
||||
if (data.loop_id)
|
||||
return
|
||||
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (nodeStartedData.iteration_id)
|
||||
return
|
||||
|
||||
if (data.loop_id)
|
||||
return
|
||||
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...nodeStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess.tracing, {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@@ -1020,10 +1020,14 @@ export const useChat = (
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||
// Use the outer request payload here as well so loop child runs skip top-level finish handling entirely.
|
||||
if (data.loop_id)
|
||||
return
|
||||
|
||||
if (nodeFinishedData.iteration_id)
|
||||
return
|
||||
|
||||
if (data.loop_id)
|
||||
if (nodeFinishedData.loop_id)
|
||||
return
|
||||
|
||||
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex((item) => {
|
||||
@@ -1069,12 +1073,13 @@ export const useChat = (
|
||||
},
|
||||
onLoopFinish: ({ data: loopFinishedData }) => {
|
||||
const tracing = responseItem.workflowProcess!.tracing!
|
||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
...loopFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
const loopIndex = findParallelTraceIndex(tracing, loopFinishedData)
|
||||
if (loopIndex > -1) {
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
...loopFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
|
||||
updateCurrentQAOnTree({
|
||||
|
||||
@@ -264,7 +264,7 @@ describe('UrlInput', () => {
|
||||
|
||||
render(<UrlInput {...props} />)
|
||||
const input = screen.getByRole('textbox')
|
||||
await userEvent.type(input, longUrl)
|
||||
fireEvent.change(input, { target: { value: longUrl } })
|
||||
|
||||
expect(input).toHaveValue(longUrl)
|
||||
})
|
||||
@@ -275,7 +275,7 @@ describe('UrlInput', () => {
|
||||
|
||||
render(<UrlInput {...props} />)
|
||||
const input = screen.getByRole('textbox')
|
||||
await userEvent.type(input, unicodeUrl)
|
||||
fireEvent.change(input, { target: { value: unicodeUrl } })
|
||||
|
||||
expect(input).toHaveValue(unicodeUrl)
|
||||
})
|
||||
@@ -285,7 +285,7 @@ describe('UrlInput', () => {
|
||||
|
||||
render(<UrlInput {...props} />)
|
||||
const input = screen.getByRole('textbox')
|
||||
await userEvent.type(input, 'https://rapid.com', { delay: 1 })
|
||||
fireEvent.change(input, { target: { value: 'https://rapid.com' } })
|
||||
|
||||
expect(input).toHaveValue('https://rapid.com')
|
||||
})
|
||||
@@ -297,7 +297,7 @@ describe('UrlInput', () => {
|
||||
|
||||
render(<UrlInput {...props} />)
|
||||
const input = screen.getByRole('textbox')
|
||||
await userEvent.type(input, 'https://enter.com')
|
||||
fireEvent.change(input, { target: { value: 'https://enter.com' } })
|
||||
|
||||
// Focus button and press enter
|
||||
const button = screen.getByRole('button', { name: /run/i })
|
||||
|
||||
@@ -157,7 +157,7 @@ describe('useDatasetCardState', () => {
|
||||
expect(result.current.modalState.showRenameModal).toBe(false)
|
||||
})
|
||||
|
||||
it('should close confirm delete modal when closeConfirmDelete is called', () => {
|
||||
it('should close confirm delete modal when closeConfirmDelete is called', async () => {
|
||||
const dataset = createMockDataset()
|
||||
const { result } = renderHook(() =>
|
||||
useDatasetCardState({ dataset, onSuccess: vi.fn() }),
|
||||
@@ -168,7 +168,7 @@ describe('useDatasetCardState', () => {
|
||||
result.current.detectIsUsedByApp()
|
||||
})
|
||||
|
||||
waitFor(() => {
|
||||
await waitFor(() => {
|
||||
expect(result.current.modalState.showConfirmDelete).toBe(true)
|
||||
})
|
||||
|
||||
|
||||
@@ -101,6 +101,7 @@ const createHumanInput = (overrides: Partial<HumanInputFormData> = {}): HumanInp
|
||||
describe('workflow-stream-handlers helpers', () => {
|
||||
it('should update tracing, result text, and human input state', () => {
|
||||
const parallelTrace = createTrace({
|
||||
id: 'parallel-trace-1',
|
||||
node_id: 'parallel-node',
|
||||
execution_metadata: { parallel_id: 'parallel-1' },
|
||||
details: [[]],
|
||||
@@ -109,11 +110,13 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
let workflowProcessData = appendParallelStart(undefined, parallelTrace)
|
||||
workflowProcessData = appendParallelNext(workflowProcessData, parallelTrace)
|
||||
workflowProcessData = finishParallelTrace(workflowProcessData, createTrace({
|
||||
id: 'parallel-trace-1',
|
||||
node_id: 'parallel-node',
|
||||
execution_metadata: { parallel_id: 'parallel-1' },
|
||||
error: 'failed',
|
||||
}))
|
||||
workflowProcessData = upsertWorkflowNode(workflowProcessData, createTrace({
|
||||
id: 'node-trace-1',
|
||||
node_id: 'node-1',
|
||||
execution_metadata: { parallel_id: 'parallel-2' },
|
||||
}))!
|
||||
@@ -160,6 +163,129 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
expect(nextProcess.tracing[0]?.details).toEqual([[], []])
|
||||
})
|
||||
|
||||
it('should keep separate iteration and loop traces for repeated executions with different ids', () => {
|
||||
const process = createWorkflowProcess()
|
||||
process.tracing = [
|
||||
createTrace({
|
||||
id: 'iter-trace-1',
|
||||
node_id: 'iter-1',
|
||||
details: [[]],
|
||||
}),
|
||||
createTrace({
|
||||
id: 'iter-trace-2',
|
||||
node_id: 'iter-1',
|
||||
details: [[]],
|
||||
}),
|
||||
createTrace({
|
||||
id: 'loop-trace-1',
|
||||
node_id: 'loop-1',
|
||||
details: [[]],
|
||||
}),
|
||||
createTrace({
|
||||
id: 'loop-trace-2',
|
||||
node_id: 'loop-1',
|
||||
details: [[]],
|
||||
}),
|
||||
]
|
||||
|
||||
const iterNextProcess = appendParallelNext(process, createTrace({
|
||||
id: 'iter-trace-2',
|
||||
node_id: 'iter-1',
|
||||
}))
|
||||
const iterFinishedProcess = finishParallelTrace(iterNextProcess, createTrace({
|
||||
id: 'iter-trace-2',
|
||||
node_id: 'iter-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
details: undefined,
|
||||
}))
|
||||
const loopNextProcess = appendParallelNext(iterFinishedProcess, createTrace({
|
||||
id: 'loop-trace-2',
|
||||
node_id: 'loop-1',
|
||||
}))
|
||||
const loopFinishedProcess = finishParallelTrace(loopNextProcess, createTrace({
|
||||
id: 'loop-trace-2',
|
||||
node_id: 'loop-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
details: undefined,
|
||||
}))
|
||||
|
||||
expect(loopFinishedProcess.tracing[0]).toEqual(expect.objectContaining({
|
||||
id: 'iter-trace-1',
|
||||
details: [[]],
|
||||
status: NodeRunningStatus.Running,
|
||||
}))
|
||||
expect(loopFinishedProcess.tracing[1]).toEqual(expect.objectContaining({
|
||||
id: 'iter-trace-2',
|
||||
details: [[], []],
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))
|
||||
expect(loopFinishedProcess.tracing[2]).toEqual(expect.objectContaining({
|
||||
id: 'loop-trace-1',
|
||||
details: [[]],
|
||||
status: NodeRunningStatus.Running,
|
||||
}))
|
||||
expect(loopFinishedProcess.tracing[3]).toEqual(expect.objectContaining({
|
||||
id: 'loop-trace-2',
|
||||
details: [[], []],
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))
|
||||
})
|
||||
|
||||
it('should append a new top-level trace when the same node starts with a different execution id', () => {
|
||||
const process = createWorkflowProcess()
|
||||
process.tracing = [
|
||||
createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}),
|
||||
]
|
||||
|
||||
const updatedProcess = upsertWorkflowNode(process, createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
}))!
|
||||
|
||||
expect(updatedProcess.tracing).toHaveLength(2)
|
||||
expect(updatedProcess.tracing[1]).toEqual(expect.objectContaining({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
}))
|
||||
})
|
||||
|
||||
it('should finish the matching top-level trace when the same node runs again with a new execution id', () => {
|
||||
const process = createWorkflowProcess()
|
||||
process.tracing = [
|
||||
createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}),
|
||||
createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
}),
|
||||
]
|
||||
|
||||
const updatedProcess = finishWorkflowNode(process, createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))!
|
||||
|
||||
expect(updatedProcess.tracing).toHaveLength(2)
|
||||
expect(updatedProcess.tracing[0]).toEqual(expect.objectContaining({
|
||||
id: 'trace-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))
|
||||
expect(updatedProcess.tracing[1]).toEqual(expect.objectContaining({
|
||||
id: 'trace-2',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))
|
||||
})
|
||||
|
||||
it('should leave tracing unchanged when a parallel next event has no matching trace', () => {
|
||||
const process = createWorkflowProcess()
|
||||
process.tracing = [
|
||||
@@ -171,6 +297,7 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
]
|
||||
|
||||
const nextProcess = appendParallelNext(process, createTrace({
|
||||
id: 'trace-missing',
|
||||
node_id: 'missing-node',
|
||||
execution_metadata: { parallel_id: 'parallel-2' },
|
||||
}))
|
||||
@@ -228,6 +355,7 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
},
|
||||
}))
|
||||
const notFinished = finishParallelTrace(process, createTrace({
|
||||
id: 'trace-missing',
|
||||
node_id: 'missing',
|
||||
execution_metadata: {
|
||||
parallel_id: 'parallel-missing',
|
||||
@@ -243,6 +371,7 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
loop_id: 'loop-1',
|
||||
}))
|
||||
const unmatchedFinish = finishWorkflowNode(process, createTrace({
|
||||
id: 'trace-missing',
|
||||
node_id: 'missing',
|
||||
execution_metadata: {
|
||||
parallel_id: 'missing',
|
||||
|
||||
@@ -5,6 +5,8 @@ import type { HumanInputFormTimeoutData, NodeTracing, WorkflowFinishedResponse }
|
||||
import { produce } from 'immer'
|
||||
import { getFilesInLogs } from '@/app/components/base/file-uploader/utils'
|
||||
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from '@/app/components/workflow/utils/top-level-tracing'
|
||||
import { findTracingIndexByExecutionOrUniqueNodeId } from '@/app/components/workflow/utils/tracing-execution'
|
||||
import { sseGet } from '@/service/base'
|
||||
|
||||
type Notify = (payload: { type: 'error' | 'warning', message: string }) => void
|
||||
@@ -49,6 +51,20 @@ const matchParallelTrace = (trace: WorkflowProcess['tracing'][number], data: Nod
|
||||
|| trace.parallel_id === data.execution_metadata?.parallel_id)
|
||||
}
|
||||
|
||||
const findParallelTraceIndex = (tracing: WorkflowProcess['tracing'], data: NodeTracing) => {
|
||||
const parallelId = data.execution_metadata?.parallel_id
|
||||
const matchedIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: data.id,
|
||||
nodeId: data.node_id,
|
||||
parallelId,
|
||||
})
|
||||
|
||||
if (matchedIndex > -1)
|
||||
return matchedIndex
|
||||
|
||||
return tracing.findIndex(trace => matchParallelTrace(trace, data))
|
||||
}
|
||||
|
||||
const ensureParallelTraceDetails = (details?: NodeTracing['details']) => {
|
||||
return details?.length ? details : [[]]
|
||||
}
|
||||
@@ -68,7 +84,8 @@ const appendParallelStart = (current: WorkflowProcess | undefined, data: NodeTra
|
||||
const appendParallelNext = (current: WorkflowProcess | undefined, data: NodeTracing) => {
|
||||
return updateWorkflowProcess(current, (draft) => {
|
||||
draft.expand = true
|
||||
const trace = draft.tracing.find(item => matchParallelTrace(item, data))
|
||||
const traceIndex = findParallelTraceIndex(draft.tracing, data)
|
||||
const trace = draft.tracing[traceIndex]
|
||||
if (!trace)
|
||||
return
|
||||
|
||||
@@ -80,10 +97,13 @@ const appendParallelNext = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||
const finishParallelTrace = (current: WorkflowProcess | undefined, data: NodeTracing) => {
|
||||
return updateWorkflowProcess(current, (draft) => {
|
||||
draft.expand = true
|
||||
const traceIndex = draft.tracing.findIndex(item => matchParallelTrace(item, data))
|
||||
const traceIndex = findParallelTraceIndex(draft.tracing, data)
|
||||
if (traceIndex > -1) {
|
||||
const currentTrace = draft.tracing[traceIndex]
|
||||
draft.tracing[traceIndex] = {
|
||||
...currentTrace,
|
||||
...data,
|
||||
details: data.details ?? currentTrace.details,
|
||||
expand: !!data.error,
|
||||
}
|
||||
}
|
||||
@@ -96,17 +116,13 @@ const upsertWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||
|
||||
return updateWorkflowProcess(current, (draft) => {
|
||||
draft.expand = true
|
||||
const currentIndex = draft.tracing.findIndex(item => item.node_id === data.node_id)
|
||||
const nextTrace = {
|
||||
...data,
|
||||
status: NodeRunningStatus.Running,
|
||||
expand: true,
|
||||
}
|
||||
|
||||
if (currentIndex > -1)
|
||||
draft.tracing[currentIndex] = nextTrace
|
||||
else
|
||||
draft.tracing.push(nextTrace)
|
||||
upsertTopLevelTracingNodeOnStart(draft.tracing, nextTrace)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -115,7 +131,7 @@ const finishWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||
return current
|
||||
|
||||
return updateWorkflowProcess(current, (draft) => {
|
||||
const currentIndex = draft.tracing.findIndex(trace => matchParallelTrace(trace, data))
|
||||
const currentIndex = findParallelTraceIndex(draft.tracing, data)
|
||||
if (currentIndex > -1) {
|
||||
draft.tracing[currentIndex] = {
|
||||
...(draft.tracing[currentIndex].extras
|
||||
|
||||
@@ -109,13 +109,13 @@ describe('useWorkflowAgentLog', () => {
|
||||
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [{ node_id: 'n1', execution_metadata: {} }],
|
||||
tracing: [{ id: 'trace-1', node_id: 'n1', execution_metadata: {} }],
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', message_id: 'm1' },
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm1' },
|
||||
} as AgentLogResponse)
|
||||
|
||||
const trace = store.getState().workflowRunningData!.tracing![0]
|
||||
@@ -128,6 +128,7 @@ describe('useWorkflowAgentLog', () => {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [{
|
||||
id: 'trace-1',
|
||||
node_id: 'n1',
|
||||
execution_metadata: { agent_log: [{ message_id: 'm1', text: 'log1' }] },
|
||||
}],
|
||||
@@ -136,7 +137,7 @@ describe('useWorkflowAgentLog', () => {
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', message_id: 'm2' },
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm2' },
|
||||
} as AgentLogResponse)
|
||||
|
||||
expect(store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log).toHaveLength(2)
|
||||
@@ -147,6 +148,7 @@ describe('useWorkflowAgentLog', () => {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [{
|
||||
id: 'trace-1',
|
||||
node_id: 'n1',
|
||||
execution_metadata: { agent_log: [{ message_id: 'm1', text: 'old' }] },
|
||||
}],
|
||||
@@ -155,7 +157,7 @@ describe('useWorkflowAgentLog', () => {
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', message_id: 'm1', text: 'new' },
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm1', text: 'new' },
|
||||
} as unknown as AgentLogResponse)
|
||||
|
||||
const log = store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log!
|
||||
@@ -167,17 +169,39 @@ describe('useWorkflowAgentLog', () => {
|
||||
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [{ node_id: 'n1' }],
|
||||
tracing: [{ id: 'trace-1', node_id: 'n1' }],
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', message_id: 'm1' },
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm1' },
|
||||
} as AgentLogResponse)
|
||||
|
||||
expect(store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log).toHaveLength(1)
|
||||
})
|
||||
|
||||
it('should attach the log to the matching execution id when a node runs multiple times', () => {
|
||||
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [
|
||||
{ id: 'trace-1', node_id: 'n1', execution_metadata: {} },
|
||||
{ id: 'trace-2', node_id: 'n1', execution_metadata: {} },
|
||||
],
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-2', message_id: 'm2' },
|
||||
} as AgentLogResponse)
|
||||
|
||||
const tracing = store.getState().workflowRunningData!.tracing!
|
||||
expect(tracing[0].execution_metadata!.agent_log).toBeUndefined()
|
||||
expect(tracing[1].execution_metadata!.agent_log).toHaveLength(1)
|
||||
expect(tracing[1].execution_metadata!.agent_log![0].message_id).toBe('m2')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useWorkflowNodeHumanInputFormFilled', () => {
|
||||
|
||||
@@ -109,7 +109,7 @@ describe('useWorkflowNodeStarted', () => {
|
||||
|
||||
act(() => {
|
||||
result.current.handleWorkflowNodeStarted(
|
||||
{ data: { node_id: 'n1' } } as NodeStartedResponse,
|
||||
{ data: { id: 'trace-n1', node_id: 'n1' } } as NodeStartedResponse,
|
||||
containerParams,
|
||||
)
|
||||
})
|
||||
@@ -138,7 +138,7 @@ describe('useWorkflowNodeStarted', () => {
|
||||
|
||||
act(() => {
|
||||
result.current.handleWorkflowNodeStarted(
|
||||
{ data: { node_id: 'n2' } } as NodeStartedResponse,
|
||||
{ data: { id: 'trace-n2', node_id: 'n2' } } as NodeStartedResponse,
|
||||
containerParams,
|
||||
)
|
||||
})
|
||||
@@ -157,8 +157,8 @@ describe('useWorkflowNodeStarted', () => {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [
|
||||
{ node_id: 'n0', status: NodeRunningStatus.Succeeded },
|
||||
{ node_id: 'n1', status: NodeRunningStatus.Succeeded },
|
||||
{ id: 'trace-0', node_id: 'n0', status: NodeRunningStatus.Succeeded },
|
||||
{ id: 'trace-1', node_id: 'n1', status: NodeRunningStatus.Succeeded },
|
||||
],
|
||||
}),
|
||||
},
|
||||
@@ -166,7 +166,7 @@ describe('useWorkflowNodeStarted', () => {
|
||||
|
||||
act(() => {
|
||||
result.current.handleWorkflowNodeStarted(
|
||||
{ data: { node_id: 'n1' } } as NodeStartedResponse,
|
||||
{ data: { id: 'trace-1', node_id: 'n1' } } as NodeStartedResponse,
|
||||
containerParams,
|
||||
)
|
||||
})
|
||||
@@ -175,6 +175,32 @@ describe('useWorkflowNodeStarted', () => {
|
||||
expect(tracing).toHaveLength(2)
|
||||
expect(tracing[1].status).toBe(NodeRunningStatus.Running)
|
||||
})
|
||||
|
||||
it('should append a new tracing entry when the same node starts a new execution id', () => {
|
||||
const { result, store } = renderViewportHook(() => useWorkflowNodeStarted(), {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [
|
||||
{ id: 'trace-0', node_id: 'n0', status: NodeRunningStatus.Succeeded },
|
||||
{ id: 'trace-1', node_id: 'n1', status: NodeRunningStatus.Succeeded },
|
||||
],
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
act(() => {
|
||||
result.current.handleWorkflowNodeStarted(
|
||||
{ data: { id: 'trace-2', node_id: 'n1' } } as NodeStartedResponse,
|
||||
containerParams,
|
||||
)
|
||||
})
|
||||
|
||||
const tracing = store.getState().workflowRunningData!.tracing!
|
||||
expect(tracing).toHaveLength(3)
|
||||
expect(tracing[2].id).toBe('trace-2')
|
||||
expect(tracing[2].node_id).toBe('n1')
|
||||
expect(tracing[2].status).toBe(NodeRunningStatus.Running)
|
||||
})
|
||||
})
|
||||
|
||||
describe('useWorkflowNodeIterationStarted', () => {
|
||||
|
||||
@@ -14,7 +14,7 @@ export const useWorkflowAgentLog = () => {
|
||||
} = workflowStore.getState()
|
||||
|
||||
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
|
||||
const currentIndex = draft.tracing!.findIndex(item => item.node_id === data.node_id)
|
||||
const currentIndex = draft.tracing!.findIndex(item => item.id === data.node_execution_id)
|
||||
if (currentIndex > -1) {
|
||||
const current = draft.tracing![currentIndex]
|
||||
|
||||
|
||||
@@ -33,8 +33,8 @@ export const useWorkflowNodeStarted = () => {
|
||||
transform,
|
||||
} = store.getState()
|
||||
const nodes = getNodes()
|
||||
const currentIndex = workflowRunningData?.tracing?.findIndex(item => item.node_id === data.node_id)
|
||||
if (currentIndex && currentIndex > -1) {
|
||||
const currentIndex = workflowRunningData?.tracing?.findIndex(item => item.id === data.id)
|
||||
if (currentIndex !== undefined && currentIndex > -1) {
|
||||
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
|
||||
draft.tracing![currentIndex] = {
|
||||
...data,
|
||||
|
||||
@@ -42,6 +42,12 @@ import {
|
||||
import { useHooksStore } from '../../hooks-store'
|
||||
import { useWorkflowStore } from '../../store'
|
||||
import { NodeRunningStatus, WorkflowRunningStatus } from '../../types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from '../../utils/top-level-tracing'
|
||||
import {
|
||||
findTracingIndexByExecutionOrUniqueNodeId,
|
||||
mergeTracingNodePreservingExecutionMetadata,
|
||||
upsertTracingNodeOnResumeStart,
|
||||
} from '../../utils/tracing-execution'
|
||||
|
||||
type GetAbortController = (abortController: AbortController) => void
|
||||
type SendCallback = {
|
||||
@@ -468,10 +474,7 @@ export const useChat = (
|
||||
onIterationFinish: ({ data }) => {
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...responseItem.workflowProcess!.tracing[currentTracingIndex],
|
||||
...data,
|
||||
}
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data)
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@@ -495,10 +498,7 @@ export const useChat = (
|
||||
onLoopFinish: ({ data }) => {
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...responseItem.workflowProcess!.tracing[currentTracingIndex],
|
||||
...data,
|
||||
}
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data)
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@@ -508,19 +508,15 @@ export const useChat = (
|
||||
}
|
||||
},
|
||||
onNodeStarted: ({ data }) => {
|
||||
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id)
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing![currentIndex] = {
|
||||
...data,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
responseItem.workflowProcess!.tracing!.push({
|
||||
...data,
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
if (params.loop_id)
|
||||
return
|
||||
|
||||
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess!.tracing!, {
|
||||
...data,
|
||||
status: NodeRunningStatus.Running,
|
||||
}, {
|
||||
reuseRunningNodeId: true,
|
||||
})
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@@ -539,12 +535,12 @@ export const useChat = (
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data }) => {
|
||||
if (params.loop_id)
|
||||
return
|
||||
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...responseItem.workflowProcess!.tracing[currentTracingIndex],
|
||||
...data,
|
||||
}
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data)
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@@ -554,7 +550,10 @@ export const useChat = (
|
||||
}
|
||||
},
|
||||
onAgentLog: ({ data }) => {
|
||||
const currentNodeIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id)
|
||||
const currentNodeIndex = findTracingIndexByExecutionOrUniqueNodeId(responseItem.workflowProcess!.tracing!, {
|
||||
executionId: data.node_execution_id,
|
||||
nodeId: data.node_id,
|
||||
})
|
||||
if (currentNodeIndex > -1) {
|
||||
const current = responseItem.workflowProcess!.tracing![currentNodeIndex]
|
||||
|
||||
@@ -769,7 +768,8 @@ export const useChat = (
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
|
||||
upsertTracingNodeOnResumeStart(responseItem.workflowProcess.tracing, {
|
||||
...iterationStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
@@ -780,12 +780,14 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
||||
const iterationIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: iterationFinishedData.id,
|
||||
nodeId: iterationFinishedData.node_id,
|
||||
parallelId: iterationFinishedData.execution_metadata?.parallel_id,
|
||||
})
|
||||
if (iterationIndex > -1) {
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
...iterationFinishedData,
|
||||
...mergeTracingNodePreservingExecutionMetadata(tracing[iterationIndex], iterationFinishedData),
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
@@ -798,22 +800,12 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (nodeStartedData.iteration_id)
|
||||
return
|
||||
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...nodeStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess.tracing, {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}, {
|
||||
reuseRunningNodeId: true,
|
||||
})
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||
@@ -824,14 +816,17 @@ export const useChat = (
|
||||
if (nodeFinishedData.iteration_id)
|
||||
return
|
||||
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
||||
if (!item.execution_metadata?.parallel_id)
|
||||
return item.id === nodeFinishedData.id
|
||||
if (nodeFinishedData.loop_id)
|
||||
return
|
||||
|
||||
return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id)
|
||||
const currentIndex = findTracingIndexByExecutionOrUniqueNodeId(responseItem.workflowProcess.tracing, {
|
||||
executionId: nodeFinishedData.id,
|
||||
nodeId: nodeFinishedData.node_id,
|
||||
parallelId: nodeFinishedData.execution_metadata?.parallel_id,
|
||||
})
|
||||
if (currentIndex > -1)
|
||||
responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess.tracing[currentIndex], nodeFinishedData) as any
|
||||
}
|
||||
})
|
||||
},
|
||||
onLoopStart: ({ data: loopStartedData }) => {
|
||||
@@ -840,7 +835,8 @@ export const useChat = (
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
|
||||
upsertTracingNodeOnResumeStart(responseItem.workflowProcess.tracing, {
|
||||
...loopStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
@@ -851,12 +847,14 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
||||
const loopIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: loopFinishedData.id,
|
||||
nodeId: loopFinishedData.node_id,
|
||||
parallelId: loopFinishedData.execution_metadata?.parallel_id,
|
||||
})
|
||||
if (loopIndex > -1) {
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
...loopFinishedData,
|
||||
...mergeTracingNodePreservingExecutionMetadata(tracing[loopIndex], loopFinishedData),
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
|
||||
174
web/app/components/workflow/utils/top-level-tracing.spec.ts
Normal file
174
web/app/components/workflow/utils/top-level-tracing.spec.ts
Normal file
@@ -0,0 +1,174 @@
|
||||
import type { NodeTracing } from '@/types/workflow'
|
||||
import { NodeRunningStatus } from '@/app/components/workflow/types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from './top-level-tracing'
|
||||
|
||||
const createTrace = (overrides: Partial<NodeTracing> = {}): NodeTracing => ({
|
||||
id: 'trace-1',
|
||||
index: 0,
|
||||
predecessor_node_id: '',
|
||||
node_id: 'node-1',
|
||||
node_type: 'llm' as NodeTracing['node_type'],
|
||||
title: 'Node 1',
|
||||
inputs: {},
|
||||
inputs_truncated: false,
|
||||
process_data: {},
|
||||
process_data_truncated: false,
|
||||
outputs: {},
|
||||
outputs_truncated: false,
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
elapsed_time: 0,
|
||||
metadata: {
|
||||
iterator_length: 0,
|
||||
iterator_index: 0,
|
||||
loop_length: 0,
|
||||
loop_index: 0,
|
||||
},
|
||||
created_at: 0,
|
||||
created_by: {
|
||||
id: 'user-1',
|
||||
name: 'User',
|
||||
email: 'user@example.com',
|
||||
},
|
||||
finished_at: 0,
|
||||
...overrides,
|
||||
})
|
||||
|
||||
describe('upsertTopLevelTracingNodeOnStart', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
it('should append a new top-level node when no matching trace exists', () => {
|
||||
const tracing: NodeTracing[] = []
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-2',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([startedNode])
|
||||
})
|
||||
|
||||
it('should update an existing top-level node when the execution id matches', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}),
|
||||
]
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([startedNode])
|
||||
})
|
||||
|
||||
it('should append a new top-level node when the same node starts with a new execution id', () => {
|
||||
const existingTrace = createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
})
|
||||
const tracing: NodeTracing[] = [existingTrace]
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([existingTrace, startedNode])
|
||||
})
|
||||
|
||||
it('should update an existing running top-level node when the same node restarts with a new execution id', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
}),
|
||||
]
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode, {
|
||||
reuseRunningNodeId: true,
|
||||
})
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([startedNode])
|
||||
})
|
||||
|
||||
it('should keep separate running top-level traces by default when a new execution id appears', () => {
|
||||
const existingTrace = createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
const tracing: NodeTracing[] = [existingTrace]
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([existingTrace, startedNode])
|
||||
})
|
||||
|
||||
it('should ignore nested iteration node starts even when the node id matches a top-level trace', () => {
|
||||
const existingTrace = createTrace({
|
||||
id: 'top-level-trace',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
})
|
||||
const tracing: NodeTracing[] = [existingTrace]
|
||||
const nestedIterationTrace = createTrace({
|
||||
id: 'iteration-trace',
|
||||
node_id: 'node-1',
|
||||
iteration_id: 'iteration-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, nestedIterationTrace)
|
||||
|
||||
expect(updated).toBe(false)
|
||||
expect(tracing).toEqual([existingTrace])
|
||||
})
|
||||
|
||||
it('should ignore nested loop node starts even when the node id matches a top-level trace', () => {
|
||||
const existingTrace = createTrace({
|
||||
id: 'top-level-trace',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
})
|
||||
const tracing: NodeTracing[] = [existingTrace]
|
||||
const nestedLoopTrace = createTrace({
|
||||
id: 'loop-trace',
|
||||
node_id: 'node-1',
|
||||
loop_id: 'loop-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, nestedLoopTrace)
|
||||
|
||||
expect(updated).toBe(false)
|
||||
expect(tracing).toEqual([existingTrace])
|
||||
})
|
||||
})
|
||||
34
web/app/components/workflow/utils/top-level-tracing.ts
Normal file
34
web/app/components/workflow/utils/top-level-tracing.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import type { NodeTracing } from '@/types/workflow'
|
||||
import { NodeRunningStatus } from '../types'
|
||||
|
||||
const isNestedTracingNode = (trace: Pick<NodeTracing, 'iteration_id' | 'loop_id'>) => {
|
||||
return Boolean(trace.iteration_id || trace.loop_id)
|
||||
}
|
||||
|
||||
export const upsertTopLevelTracingNodeOnStart = (
|
||||
tracing: NodeTracing[],
|
||||
startedNode: NodeTracing,
|
||||
options?: {
|
||||
reuseRunningNodeId?: boolean
|
||||
},
|
||||
) => {
|
||||
if (isNestedTracingNode(startedNode))
|
||||
return false
|
||||
|
||||
const currentIndex = tracing.findIndex((item) => {
|
||||
if (item.id === startedNode.id)
|
||||
return true
|
||||
|
||||
if (!options?.reuseRunningNodeId)
|
||||
return false
|
||||
|
||||
return item.node_id === startedNode.node_id && item.status === NodeRunningStatus.Running
|
||||
})
|
||||
if (currentIndex > -1)
|
||||
// Started events are the authoritative snapshot for an execution; merging would retain stale client-side fields.
|
||||
tracing[currentIndex] = startedNode
|
||||
else
|
||||
tracing.push(startedNode)
|
||||
|
||||
return true
|
||||
}
|
||||
136
web/app/components/workflow/utils/tracing-execution.spec.ts
Normal file
136
web/app/components/workflow/utils/tracing-execution.spec.ts
Normal file
@@ -0,0 +1,136 @@
|
||||
import type { AgentLogItem, NodeTracing } from '@/types/workflow'
|
||||
import {
|
||||
findTracingIndexByExecutionOrUniqueNodeId,
|
||||
mergeTracingNodePreservingExecutionMetadata,
|
||||
upsertTracingNodeOnResumeStart,
|
||||
} from './tracing-execution'
|
||||
|
||||
const createTrace = (overrides: Partial<NodeTracing> = {}): NodeTracing => ({
|
||||
id: 'trace-1',
|
||||
index: 0,
|
||||
predecessor_node_id: '',
|
||||
node_id: 'node-1',
|
||||
node_type: 'llm' as NodeTracing['node_type'],
|
||||
title: 'Node 1',
|
||||
inputs: {},
|
||||
inputs_truncated: false,
|
||||
process_data: {},
|
||||
process_data_truncated: false,
|
||||
outputs: {},
|
||||
outputs_truncated: false,
|
||||
status: 'succeeded' as NodeTracing['status'],
|
||||
elapsed_time: 0,
|
||||
metadata: {
|
||||
iterator_length: 0,
|
||||
iterator_index: 0,
|
||||
loop_length: 0,
|
||||
loop_index: 0,
|
||||
},
|
||||
created_at: 0,
|
||||
created_by: {
|
||||
id: 'user-1',
|
||||
name: 'User',
|
||||
email: 'user@example.com',
|
||||
},
|
||||
finished_at: 0,
|
||||
...overrides,
|
||||
})
|
||||
|
||||
describe('tracing-execution utils', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
it('should prefer the exact execution id when the same node ran multiple times', () => {
|
||||
const tracing = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
createTrace({ id: 'trace-2', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: 'trace-2',
|
||||
nodeId: 'node-1',
|
||||
})).toBe(1)
|
||||
})
|
||||
|
||||
it('should fall back to a unique node id when the execution id is missing', () => {
|
||||
const tracing = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: 'missing-trace',
|
||||
nodeId: 'node-1',
|
||||
})).toBe(0)
|
||||
})
|
||||
|
||||
it('should not fall back to node id when multiple executions exist', () => {
|
||||
const tracing = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
createTrace({ id: 'trace-2', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: 'missing-trace',
|
||||
nodeId: 'node-1',
|
||||
})).toBe(-1)
|
||||
})
|
||||
|
||||
it('should merge into an existing resume trace instead of appending a duplicate', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1', title: 'old title' }),
|
||||
]
|
||||
|
||||
upsertTracingNodeOnResumeStart(tracing, createTrace({ node_id: 'node-1', title: 'new title' }))
|
||||
|
||||
expect(tracing).toHaveLength(1)
|
||||
expect(tracing[0].id).toBe('trace-1')
|
||||
expect(tracing[0].title).toBe('new title')
|
||||
})
|
||||
|
||||
it('should append a new trace when a new execution id appears', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
upsertTracingNodeOnResumeStart(tracing, createTrace({ id: 'trace-2', node_id: 'node-1', title: 'second run' }))
|
||||
|
||||
expect(tracing).toHaveLength(2)
|
||||
expect(tracing[1].id).toBe('trace-2')
|
||||
})
|
||||
|
||||
it('should preserve agent logs when merging finish metadata', () => {
|
||||
const agentLogItem: AgentLogItem = {
|
||||
node_execution_id: 'trace-1',
|
||||
message_id: 'm-1',
|
||||
node_id: 'node-1',
|
||||
label: 'tool',
|
||||
data: {},
|
||||
status: 'success',
|
||||
}
|
||||
|
||||
const currentNode = createTrace({
|
||||
execution_metadata: {
|
||||
total_tokens: 1,
|
||||
total_price: 0,
|
||||
currency: 'USD',
|
||||
agent_log: [agentLogItem],
|
||||
parallel_id: 'p-1',
|
||||
},
|
||||
})
|
||||
|
||||
const mergedNode = mergeTracingNodePreservingExecutionMetadata(currentNode, {
|
||||
status: 'succeeded' as NodeTracing['status'],
|
||||
execution_metadata: {
|
||||
total_tokens: 2,
|
||||
total_price: 1,
|
||||
currency: 'USD',
|
||||
parallel_id: 'p-1',
|
||||
extra: 'value',
|
||||
} as NodeTracing['execution_metadata'],
|
||||
})
|
||||
|
||||
expect(mergedNode.execution_metadata?.agent_log).toEqual([agentLogItem])
|
||||
expect((mergedNode.execution_metadata as Record<string, unknown>).extra).toBe('value')
|
||||
})
|
||||
})
|
||||
76
web/app/components/workflow/utils/tracing-execution.ts
Normal file
76
web/app/components/workflow/utils/tracing-execution.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import type { NodeTracing } from '@/types/workflow'
|
||||
|
||||
type TracingLookup = {
|
||||
executionId?: string
|
||||
nodeId?: string
|
||||
parallelId?: string
|
||||
allowNodeIdFallbackWhenExecutionIdMissing?: boolean
|
||||
}
|
||||
|
||||
const getParallelId = (trace: Partial<NodeTracing>) => {
|
||||
return trace.execution_metadata?.parallel_id || trace.parallel_id
|
||||
}
|
||||
|
||||
export const findTracingIndexByExecutionOrUniqueNodeId = (
|
||||
tracing: Partial<NodeTracing>[],
|
||||
{ executionId, nodeId, parallelId, allowNodeIdFallbackWhenExecutionIdMissing = true }: TracingLookup,
|
||||
) => {
|
||||
if (executionId) {
|
||||
const exactIndex = tracing.findIndex(item => item.id === executionId)
|
||||
if (exactIndex > -1)
|
||||
return exactIndex
|
||||
|
||||
if (!allowNodeIdFallbackWhenExecutionIdMissing)
|
||||
return -1
|
||||
}
|
||||
|
||||
if (!nodeId)
|
||||
return -1
|
||||
|
||||
const candidates = tracing
|
||||
.map((item, index) => ({ item, index }))
|
||||
.filter(({ item }) => item.node_id === nodeId)
|
||||
.filter(({ item }) => !parallelId || getParallelId(item) === parallelId)
|
||||
|
||||
return candidates.length === 1 ? candidates[0].index : -1
|
||||
}
|
||||
|
||||
export const upsertTracingNodeOnResumeStart = (
|
||||
tracing: NodeTracing[],
|
||||
startedNode: NodeTracing,
|
||||
) => {
|
||||
const currentIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: startedNode.id,
|
||||
nodeId: startedNode.node_id,
|
||||
parallelId: getParallelId(startedNode),
|
||||
allowNodeIdFallbackWhenExecutionIdMissing: false,
|
||||
})
|
||||
|
||||
if (currentIndex > -1) {
|
||||
tracing[currentIndex] = {
|
||||
...tracing[currentIndex],
|
||||
...startedNode,
|
||||
}
|
||||
return currentIndex
|
||||
}
|
||||
|
||||
tracing.push(startedNode)
|
||||
return tracing.length - 1
|
||||
}
|
||||
|
||||
export const mergeTracingNodePreservingExecutionMetadata = (
|
||||
currentNode: NodeTracing,
|
||||
incomingNode: Partial<NodeTracing>,
|
||||
): NodeTracing => {
|
||||
return {
|
||||
...currentNode,
|
||||
...incomingNode,
|
||||
execution_metadata: incomingNode.execution_metadata
|
||||
? {
|
||||
...currentNode.execution_metadata,
|
||||
...incomingNode.execution_metadata,
|
||||
agent_log: incomingNode.execution_metadata.agent_log ?? currentNode.execution_metadata?.agent_log,
|
||||
}
|
||||
: currentNode.execution_metadata,
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "dify-web",
|
||||
"type": "module",
|
||||
"version": "1.13.3",
|
||||
"version": "1.13.2",
|
||||
"private": true,
|
||||
"packageManager": "pnpm@10.32.1",
|
||||
"imports": {
|
||||
|
||||
Reference in New Issue
Block a user