Compare commits

...

13 Commits

Author SHA1 Message Date
jyong
62bf286220 Merge remote-tracking branch 'origin/feat/evaluation' into feat/evaluation
# Conflicts:
#	api/controllers/console/evaluation/evaluation.py
2026-03-03 16:04:59 +08:00
jyong
a3cf1a18a3 evaluation runtime 2026-03-03 16:01:13 +08:00
FFXN
2382d51136 feat: evaluation. 2026-03-03 11:41:19 +08:00
FFXN
eebd7763a5 Merge remote-tracking branch 'origin/main' into feat/evaluation 2026-03-02 14:46:31 +08:00
FFXN
1ce0610c4c feat: Inject "Start" node for snippet before running the whole snippet workflow. 2026-02-14 13:28:30 +08:00
FFXN
b2b0be6b8a Merge remote-tracking branch 'origin/feat/evaluation' into feat/evaluation
# Conflicts:
#	api/controllers/console/snippets/payloads.py
#	api/controllers/console/snippets/snippet_workflow.py
#	api/services/snippet_service.py
2026-02-14 09:55:19 +08:00
FFXN
fb4584b776 feat: Features about running and debugging snippets. 2026-02-14 09:50:34 +08:00
FFXN
632d93f475 feat: Implement the APIs of downloading evaluation dataset template file and downloading evaluation dataset file/evaluation result file. 2026-02-14 09:50:34 +08:00
jyong
36dc948520 evaluation 2026-02-14 09:50:34 +08:00
jyong
bad6fb3470 evaluations 2026-02-14 09:50:34 +08:00
FFXN
a49504bd5b feat: Implement the APIs of downloading evaluation dataset template file and downloading evaluation dataset file/evaluation result file. 2026-02-12 13:32:43 +08:00
jyong
3dfc797645 evaluation 2026-02-11 16:56:30 +08:00
jyong
bea428e308 evaluations 2026-01-30 17:35:36 +08:00
36 changed files with 4991 additions and 0 deletions

View File

@@ -1366,6 +1366,32 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings):
)
class EvaluationConfig(BaseSettings):
    """
    Settings that govern the evaluation runtime.

    Every attribute is declared as a pydantic ``Field`` carrying a
    description and a default value.
    """

    # Name of the evaluation backend; "none" is the default.
    EVALUATION_FRAMEWORK: str = Field(
        default="none",
        description="Evaluation framework to use (ragas/deepeval/none)",
    )

    # Per-tenant cap on simultaneously running evaluations.
    EVALUATION_MAX_CONCURRENT_RUNS: PositiveInt = Field(
        default=3,
        description="Maximum number of concurrent evaluation runs per tenant",
    )

    # Upper bound on the number of rows in an evaluation dataset.
    EVALUATION_MAX_DATASET_ROWS: PositiveInt = Field(
        default=1000,
        description="Maximum number of rows allowed in an evaluation dataset",
    )

    # Time budget, in seconds, for one evaluation task.
    EVALUATION_TASK_TIMEOUT: PositiveInt = Field(
        default=3600,
        description="Timeout in seconds for a single evaluation task",
    )
class FeatureConfig(
# place the configs in alphabet order
AppExecutionConfig,
@@ -1378,6 +1404,7 @@ class FeatureConfig(
MarketplaceConfig,
DataSetConfig,
EndpointConfig,
EvaluationConfig,
FileAccessConfig,
FileUploadConfig,
HttpConfig,

View File

@@ -116,6 +116,12 @@ from .explore import (
trial,
)
# Import evaluation controllers
from .evaluation import evaluation
# Import snippet controllers
from .snippets import snippet_workflow
# Import tag controllers
from .tag import tags
@@ -129,6 +135,7 @@ from .workspace import (
model_providers,
models,
plugin,
snippets,
tool_providers,
trigger_providers,
workspace,
@@ -166,6 +173,7 @@ __all__ = [
"datasource_content_preview",
"email_register",
"endpoint",
"evaluation",
"extension",
"external",
"feature",
@@ -199,6 +207,8 @@ __all__ = [
"saved_message",
"setup",
"site",
"snippet_workflow",
"snippets",
"spec",
"statistic",
"tags",

View File

@@ -0,0 +1 @@
# Evaluation controller module

View File

@@ -0,0 +1,589 @@
from __future__ import annotations
import logging
from collections.abc import Callable
from functools import wraps
from typing import TYPE_CHECKING, ParamSpec, TypeVar, Union
from urllib.parse import quote
from flask import Response, request
from flask_restx import Resource, fields
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import Session
from werkzeug.exceptions import BadRequest, NotFound
from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import (
account_initialization_required,
edit_permission_required,
setup_required,
)
from core.evaluation.entities.evaluation_entity import EvaluationCategory
from core.workflow.file import helpers as file_helpers
from extensions.ext_database import db
from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models import App
from models.model import UploadFile
from models.snippet import CustomizedSnippet
from services.errors.evaluation import (
EvaluationDatasetInvalidError,
EvaluationFrameworkNotConfiguredError,
EvaluationMaxConcurrentRunsError,
EvaluationNotFoundError,
)
from services.evaluation_service import EvaluationService
if TYPE_CHECKING:
from models.evaluation import EvaluationRun, EvaluationRunItem
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Generic placeholders used to annotate the view-wrapping decorator below.
P = ParamSpec("P")
R = TypeVar("R")
# Valid evaluation target types; the ``evaluate_target_type`` path parameter
# is checked against this set by ``get_evaluation_target``.
EVALUATE_TARGET_TYPES = {"app", "snippets"}
class VersionQuery(BaseModel):
    """Query-string schema for the evaluation version endpoint."""

    # Version identifier supplied by the caller (required, no default).
    version: str
# Register the query model on the console namespace so that routes can look
# it up through ``console_ns.models`` (see EvaluationVersionApi).
register_schema_models(
console_ns,
VersionQuery,
)
# Response field definitions
# Minimal file descriptor shared by the test-file and result-file entries.
file_info_fields = {
"id": fields.String,
"name": fields.String,
}
# One evaluation log entry; ``result_file`` may be null.
evaluation_log_fields = {
"created_at": TimestampField,
"created_by": fields.String,
"test_file": fields.Nested(
console_ns.model(
"EvaluationTestFile",
file_info_fields,
)
),
"result_file": fields.Nested(
console_ns.model(
"EvaluationResultFile",
file_info_fields,
),
allow_null=True,
),
"version": fields.String,
}
# Wrapper model holding a list of log entries under "data".
# NOTE(review): not referenced by any endpoint in the visible part of this
# module -- confirm whether EvaluationLogsApi is meant to use it.
evaluation_log_list_model = console_ns.model(
"EvaluationLogList",
{
"data": fields.List(fields.Nested(console_ns.model("EvaluationLog", evaluation_log_fields))),
},
)
# Fields describing a customized evaluation workflow and its raw I/O fields.
customized_matrix_fields = {
"evaluation_workflow_id": fields.String,
"input_fields": fields.Raw,
"output_fields": fields.Raw,
}
# A single judgement condition.
condition_fields = {
"name": fields.List(fields.String),
"comparison_operator": fields.String,
"value": fields.String,
}
# A group of conditions combined with one logical operator.
judgement_conditions_fields = {
"logical_operator": fields.String,
"conditions": fields.List(fields.Nested(console_ns.model("EvaluationCondition", condition_fields))),
}
# Documented response shape for the evaluation detail endpoint.
# NOTE(review): EvaluationDetailApi returns a "metrics_config" key, while this
# model declares "customized_matrix" -- confirm which one is intended.
evaluation_detail_fields = {
"evaluation_model": fields.String,
"evaluation_model_provider": fields.String,
"customized_matrix": fields.Nested(
console_ns.model("EvaluationCustomizedMatrix", customized_matrix_fields),
allow_null=True,
),
"judgement_conditions": fields.Nested(
console_ns.model("EvaluationJudgementConditions", judgement_conditions_fields),
allow_null=True,
),
}
evaluation_detail_model = console_ns.model("EvaluationDetail", evaluation_detail_fields)
def get_evaluation_target(view_func: Callable[P, R]):
    """
    Resolve the polymorphic evaluation target before invoking the view.

    The wrapped view is routed with ``evaluate_target_type`` and
    ``evaluate_target_id`` keyword arguments. They are replaced by
    ``target`` (the ``App`` or ``CustomizedSnippet`` row matching the id and
    the current tenant) and ``target_type``.

    Raises:
        NotFound: when the target type is not in ``EVALUATE_TARGET_TYPES``
            or no matching row exists for the current tenant.
    """

    @wraps(view_func)
    def decorated_view(*args: P.args, **kwargs: P.kwargs):
        target_type = kwargs.get("evaluate_target_type")
        raw_target_id = kwargs.get("evaluate_target_id")

        if target_type not in EVALUATE_TARGET_TYPES:
            raise NotFound(f"Invalid evaluation target type: {target_type}")

        _, current_tenant_id = current_account_with_tenant()
        target_id = str(raw_target_id)

        # The view signature does not accept the raw path parameters.
        del kwargs["evaluate_target_type"]
        del kwargs["evaluate_target_id"]

        # Map each supported target type onto the model that stores it.
        model_by_type = {"app": App, "snippets": CustomizedSnippet}
        model = model_by_type.get(target_type)

        target: Union[App, CustomizedSnippet] | None = None
        if model is not None:
            # Filtering on tenant_id keeps the lookup tenant-isolated.
            lookup = db.session.query(model).where(
                model.id == target_id,
                model.tenant_id == current_tenant_id,
            )
            target = lookup.first()

        if not target:
            raise NotFound(f"{str(target_type)} not found")

        kwargs["target"] = target
        kwargs["target_type"] = target_type
        return view_func(*args, **kwargs)

    return decorated_view
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/dataset-template/download")
class EvaluationDatasetTemplateDownloadApi(Resource):
    """Endpoint that streams an XLSX dataset template for an evaluation target."""

    @console_ns.doc("download_evaluation_dataset_template")
    @console_ns.response(200, "Template file streamed as XLSX attachment")
    @console_ns.response(400, "Invalid target type or excluded app mode")
    @console_ns.response(404, "Target not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    @edit_permission_required
    def post(self, target: Union[App, CustomizedSnippet], target_type: str):
        """
        Build and return the evaluation dataset template.

        The template is produced by ``EvaluationService.generate_dataset_template``
        and sent back as an attachment. A ``ValueError`` raised by the service
        is reported as a 400 response carrying the error message.
        """
        try:
            xlsx_content, filename = EvaluationService.generate_dataset_template(
                target_type=target_type,
                target=target,
            )
        except ValueError as e:
            return {"message": str(e)}, 400

        # Percent-encode the name so non-ASCII characters survive in the header.
        disposition = f"attachment; filename*=UTF-8''{quote(filename)}"
        content_length = str(len(xlsx_content))

        response = Response(
            xlsx_content,
            mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )
        response.headers["Content-Disposition"] = disposition
        response.headers["Content-Length"] = content_length
        return response
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation")
class EvaluationDetailApi(Resource):
    """Read and write the evaluation configuration attached to a target."""

    @console_ns.doc("get_evaluation_detail")
    @console_ns.response(200, "Evaluation details retrieved successfully", evaluation_detail_model)
    @console_ns.response(404, "Target not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    def get(self, target: Union[App, CustomizedSnippet], target_type: str):
        """
        Return the stored evaluation configuration for the target.

        When no configuration exists, the same four keys are returned with
        ``None`` values.
        """
        _, current_tenant_id = current_account_with_tenant()
        target_id = str(target.id)

        with Session(db.engine, expire_on_commit=False) as session:
            config = EvaluationService.get_evaluation_config(session, current_tenant_id, target_type, target_id)

            if config is None:
                # Keep the response shape stable even without a saved config.
                return dict.fromkeys(
                    (
                        "evaluation_model",
                        "evaluation_model_provider",
                        "metrics_config",
                        "judgement_conditions",
                    )
                )

            return {
                "evaluation_model": config.evaluation_model,
                "evaluation_model_provider": config.evaluation_model_provider,
                "metrics_config": config.metrics_config_dict,
                "judgement_conditions": config.judgement_conditions_dict,
            }

    @console_ns.doc("save_evaluation_detail")
    @console_ns.response(200, "Evaluation configuration saved successfully")
    @console_ns.response(404, "Target not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    @edit_permission_required
    def put(self, target: Union[App, CustomizedSnippet], target_type: str):
        """
        Persist the evaluation configuration sent in the JSON request body.

        The body is handed to ``EvaluationService.save_evaluation_config``
        unchanged; the saved configuration is echoed back.
        """
        current_account, current_tenant_id = current_account_with_tenant()
        # force=True parses the body as JSON regardless of the Content-Type.
        body = request.get_json(force=True)

        with Session(db.engine, expire_on_commit=False) as session:
            saved = EvaluationService.save_evaluation_config(
                session=session,
                tenant_id=current_tenant_id,
                target_type=target_type,
                target_id=str(target.id),
                account_id=str(current_account.id),
                data=body,
            )

            return {
                "evaluation_model": saved.evaluation_model,
                "evaluation_model_provider": saved.evaluation_model_provider,
                "metrics_config": saved.metrics_config_dict,
                "judgement_conditions": saved.judgement_conditions_dict,
            }
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/logs")
class EvaluationLogsApi(Resource):
    """Paginated evaluation run history for a target."""

    @console_ns.doc("get_evaluation_logs")
    @console_ns.response(200, "Evaluation logs retrieved successfully")
    @console_ns.response(404, "Target not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    def get(self, target: Union[App, CustomizedSnippet], target_type: str):
        """
        Get evaluation run history for the target.

        Query parameters:
            page: page number, default 1; values below 1 are raised to 1.
            page_size: rows per page, default 20; clamped to the range 1..100.

        Returns:
            A dict with the serialized runs under ``data`` plus ``total``,
            ``page`` and ``page_size`` (the effective, clamped values).
        """
        _, current_tenant_id = current_account_with_tenant()

        # Both values come straight from the query string. Clamp them so that
        # zero/negative pages and unbounded page sizes never reach the service.
        page = max(request.args.get("page", 1, type=int), 1)
        page_size = min(max(request.args.get("page_size", 20, type=int), 1), 100)

        with Session(db.engine, expire_on_commit=False) as session:
            runs, total = EvaluationService.get_evaluation_runs(
                session=session,
                tenant_id=current_tenant_id,
                target_type=target_type,
                target_id=str(target.id),
                page=page,
                page_size=page_size,
            )

            return {
                "data": [_serialize_evaluation_run(run) for run in runs],
                "total": total,
                "page": page,
                "page_size": page_size,
            }
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/run")
class EvaluationRunApi(Resource):
    """Endpoint that starts an evaluation run from an uploaded XLSX dataset."""

    @console_ns.doc("start_evaluation_run")
    @console_ns.response(200, "Evaluation run started")
    @console_ns.response(400, "Invalid request")
    @console_ns.response(404, "Target not found")
    @console_ns.response(429, "Too many concurrent evaluation runs")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    @edit_permission_required
    def post(self, target: Union[App, CustomizedSnippet], target_type: str):
        """
        Start an evaluation run.

        Expects multipart form data with:
        - file: XLSX dataset file
        - evaluation_category: one of the ``EvaluationCategory`` values
          (defaults to "llm" when omitted)

        Raises:
            BadRequest: when the file is missing, is not an XLSX file, is
                empty, or the evaluation category is unknown.

        Service errors are translated into JSON error bodies with status
        400 (framework not configured / invalid dataset), 404 (not found)
        or 429 (too many concurrent runs).
        """
        current_account, current_tenant_id = current_account_with_tenant()

        # Validate file upload
        if "file" not in request.files:
            raise BadRequest("Dataset file is required.")

        file = request.files["file"]
        # Compare the extension case-insensitively so "DATA.XLSX" is accepted.
        if not file.filename or not file.filename.lower().endswith(".xlsx"):
            raise BadRequest("Dataset file must be an XLSX file.")

        dataset_content = file.read()
        if not dataset_content:
            raise BadRequest("Dataset file is empty.")

        # Validate evaluation category
        category_str = request.form.get("evaluation_category", "llm")
        try:
            evaluation_category = EvaluationCategory(category_str)
        except ValueError as exc:
            # Chain the original error so the cause stays visible in tracebacks.
            raise BadRequest(
                f"Invalid evaluation_category: {category_str}. "
                f"Must be one of: {', '.join(e.value for e in EvaluationCategory)}"
            ) from exc

        try:
            with Session(db.engine, expire_on_commit=False) as session:
                evaluation_run = EvaluationService.start_evaluation_run(
                    session=session,
                    tenant_id=current_tenant_id,
                    target_type=target_type,
                    target_id=str(target.id),
                    account_id=str(current_account.id),
                    dataset_file_content=dataset_content,
                    evaluation_category=evaluation_category,
                )
                return _serialize_evaluation_run(evaluation_run), 200
        except EvaluationFrameworkNotConfiguredError as e:
            return {"message": str(e.description)}, 400
        except EvaluationNotFoundError as e:
            return {"message": str(e.description)}, 404
        except EvaluationMaxConcurrentRunsError as e:
            return {"message": str(e.description)}, 429
        except EvaluationDatasetInvalidError as e:
            return {"message": str(e.description)}, 400
@console_ns.route(
    "/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>"
)
class EvaluationRunDetailApi(Resource):
    """Detail view of one evaluation run together with a page of its items."""

    @console_ns.doc("get_evaluation_run_detail")
    @console_ns.response(200, "Evaluation run detail retrieved")
    @console_ns.response(404, "Run not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    def get(self, target: Union[App, CustomizedSnippet], target_type: str, run_id: str):
        """
        Get evaluation run detail including items.

        Query parameters:
            page: page number, default 1; values below 1 are raised to 1.
            page_size: items per page, default 50; clamped to the range 1..100.

        Returns:
            A dict with the serialized run under ``run`` and the paginated
            items under ``items``. An ``EvaluationNotFoundError`` from the
            service is reported as a 404 JSON body.
        """
        _, current_tenant_id = current_account_with_tenant()
        run_id = str(run_id)

        # Both values come straight from the query string. Clamp them so that
        # zero/negative pages and unbounded page sizes never reach the service.
        page = max(request.args.get("page", 1, type=int), 1)
        page_size = min(max(request.args.get("page_size", 50, type=int), 1), 100)

        try:
            with Session(db.engine, expire_on_commit=False) as session:
                # NOTE(review): the run is looked up by tenant and run id only;
                # it is not checked against the target from the URL -- confirm
                # whether the service enforces that relationship.
                run = EvaluationService.get_evaluation_run_detail(
                    session=session,
                    tenant_id=current_tenant_id,
                    run_id=run_id,
                )
                items, total_items = EvaluationService.get_evaluation_run_items(
                    session=session,
                    run_id=run_id,
                    page=page,
                    page_size=page_size,
                )
                return {
                    "run": _serialize_evaluation_run(run),
                    "items": {
                        "data": [_serialize_evaluation_run_item(item) for item in items],
                        "total": total_items,
                        "page": page,
                        "page_size": page_size,
                    },
                }
        except EvaluationNotFoundError as e:
            return {"message": str(e.description)}, 404
@console_ns.route(
    "/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>/cancel"
)
class EvaluationRunCancelApi(Resource):
    """Endpoint that cancels an evaluation run."""

    @console_ns.doc("cancel_evaluation_run")
    @console_ns.response(200, "Evaluation run cancelled")
    @console_ns.response(404, "Run not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    @edit_permission_required
    def post(self, target: Union[App, CustomizedSnippet], target_type: str, run_id: str):
        """
        Cancel the evaluation run identified by ``run_id``.

        ``EvaluationNotFoundError`` becomes a 404 JSON body and a
        ``ValueError`` from the service becomes a 400 JSON body.
        """
        _, current_tenant_id = current_account_with_tenant()
        run_key = str(run_id)

        try:
            with Session(db.engine, expire_on_commit=False) as session:
                cancelled = EvaluationService.cancel_evaluation_run(
                    session=session,
                    tenant_id=current_tenant_id,
                    run_id=run_key,
                )
                return _serialize_evaluation_run(cancelled)
        except EvaluationNotFoundError as e:
            return {"message": str(e.description)}, 404
        except ValueError as e:
            return {"message": str(e)}, 400
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/metrics")
class EvaluationMetricsApi(Resource):
    """Lists the metrics supported for every evaluation category."""

    @console_ns.doc("get_evaluation_metrics")
    @console_ns.response(200, "Available metrics retrieved")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    def get(self, target: Union[App, CustomizedSnippet], target_type: str):
        """
        Return the supported metrics keyed by evaluation category value.

        The resolved target is not consulted; the result depends only on
        ``EvaluationService.get_supported_metrics``.
        """
        metrics_by_category = {
            category.value: EvaluationService.get_supported_metrics(category)
            for category in EvaluationCategory
        }
        return {"metrics": metrics_by_category}
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/files/<uuid:file_id>")
class EvaluationFileDownloadApi(Resource):
    """Returns metadata and a signed download URL for an uploaded file."""

    @console_ns.doc("download_evaluation_file")
    @console_ns.response(200, "File download URL generated successfully")
    @console_ns.response(404, "Target or file not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    def get(self, target: Union[App, CustomizedSnippet], target_type: str, file_id: str):
        """
        Look up an upload by id within the current tenant and describe it.

        Raises:
            NotFound: when no ``UploadFile`` with this id exists for the tenant.

        NOTE(review): the lookup filters on tenant only; it does not check
        that the file belongs to an evaluation of this target -- confirm
        that this is intended.
        """
        _, current_tenant_id = current_account_with_tenant()
        wanted_id = str(file_id)

        with Session(db.engine, expire_on_commit=False) as session:
            upload_file = session.execute(
                select(UploadFile).where(
                    UploadFile.id == wanted_id,
                    UploadFile.tenant_id == current_tenant_id,
                )
            ).scalar_one_or_none()

            if not upload_file:
                raise NotFound("File not found")

            download_url = file_helpers.get_signed_file_url(upload_file_id=upload_file.id, as_attachment=True)

            created = upload_file.created_at
            return {
                "id": upload_file.id,
                "name": upload_file.name,
                "size": upload_file.size,
                "extension": upload_file.extension,
                "mime_type": upload_file.mime_type,
                "created_at": int(created.timestamp()) if created else None,
                "download_url": download_url,
            }
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/version")
class EvaluationVersionApi(Resource):
    """Returns the workflow graph of an evaluation target."""

    @console_ns.doc("get_evaluation_version_detail")
    @console_ns.expect(console_ns.models.get(VersionQuery.__name__))
    @console_ns.response(200, "Version details retrieved successfully")
    @console_ns.response(404, "Target or version not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    def get(self, target: Union[App, CustomizedSnippet], target_type: str):
        """
        Return the graph for the target.

        A ``version`` query parameter is mandatory; a 400 JSON body is
        returned without it. Snippet targets yield ``target.graph_dict``,
        every other target yields an empty graph.

        NOTE(review): the ``version`` value is only checked for presence and
        does not influence which graph is returned -- confirm intent.
        """
        if not request.args.get("version"):
            return {"message": "version parameter is required"}, 400

        is_snippet = target_type == "snippets" and isinstance(target, CustomizedSnippet)
        return {
            "graph": target.graph_dict if is_snippet else {},
        }
# ---- Serialization Helpers ----
def _serialize_evaluation_run(run: EvaluationRun) -> dict[str, object]:
    """
    Convert an evaluation run into a plain dict for a JSON response.

    Datetime attributes are emitted as integer epoch seconds, or ``None``
    when the attribute is falsy.
    """

    def to_epoch(moment):
        # Truncate to whole seconds; falsy values (e.g. None) pass through as None.
        return int(moment.timestamp()) if moment else None

    return {
        "id": run.id,
        "tenant_id": run.tenant_id,
        "target_type": run.target_type,
        "target_id": run.target_id,
        "evaluation_config_id": run.evaluation_config_id,
        "status": run.status,
        "dataset_file_id": run.dataset_file_id,
        "result_file_id": run.result_file_id,
        "total_items": run.total_items,
        "completed_items": run.completed_items,
        "failed_items": run.failed_items,
        "progress": run.progress,
        "metrics_summary": run.metrics_summary_dict,
        "error": run.error,
        "created_by": run.created_by,
        "started_at": to_epoch(run.started_at),
        "completed_at": to_epoch(run.completed_at),
        "created_at": to_epoch(run.created_at),
    }
def _serialize_evaluation_run_item(item: EvaluationRunItem) -> dict[str, object]:
    """
    Convert one evaluation run item into a plain dict for a JSON response.

    Each response key is read from the attribute paired with it below.
    """
    # (response key, attribute name) pairs, in output order.
    key_to_attribute = (
        ("id", "id"),
        ("item_index", "item_index"),
        ("inputs", "inputs_dict"),
        ("expected_output", "expected_output"),
        ("actual_output", "actual_output"),
        ("metrics", "metrics_list"),
        ("metadata", "metadata_dict"),
        ("error", "error"),
        ("overall_score", "overall_score"),
    )
    return {key: getattr(item, attribute) for key, attribute in key_to_attribute}

View File

@@ -0,0 +1,102 @@
from typing import Any, Literal
from pydantic import BaseModel, Field
class SnippetListQuery(BaseModel):
    """Query-string schema used when listing snippets."""

    # Page number; must lie between 1 and 99999.
    page: int = Field(default=1, ge=1, le=99999)
    # Page size; must lie between 1 and 100.
    limit: int = Field(default=20, ge=1, le=100)
    # Optional keyword; omitted by default.
    keyword: str | None = None
class IconInfo(BaseModel):
    """Describes a snippet icon; every attribute is optional."""

    icon: str | None = None
    # Restricted to "emoji" or "image" when provided.
    icon_type: Literal["emoji", "image"] | None = None
    icon_background: str | None = None
    icon_url: str | None = None
class InputFieldDefinition(BaseModel):
    """Definition of one snippet input field; every attribute is optional."""

    default: str | None = None
    hint: bool | None = None
    label: str | None = None
    max_length: int | None = None
    options: list[str] | None = None
    placeholder: str | None = None
    required: bool | None = None
    type: str | None = None  # e.g., "text-input"
class CreateSnippetPayload(BaseModel):
    """Request body accepted when a new snippet is created."""

    # Required; between 1 and 255 characters.
    name: str = Field(..., min_length=1, max_length=255)
    # Optional; at most 2000 characters.
    description: str | None = Field(default=None, max_length=2000)
    # Either "node" or "group"; "node" when omitted.
    type: Literal["node", "group"] = "node"
    icon_info: IconInfo | None = None
    graph: dict[str, Any] | None = None
    # A fresh empty list is created per instance when the field is omitted.
    input_fields: list[InputFieldDefinition] | None = Field(default_factory=list)
class UpdateSnippetPayload(BaseModel):
    """Request body accepted when a snippet is updated; all fields optional."""

    # When provided: between 1 and 255 characters.
    name: str | None = Field(default=None, min_length=1, max_length=255)
    # When provided: at most 2000 characters.
    description: str | None = Field(default=None, max_length=2000)
    icon_info: IconInfo | None = None
class SnippetDraftSyncPayload(BaseModel):
    """Request body for syncing a snippet's draft workflow."""

    # Required workflow graph.
    graph: dict[str, Any]
    # Optional hash sent by the client alongside the graph.
    hash: str | None = None
    environment_variables: list[dict[str, Any]] | None = None
    conversation_variables: list[dict[str, Any]] | None = None
    input_variables: list[dict[str, Any]] | None = None
class WorkflowRunQuery(BaseModel):
    """Query-string schema used when listing workflow runs."""

    # Optional id string; omitted by default.
    last_id: str | None = None
    # Number of runs to return; must lie between 1 and 100.
    limit: int = Field(default=20, ge=1, le=100)
class SnippetDraftRunPayload(BaseModel):
    """Request body for running a snippet's draft workflow."""

    # Required mapping of input values.
    inputs: dict[str, Any]
    # Optional list of file descriptors.
    files: list[dict[str, Any]] | None = None
class SnippetDraftNodeRunPayload(BaseModel):
    """Request body for running one node of a snippet's draft workflow."""

    # Required mapping of input values.
    inputs: dict[str, Any]
    # Query text; empty string when omitted.
    query: str = ""
    # Optional list of file descriptors.
    files: list[dict[str, Any]] | None = None
class SnippetIterationNodeRunPayload(BaseModel):
    """Request body for running an iteration node of a snippet draft workflow."""

    # Optional mapping of input values.
    inputs: dict[str, Any] | None = None
class SnippetLoopNodeRunPayload(BaseModel):
    """Request body for running a loop node of a snippet draft workflow."""

    # Optional mapping of input values.
    inputs: dict[str, Any] | None = None
class PublishWorkflowPayload(BaseModel):
    """Request body accepted when a snippet workflow is published."""

    # Optional free-form settings mapping.
    knowledge_base_setting: dict[str, Any] | None = None

View File

@@ -0,0 +1,540 @@
import logging
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import request
from flask_restx import Resource, marshal_with
from sqlalchemy.orm import Session
from werkzeug.exceptions import InternalServerError, NotFound
from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.app.error import DraftWorkflowNotExist, DraftWorkflowNotSync
from controllers.console.app.workflow import workflow_model
from controllers.console.app.workflow_run import (
workflow_run_detail_model,
workflow_run_node_execution_list_model,
workflow_run_node_execution_model,
workflow_run_pagination_model,
)
from controllers.console.snippets.payloads import (
PublishWorkflowPayload,
SnippetDraftNodeRunPayload,
SnippetDraftRunPayload,
SnippetDraftSyncPayload,
SnippetIterationNodeRunPayload,
SnippetLoopNodeRunPayload,
WorkflowRunQuery,
)
from controllers.console.wraps import (
account_initialization_required,
edit_permission_required,
setup_required,
)
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.graph_engine.manager import GraphEngineManager
from extensions.ext_database import db
from factories import variable_factory
from libs import helper
from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models.snippet import CustomizedSnippet
from services.errors.app import WorkflowHashNotEqualError
from services.snippet_generate_service import SnippetGenerateService
from services.snippet_service import SnippetService
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Generic placeholders used to annotate the ``get_snippet`` decorator.
P = ParamSpec("P")
R = TypeVar("R")
# Register Pydantic models with Swagger
# (the views below fetch them again through ``console_ns.models.get(...)``).
register_schema_models(
console_ns,
SnippetDraftSyncPayload,
SnippetDraftNodeRunPayload,
SnippetDraftRunPayload,
SnippetIterationNodeRunPayload,
SnippetLoopNodeRunPayload,
WorkflowRunQuery,
PublishWorkflowPayload,
)
class SnippetNotFoundError(Exception):
    """Raised to signal that a snippet could not be found."""
def get_snippet(view_func: Callable[P, R]):
    """
    Load the snippet named by the ``snippet_id`` path parameter.

    The ``snippet_id`` keyword argument is replaced by ``snippet``, fetched
    through ``SnippetService.get_snippet_by_id`` for the current tenant.

    Raises:
        ValueError: when ``snippet_id`` is absent or falsy.
        NotFound: when the service returns no snippet.
    """

    @wraps(view_func)
    def decorated_view(*args: P.args, **kwargs: P.kwargs):
        if not kwargs.get("snippet_id"):
            raise ValueError("missing snippet_id in path parameters")

        _, current_tenant_id = current_account_with_tenant()

        # Take the raw path parameter out; the view only receives ``snippet``.
        snippet_id = str(kwargs.pop("snippet_id"))

        snippet = SnippetService.get_snippet_by_id(
            tenant_id=current_tenant_id,
            snippet_id=snippet_id,
        )
        if not snippet:
            raise NotFound("Snippet not found")

        kwargs["snippet"] = snippet
        return view_func(*args, **kwargs)

    return decorated_view
@console_ns.route("/snippets/<uuid:snippet_id>/workflows/draft")
class SnippetDraftWorkflowApi(Resource):
    """Read and sync the draft workflow of a snippet."""

    @console_ns.doc("get_snippet_draft_workflow")
    @console_ns.response(200, "Draft workflow retrieved successfully", workflow_model)
    @console_ns.response(404, "Snippet or draft workflow not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    @marshal_with(workflow_model)
    def get(self, snippet: CustomizedSnippet):
        """
        Return the snippet's draft workflow.

        Raises:
            DraftWorkflowNotExist: when the service returns no draft.
        """
        draft = SnippetService().get_draft_workflow(snippet=snippet)
        if draft:
            return draft
        raise DraftWorkflowNotExist()

    @console_ns.doc("sync_snippet_draft_workflow")
    @console_ns.expect(console_ns.models.get(SnippetDraftSyncPayload.__name__))
    @console_ns.response(200, "Draft workflow synced successfully")
    @console_ns.response(400, "Hash mismatch")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    def post(self, snippet: CustomizedSnippet):
        """
        Store the submitted graph and variables as the snippet's draft.

        Raises:
            DraftWorkflowNotSync: when the service reports a hash mismatch
                (``WorkflowHashNotEqualError``).
        """
        current_user, _ = current_account_with_tenant()
        payload = SnippetDraftSyncPayload.model_validate(console_ns.payload or {})

        try:
            # Missing variable lists are treated as empty.
            environment_variables = [
                variable_factory.build_environment_variable_from_mapping(mapping)
                for mapping in (payload.environment_variables or [])
            ]
            conversation_variables = [
                variable_factory.build_conversation_variable_from_mapping(mapping)
                for mapping in (payload.conversation_variables or [])
            ]

            workflow = SnippetService().sync_draft_workflow(
                snippet=snippet,
                graph=payload.graph,
                unique_hash=payload.hash,
                account=current_user,
                environment_variables=environment_variables,
                conversation_variables=conversation_variables,
                input_variables=payload.input_variables,
            )
        except WorkflowHashNotEqualError:
            raise DraftWorkflowNotSync()

        # Fall back to the creation time when the workflow has no update time.
        last_touched = workflow.updated_at or workflow.created_at
        return {
            "result": "success",
            "hash": workflow.unique_hash,
            "updated_at": TimestampField().format(last_touched),
        }
@console_ns.route("/snippets/<uuid:snippet_id>/workflows/draft/config")
class SnippetDraftConfigApi(Resource):
    """Exposes configuration limits for the snippet draft workflow."""

    @console_ns.doc("get_snippet_draft_config")
    @console_ns.response(200, "Draft config retrieved successfully")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    def get(self, snippet: CustomizedSnippet):
        """Return the draft workflow limits; the value is a fixed constant."""
        limits = {"parallel_depth_limit": 3}
        return limits
@console_ns.route("/snippets/<uuid:snippet_id>/workflows/publish")
class SnippetPublishedWorkflowApi(Resource):
    """Read the published workflow of a snippet, or publish its draft."""

    @console_ns.doc("get_snippet_published_workflow")
    @console_ns.response(200, "Published workflow retrieved successfully", workflow_model)
    @console_ns.response(404, "Snippet not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    @marshal_with(workflow_model)
    def get(self, snippet: CustomizedSnippet):
        """Return the published workflow, or ``None`` for an unpublished snippet."""
        if snippet.is_published:
            return SnippetService().get_published_workflow(snippet=snippet)
        return None

    @console_ns.doc("publish_snippet_workflow")
    @console_ns.expect(console_ns.models.get(PublishWorkflowPayload.__name__))
    @console_ns.response(200, "Workflow published successfully")
    @console_ns.response(400, "No draft workflow found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    def post(self, snippet: CustomizedSnippet):
        """
        Publish the snippet's workflow and commit the change.

        A ``ValueError`` raised while publishing is reported as a 400 JSON
        body carrying the error message.
        """
        current_user, _ = current_account_with_tenant()
        service = SnippetService()

        with Session(db.engine) as session:
            # Attach the already-loaded snippet to this session.
            snippet = session.merge(snippet)
            try:
                published = service.publish_workflow(
                    session=session,
                    snippet=snippet,
                    account=current_user,
                )
                # Format the timestamp before committing the session.
                workflow_created_at = TimestampField().format(published.created_at)
                session.commit()
            except ValueError as e:
                return {"message": str(e)}, 400

        return {
            "result": "success",
            "created_at": workflow_created_at,
        }
@console_ns.route("/snippets/<uuid:snippet_id>/workflows/default-workflow-block-configs")
class SnippetDefaultBlockConfigsApi(Resource):
    """Serves the default block configurations for snippet workflows."""

    @console_ns.doc("get_snippet_default_block_configs")
    @console_ns.response(200, "Default block configs retrieved successfully")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    def get(self, snippet: CustomizedSnippet):
        """Return whatever ``SnippetService.get_default_block_configs`` yields."""
        service = SnippetService()
        block_configs = service.get_default_block_configs()
        return block_configs
@console_ns.route("/snippets/<uuid:snippet_id>/workflow-runs")
class SnippetWorkflowRunsApi(Resource):
    """Lists the workflow runs of a snippet."""

    @console_ns.doc("list_snippet_workflow_runs")
    @console_ns.response(200, "Workflow runs retrieved successfully", workflow_run_pagination_model)
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @marshal_with(workflow_run_pagination_model)
    def get(self, snippet: CustomizedSnippet):
        """
        Return workflow runs for the snippet.

        ``last_id`` and ``limit`` are read from the query string and
        validated through ``WorkflowRunQuery`` before reaching the service.
        """
        raw_query = {
            "last_id": request.args.get("last_id"),
            "limit": request.args.get("limit", type=int, default=20),
        }
        query = WorkflowRunQuery.model_validate(raw_query)

        return SnippetService().get_snippet_workflow_runs(
            snippet=snippet,
            args={"last_id": query.last_id, "limit": query.limit},
        )
@console_ns.route("/snippets/<uuid:snippet_id>/workflow-runs/<uuid:run_id>")
class SnippetWorkflowRunDetailApi(Resource):
    """Detail view of a single snippet workflow run."""

    @console_ns.doc("get_snippet_workflow_run_detail")
    @console_ns.response(200, "Workflow run detail retrieved successfully", workflow_run_detail_model)
    @console_ns.response(404, "Workflow run not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @marshal_with(workflow_run_detail_model)
    def get(self, snippet: CustomizedSnippet, run_id):
        """
        Return the workflow run identified by ``run_id``.

        Raises:
            NotFound: when the service returns no run.
        """
        found = SnippetService().get_snippet_workflow_run(snippet=snippet, run_id=str(run_id))
        if found:
            return found
        raise NotFound("Workflow run not found")
@console_ns.route("/snippets/<uuid:snippet_id>/workflow-runs/<uuid:run_id>/node-executions")
class SnippetWorkflowRunNodeExecutionsApi(Resource):
    """Console endpoint listing the node executions of one snippet workflow run."""

    @console_ns.doc("list_snippet_workflow_run_node_executions")
    @console_ns.response(200, "Node executions retrieved successfully", workflow_run_node_execution_list_model)
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @marshal_with(workflow_run_node_execution_list_model)
    def get(self, snippet: CustomizedSnippet, run_id):
        """List node executions for a workflow run.

        The service result is wrapped under the ``data`` key before marshalling.
        """
        run_id_text = str(run_id)
        service = SnippetService()
        executions = service.get_snippet_workflow_run_node_executions(snippet=snippet, run_id=run_id_text)
        return {"data": executions}
@console_ns.route("/snippets/<uuid:snippet_id>/workflows/draft/nodes/<string:node_id>/run")
class SnippetDraftNodeRunApi(Resource):
    """Console endpoint for single-step debugging of one node in a snippet draft workflow."""

    @console_ns.doc("run_snippet_draft_node")
    @console_ns.doc(description="Run a single node in snippet draft workflow (single-step debugging)")
    @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"})
    @console_ns.expect(console_ns.models.get(SnippetDraftNodeRunPayload.__name__))
    @console_ns.response(200, "Node run completed successfully", workflow_run_node_execution_model)
    @console_ns.response(404, "Snippet or draft workflow not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @marshal_with(workflow_run_node_execution_model)
    @edit_permission_required
    def post(self, snippet: CustomizedSnippet, node_id: str):
        """
        Run a single node in snippet draft workflow.

        The request body is validated as ``SnippetDraftNodeRunPayload``; its ``inputs``,
        ``query`` and parsed ``files`` are forwarded to the node run, and the resulting
        node execution is returned for marshalling.

        Raises:
            NotFound: when the snippet has no draft workflow.
        """
        current_user, _ = current_account_with_tenant()
        payload = SnippetDraftNodeRunPayload.model_validate(console_ns.payload or {})
        user_inputs = payload.inputs

        # The draft workflow is required to parse the uploaded files.
        draft_workflow = SnippetService().get_draft_workflow(snippet=snippet)
        if not draft_workflow:
            raise NotFound("Draft workflow not found")

        parsed_files = SnippetGenerateService.parse_files(draft_workflow, payload.files)
        return SnippetGenerateService.run_draft_node(
            snippet=snippet,
            node_id=node_id,
            user_inputs=user_inputs,
            account=current_user,
            query=payload.query,
            files=parsed_files,
        )
@console_ns.route("/snippets/<uuid:snippet_id>/workflows/draft/nodes/<string:node_id>/last-run")
class SnippetDraftNodeLastRunApi(Resource):
    """Console endpoint returning the last run of a node in a snippet draft workflow."""

    @console_ns.doc("get_snippet_draft_node_last_run")
    @console_ns.doc(description="Get last run result for a node in snippet draft workflow")
    @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"})
    @console_ns.response(200, "Node last run retrieved successfully", workflow_run_node_execution_model)
    @console_ns.response(404, "Snippet, draft workflow, or node last run not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @marshal_with(workflow_run_node_execution_model)
    def get(self, snippet: CustomizedSnippet, node_id: str):
        """
        Get the last run result for a specific node in snippet draft workflow.

        Raises:
            NotFound: when the snippet has no draft workflow, or the service has no
                last-run record for ``node_id``.
        """
        service = SnippetService()
        draft_workflow = service.get_draft_workflow(snippet=snippet)
        if not draft_workflow:
            raise NotFound("Draft workflow not found")

        last_execution = service.get_snippet_node_last_run(
            snippet=snippet,
            workflow=draft_workflow,
            node_id=node_id,
        )
        if last_execution is not None:
            return last_execution
        raise NotFound("Node last run not found")
@console_ns.route("/snippets/<uuid:snippet_id>/workflows/draft/iteration/nodes/<string:node_id>/run")
class SnippetDraftRunIterationNodeApi(Resource):
    """Console endpoint that runs a single iteration node of a snippet draft workflow."""

    @console_ns.doc("run_snippet_draft_iteration_node")
    @console_ns.doc(description="Run draft workflow iteration node for snippet")
    @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"})
    @console_ns.expect(console_ns.models.get(SnippetIterationNodeRunPayload.__name__))
    @console_ns.response(200, "Iteration node run started successfully (SSE stream)")
    @console_ns.response(404, "Snippet or draft workflow not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    def post(self, snippet: CustomizedSnippet, node_id: str):
        """
        Run a draft workflow iteration node for snippet.

        The validated payload is dumped to a dict (``None`` values dropped) and passed
        to the generate service in streaming mode; the response is compacted by
        ``helper.compact_generate_response``.

        Raises:
            ValueError: propagated unchanged from the service.
            InternalServerError: for any other exception (logged first).
        """
        current_user, _ = current_account_with_tenant()
        payload = SnippetIterationNodeRunPayload.model_validate(console_ns.payload or {})
        args = payload.model_dump(exclude_none=True)
        try:
            stream = SnippetGenerateService.generate_single_iteration(
                snippet=snippet,
                user=current_user,
                node_id=node_id,
                args=args,
                streaming=True,
            )
            return helper.compact_generate_response(stream)
        except ValueError:
            # Let value errors through so they are not masked as a 500.
            raise
        except Exception:
            logger.exception("internal server error.")
            raise InternalServerError()
@console_ns.route("/snippets/<uuid:snippet_id>/workflows/draft/loop/nodes/<string:node_id>/run")
class SnippetDraftRunLoopNodeApi(Resource):
    """Console endpoint that runs a single loop node of a snippet draft workflow."""

    @console_ns.doc("run_snippet_draft_loop_node")
    @console_ns.doc(description="Run draft workflow loop node for snippet")
    @console_ns.doc(params={"snippet_id": "Snippet ID", "node_id": "Node ID"})
    @console_ns.expect(console_ns.models.get(SnippetLoopNodeRunPayload.__name__))
    @console_ns.response(200, "Loop node run started successfully (SSE stream)")
    @console_ns.response(404, "Snippet or draft workflow not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    def post(self, snippet: CustomizedSnippet, node_id: str):
        """
        Run a draft workflow loop node for snippet.

        The validated ``SnippetLoopNodeRunPayload`` object itself (not a dict dump) is
        passed to the generate service in streaming mode.

        Raises:
            ValueError: propagated unchanged from the service.
            InternalServerError: for any other exception (logged first).
        """
        current_user, _ = current_account_with_tenant()
        loop_payload = SnippetLoopNodeRunPayload.model_validate(console_ns.payload or {})
        try:
            stream = SnippetGenerateService.generate_single_loop(
                snippet=snippet,
                user=current_user,
                node_id=node_id,
                args=loop_payload,
                streaming=True,
            )
            return helper.compact_generate_response(stream)
        except ValueError:
            # Let value errors through so they are not masked as a 500.
            raise
        except Exception:
            logger.exception("internal server error.")
            raise InternalServerError()
@console_ns.route("/snippets/<uuid:snippet_id>/workflows/draft/run")
class SnippetDraftWorkflowRunApi(Resource):
    """Console endpoint that runs the whole draft workflow of a snippet."""

    @console_ns.doc("run_snippet_draft_workflow")
    @console_ns.expect(console_ns.models.get(SnippetDraftRunPayload.__name__))
    @console_ns.response(200, "Draft workflow run started successfully (SSE stream)")
    @console_ns.response(404, "Snippet or draft workflow not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    def post(self, snippet: CustomizedSnippet):
        """
        Run draft workflow for snippet.

        The validated payload is dumped to a dict (``None`` values dropped) and the run
        is started in streaming mode with ``InvokeFrom.DEBUGGER``.

        Raises:
            ValueError: propagated unchanged from the service.
            InternalServerError: for any other exception (logged first).
        """
        current_user, _ = current_account_with_tenant()
        run_payload = SnippetDraftRunPayload.model_validate(console_ns.payload or {})
        args = run_payload.model_dump(exclude_none=True)
        try:
            stream = SnippetGenerateService.generate(
                snippet=snippet,
                user=current_user,
                args=args,
                invoke_from=InvokeFrom.DEBUGGER,
                streaming=True,
            )
            return helper.compact_generate_response(stream)
        except ValueError:
            # Let value errors through so they are not masked as a 500.
            raise
        except Exception:
            logger.exception("internal server error.")
            raise InternalServerError()
@console_ns.route("/snippets/<uuid:snippet_id>/workflow-runs/tasks/<string:task_id>/stop")
class SnippetWorkflowTaskStopApi(Resource):
    """Console endpoint that requests a running snippet workflow task to stop."""

    @console_ns.doc("stop_snippet_workflow_task")
    @console_ns.response(200, "Task stopped successfully")
    @console_ns.response(404, "Snippet not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_snippet
    @edit_permission_required
    def post(self, snippet: CustomizedSnippet, task_id: str):
        """
        Stop a running snippet workflow task.

        Both stop mechanisms are triggered for backward compatibility: the legacy stop
        flag first, then the graph engine stop command.

        NOTE(review): only ``task_id`` is used here; ``snippet`` is not consulted, so
        nothing in this method ties the task to the snippet in the URL - confirm that
        is acceptable.
        """
        # Legacy mechanism: stop flag, set without a user check.
        AppQueueManager.set_stop_flag_no_user_check(task_id)
        # Current mechanism: stop command through the graph engine command channel.
        GraphEngineManager.send_stop_command(task_id)
        return {"result": "success"}

View File

@@ -0,0 +1,202 @@
import logging
from flask import request
from flask_restx import Resource, marshal, marshal_with
from sqlalchemy.orm import Session
from werkzeug.exceptions import NotFound
from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.snippets.payloads import (
CreateSnippetPayload,
SnippetListQuery,
UpdateSnippetPayload,
)
from controllers.console.wraps import (
account_initialization_required,
edit_permission_required,
setup_required,
)
from extensions.ext_database import db
from fields.snippet_fields import snippet_fields, snippet_list_fields, snippet_pagination_fields
from libs.login import current_account_with_tenant, login_required
from models.snippet import SnippetType
from services.snippet_service import SnippetService
logger = logging.getLogger(__name__)
# Register the request/query Pydantic models on the console namespace so the
# `console_ns.models.get(<Model>.__name__)` lookups in the decorators below resolve.
register_schema_models(
    console_ns,
    SnippetListQuery,
    CreateSnippetPayload,
    UpdateSnippetPayload,
)
# Namespace models built from the marshalling field maps; used in the
# `@console_ns.response(...)` documentation of the endpoints below.
snippet_model = console_ns.model("Snippet", snippet_fields)
# NOTE: the list endpoint marshals with `snippet_list_fields` directly and documents
# its response with `snippet_pagination_model`; `snippet_list_model` is not referenced
# by the endpoints visible in this module.
snippet_list_model = console_ns.model("SnippetList", snippet_list_fields)
snippet_pagination_model = console_ns.model("SnippetPagination", snippet_pagination_fields)
@console_ns.route("/workspaces/current/customized-snippets")
class CustomizedSnippetsApi(Resource):
    """Collection endpoint for the current workspace's customized snippets (list, create)."""

    @console_ns.doc("list_customized_snippets")
    @console_ns.expect(console_ns.models.get(SnippetListQuery.__name__))
    @console_ns.response(200, "Snippets retrieved successfully", snippet_pagination_model)
    @setup_required
    @login_required
    @account_initialization_required
    def get(self):
        """List customized snippets with pagination and search.

        Query-string parameters are validated through ``SnippetListQuery``; the
        response echoes the validated ``page`` and ``limit`` next to the marshalled
        snippets, ``total`` and ``has_more``.
        """
        _, current_tenant_id = current_account_with_tenant()
        query = SnippetListQuery.model_validate(request.args.to_dict())

        snippets, total, has_more = SnippetService.get_snippets(
            tenant_id=current_tenant_id,
            page=query.page,
            limit=query.limit,
            keyword=query.keyword,
        )

        body = {
            "data": marshal(snippets, snippet_list_fields),
            "page": query.page,
            "limit": query.limit,
            "total": total,
            "has_more": has_more,
        }
        return body, 200

    @console_ns.doc("create_customized_snippet")
    @console_ns.expect(console_ns.models.get(CreateSnippetPayload.__name__))
    @console_ns.response(201, "Snippet created successfully", snippet_model)
    @console_ns.response(400, "Invalid request or name already exists")
    @setup_required
    @login_required
    @account_initialization_required
    @edit_permission_required
    def post(self):
        """Create a new customized snippet.

        An unrecognised ``type`` falls back to ``SnippetType.NODE``. A ``ValueError``
        raised while preparing or creating the snippet is returned as a 400 response
        carrying the error message.
        """
        current_user, current_tenant_id = current_account_with_tenant()
        payload = CreateSnippetPayload.model_validate(console_ns.payload or {})

        try:
            snippet_type = SnippetType(payload.type)
        except ValueError:
            # Unknown type strings are tolerated and treated as a node snippet.
            snippet_type = SnippetType.NODE

        try:
            icon_info = None
            if payload.icon_info:
                icon_info = payload.icon_info.model_dump()

            input_fields = None
            if payload.input_fields:
                input_fields = [field.model_dump() for field in payload.input_fields]

            snippet = SnippetService.create_snippet(
                tenant_id=current_tenant_id,
                name=payload.name,
                description=payload.description,
                snippet_type=snippet_type,
                icon_info=icon_info,
                graph=payload.graph,
                input_fields=input_fields,
                account=current_user,
            )
        except ValueError as e:
            return {"message": str(e)}, 400

        return marshal(snippet, snippet_fields), 201
@console_ns.route("/workspaces/current/customized-snippets/<uuid:snippet_id>")
class CustomizedSnippetDetailApi(Resource):
    """Item endpoint for one customized snippet of the current workspace (get, patch, delete)."""

    @staticmethod
    def _require_snippet(snippet_id, tenant_id):
        """Load the snippet for ``tenant_id`` or raise ``NotFound("Snippet not found")``."""
        found = SnippetService.get_snippet_by_id(
            snippet_id=str(snippet_id),
            tenant_id=tenant_id,
        )
        if not found:
            raise NotFound("Snippet not found")
        return found

    @console_ns.doc("get_customized_snippet")
    @console_ns.response(200, "Snippet retrieved successfully", snippet_model)
    @console_ns.response(404, "Snippet not found")
    @setup_required
    @login_required
    @account_initialization_required
    def get(self, snippet_id: str):
        """Get customized snippet details.

        Raises:
            NotFound: when no snippet with this id exists for the current tenant.
        """
        _, current_tenant_id = current_account_with_tenant()
        snippet = self._require_snippet(snippet_id, current_tenant_id)
        return marshal(snippet, snippet_fields), 200

    @console_ns.doc("update_customized_snippet")
    @console_ns.expect(console_ns.models.get(UpdateSnippetPayload.__name__))
    @console_ns.response(200, "Snippet updated successfully", snippet_model)
    @console_ns.response(400, "Invalid request or name already exists")
    @console_ns.response(404, "Snippet not found")
    @setup_required
    @login_required
    @account_initialization_required
    @edit_permission_required
    def patch(self, snippet_id: str):
        """Update customized snippet.

        Only fields explicitly present in the request body are applied. An empty update
        or a ``ValueError`` from the service is answered with a 400 response.

        Raises:
            NotFound: when no snippet with this id exists for the current tenant.
        """
        current_user, current_tenant_id = current_account_with_tenant()
        snippet = self._require_snippet(snippet_id, current_tenant_id)

        payload = UpdateSnippetPayload.model_validate(console_ns.payload or {})
        update_data = payload.model_dump(exclude_unset=True)

        # Re-dump a provided icon_info from the payload model (full dump of the nested model).
        if update_data.get("icon_info") is not None:
            icon_model = payload.icon_info
            update_data["icon_info"] = icon_model.model_dump() if icon_model else None

        if not update_data:
            return {"message": "No valid fields to update"}, 400

        try:
            # expire_on_commit=False keeps the instance readable for marshalling after commit.
            with Session(db.engine, expire_on_commit=False) as session:
                attached = session.merge(snippet)
                snippet = SnippetService.update_snippet(
                    session=session,
                    snippet=attached,
                    account_id=current_user.id,
                    data=update_data,
                )
                session.commit()
        except ValueError as e:
            return {"message": str(e)}, 400

        return marshal(snippet, snippet_fields), 200

    @console_ns.doc("delete_customized_snippet")
    @console_ns.response(204, "Snippet deleted successfully")
    @console_ns.response(404, "Snippet not found")
    @setup_required
    @login_required
    @account_initialization_required
    @edit_permission_required
    def delete(self, snippet_id: str):
        """Delete customized snippet.

        Raises:
            NotFound: when no snippet with this id exists for the current tenant.
        """
        _, current_tenant_id = current_account_with_tenant()
        snippet = self._require_snippet(snippet_id, current_tenant_id)

        with Session(db.engine) as session:
            attached = session.merge(snippet)
            SnippetService.delete_snippet(
                session=session,
                snippet=attached,
            )
            session.commit()

        return "", 204

View File

View File

@@ -0,0 +1,64 @@
from abc import ABC, abstractmethod
from core.evaluation.entities.evaluation_entity import (
EvaluationCategory,
EvaluationItemInput,
EvaluationItemResult,
)
class BaseEvaluationInstance(ABC):
    """Abstract base class for evaluation framework adapters.

    A concrete adapter implements one ``evaluate_*`` method per evaluation category;
    all four share the same signature. ``model_provider``, ``model_name`` and
    ``tenant_id`` identify the judge model to use; ``metrics_config`` is
    adapter-defined.
    """

    @abstractmethod
    def evaluate_llm(
        self,
        items: list[EvaluationItemInput],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
        tenant_id: str,
    ) -> list[EvaluationItemResult]:
        """Evaluate LLM outputs using the configured framework."""

    @abstractmethod
    def evaluate_retrieval(
        self,
        items: list[EvaluationItemInput],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
        tenant_id: str,
    ) -> list[EvaluationItemResult]:
        """Evaluate retrieval quality using the configured framework."""

    @abstractmethod
    def evaluate_agent(
        self,
        items: list[EvaluationItemInput],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
        tenant_id: str,
    ) -> list[EvaluationItemResult]:
        """Evaluate agent outputs using the configured framework."""

    @abstractmethod
    def evaluate_workflow(
        self,
        items: list[EvaluationItemInput],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
        tenant_id: str,
    ) -> list[EvaluationItemResult]:
        """Evaluate workflow outputs using the configured framework."""

    @abstractmethod
    def get_supported_metrics(self, category: EvaluationCategory) -> list[str]:
        """Return the list of supported metric names for a given evaluation category."""

View File

View File

@@ -0,0 +1,19 @@
from enum import StrEnum
from pydantic import BaseModel
class EvaluationFrameworkEnum(StrEnum):
RAGAS = "ragas"
DEEPEVAL = "deepeval"
NONE = "none"
class BaseEvaluationConfig(BaseModel):
    """Base configuration for evaluation frameworks.

    Declares no fields of its own; framework-specific configs subclass it.
    """
class RagasConfig(BaseEvaluationConfig):
    """RAGAS-specific configuration (currently declares no fields)."""

View File

@@ -0,0 +1,52 @@
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, Field
class EvaluationCategory(StrEnum):
LLM = "llm"
RETRIEVAL = "retrieval"
AGENT = "agent"
WORKFLOW = "workflow"
class EvaluationMetric(BaseModel):
    """A single named score produced for one evaluated item."""

    # Metric identifier, e.g. one of the names listed by an evaluator's supported metrics.
    name: str
    # Numeric score for the metric.
    score: float
    # Free-form extra information about the score; defaults to an empty dict.
    details: dict[str, Any] = Field(default_factory=dict)
class EvaluationItemInput(BaseModel):
    """One item to evaluate: its inputs plus optional reference data."""

    # Position/identifier of the item; results carry the same index.
    index: int
    # Input values for the evaluated target, keyed by input name.
    inputs: dict[str, Any]
    # Optional reference output for the item.
    expected_output: str | None = None
    # Optional list of context strings for the item.
    context: list[str] | None = None
class EvaluationItemResult(BaseModel):
    """Outcome for one evaluated item: produced output, metric scores and metadata."""

    # Index of the EvaluationItemInput this result belongs to.
    index: int
    # Output produced by the evaluated target, if any.
    actual_output: str | None = None
    # Scores computed for this item; empty until metrics are evaluated.
    metrics: list[EvaluationMetric] = Field(default_factory=list)
    # Free-form additional data about the item.
    metadata: dict[str, Any] = Field(default_factory=dict)
    # Error description, if one was recorded for this item.
    error: str | None = None

    @property
    def overall_score(self) -> float | None:
        """Return the arithmetic mean of all metric scores, or ``None`` without metrics."""
        if self.metrics:
            total = sum(metric.score for metric in self.metrics)
            return total / len(self.metrics)
        return None
class EvaluationRunData(BaseModel):
    """Serializable data for Celery task."""

    # Identifier of the evaluation run this payload belongs to.
    evaluation_run_id: str
    tenant_id: str
    # Kind and identifier of the target being evaluated.
    target_type: str
    target_id: str
    evaluation_category: EvaluationCategory
    # Provider and model name of the model used for evaluation.
    evaluation_model_provider: str
    evaluation_model: str
    # Metric configuration; defaults to an empty dict.
    metrics_config: dict[str, Any] = Field(default_factory=dict)
    # Items to evaluate in this run.
    items: list[EvaluationItemInput]

View File

@@ -0,0 +1,61 @@
import collections
import logging
from typing import Any
from configs import dify_config
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.config_entity import EvaluationFrameworkEnum
from core.evaluation.entities.evaluation_entity import EvaluationCategory
logger = logging.getLogger(__name__)
class EvaluationFrameworkConfigMap(collections.UserDict[str, dict[str, Any]]):
    """Registry mapping framework enum -> {config_class, evaluator_class}.

    Lookups are computed in ``__getitem__`` rather than stored, so a framework's
    modules are imported only when that framework is requested.
    """

    def __getitem__(self, framework: str) -> dict[str, Any]:
        """Return the config and evaluator classes for ``framework``.

        Raises:
            NotImplementedError: for the DeepEval framework, which has no adapter yet.
            ValueError: for any other unrecognised framework value.
        """
        if framework == EvaluationFrameworkEnum.RAGAS:
            # Imported lazily, only when the RAGAS framework is requested.
            from core.evaluation.entities.config_entity import RagasConfig
            from core.evaluation.frameworks.ragas.ragas_evaluator import RagasEvaluator

            return {
                "config_class": RagasConfig,
                "evaluator_class": RagasEvaluator,
            }
        if framework == EvaluationFrameworkEnum.DEEPEVAL:
            raise NotImplementedError("DeepEval adapter is not yet implemented.")
        raise ValueError(f"Unknown evaluation framework: {framework}")


# Module-level registry instance.
evaluation_framework_config_map = EvaluationFrameworkConfigMap()
class EvaluationManager:
    """Factory for evaluation instances based on global configuration."""

    @staticmethod
    def get_evaluation_instance() -> BaseEvaluationInstance | None:
        """Create and return an evaluation instance for ``dify_config.EVALUATION_FRAMEWORK``.

        Returns ``None`` when no framework is configured (empty value or ``none``) or
        when building the evaluator fails for any reason; failures are logged, not raised.
        """
        framework = dify_config.EVALUATION_FRAMEWORK
        if framework and framework != EvaluationFrameworkEnum.NONE:
            try:
                registry_entry = evaluation_framework_config_map[framework]
                evaluator_cls = registry_entry["evaluator_class"]
                config_cls = registry_entry["config_class"]
                return evaluator_cls(config_cls())
            except Exception:
                # Best effort: a broken or unsupported framework disables evaluation.
                logger.exception("Failed to create evaluation instance for framework: %s", framework)
        return None

    @staticmethod
    def get_supported_metrics(category: EvaluationCategory) -> list[str]:
        """Return supported metrics for the current framework and given category.

        Returns an empty list when no evaluation instance is available.
        """
        evaluator = EvaluationManager.get_evaluation_instance()
        return [] if evaluator is None else evaluator.get_supported_metrics(category)

View File

@@ -0,0 +1,279 @@
import logging
from typing import Any
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.config_entity import RagasConfig
from core.evaluation.entities.evaluation_entity import (
EvaluationCategory,
EvaluationItemInput,
EvaluationItemResult,
EvaluationMetric,
)
from core.evaluation.frameworks.ragas.ragas_model_wrapper import DifyModelWrapper
logger = logging.getLogger(__name__)
# Metric names offered per evaluation category; RagasEvaluator.get_supported_metrics
# returns these lists, and they are the default metric set when a request names none.
# NOTE(review): "tool_call_accuracy" has no entry in the metric map of
# RagasEvaluator._build_ragas_metrics, so that method logs it as an unknown RAGAS metric.
LLM_METRICS = ["faithfulness", "answer_relevancy", "answer_correctness", "answer_similarity"]
RETRIEVAL_METRICS = ["context_precision", "context_recall", "context_relevancy"]
AGENT_METRICS = ["tool_call_accuracy", "answer_correctness"]
WORKFLOW_METRICS = ["faithfulness", "answer_correctness"]
class RagasEvaluator(BaseEvaluationInstance):
"""RAGAS framework adapter for evaluation."""
def __init__(self, config: RagasConfig):
self.config = config
def get_supported_metrics(self, category: EvaluationCategory) -> list[str]:
match category:
case EvaluationCategory.LLM:
return LLM_METRICS
case EvaluationCategory.RETRIEVAL:
return RETRIEVAL_METRICS
case EvaluationCategory.AGENT:
return AGENT_METRICS
case EvaluationCategory.WORKFLOW:
return WORKFLOW_METRICS
case _:
return []
def evaluate_llm(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.LLM)
def evaluate_retrieval(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL
)
def evaluate_agent(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.AGENT)
def evaluate_workflow(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW
)
def _evaluate(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
category: EvaluationCategory,
) -> list[EvaluationItemResult]:
"""Core evaluation logic using RAGAS.
Uses the Dify model wrapper as judge LLM. Falls back to simple
string similarity if RAGAS import fails.
"""
model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
requested_metrics = metrics_config.get("metrics", self.get_supported_metrics(category))
try:
return self._evaluate_with_ragas(items, requested_metrics, model_wrapper, category)
except ImportError:
logger.warning("RAGAS not installed, falling back to simple evaluation")
return self._evaluate_simple(items, requested_metrics, model_wrapper)
def _evaluate_with_ragas(
self,
items: list[EvaluationItemInput],
requested_metrics: list[str],
model_wrapper: DifyModelWrapper,
category: EvaluationCategory,
) -> list[EvaluationItemResult]:
"""Evaluate using RAGAS library."""
from ragas import evaluate as ragas_evaluate
from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import (
Faithfulness,
ResponseRelevancy,
)
# Build RAGAS dataset
samples = []
for item in items:
sample = SingleTurnSample(
user_input=self._inputs_to_query(item.inputs),
response=item.expected_output or "",
retrieved_contexts=item.context or [],
)
if item.expected_output:
sample.reference = item.expected_output
samples.append(sample)
dataset = EvaluationDataset(samples=samples)
# Build metric instances
ragas_metrics = self._build_ragas_metrics(requested_metrics)
if not ragas_metrics:
logger.warning("No valid RAGAS metrics found for: %s", requested_metrics)
return [EvaluationItemResult(index=item.index) for item in items]
# Run RAGAS evaluation
try:
result = ragas_evaluate(
dataset=dataset,
metrics=ragas_metrics,
)
# Convert RAGAS results to our format
results = []
result_df = result.to_pandas()
for i, item in enumerate(items):
metrics = []
for metric_name in requested_metrics:
if metric_name in result_df.columns:
score = result_df.iloc[i][metric_name]
if score is not None and not (isinstance(score, float) and score != score): # NaN check
metrics.append(EvaluationMetric(name=metric_name, score=float(score)))
results.append(EvaluationItemResult(index=item.index, metrics=metrics))
return results
except Exception:
logger.exception("RAGAS evaluation failed, falling back to simple evaluation")
return self._evaluate_simple(items, requested_metrics, model_wrapper)
def _evaluate_simple(
self,
items: list[EvaluationItemInput],
requested_metrics: list[str],
model_wrapper: DifyModelWrapper,
) -> list[EvaluationItemResult]:
"""Simple LLM-as-judge fallback when RAGAS is not available."""
results = []
for item in items:
metrics = []
query = self._inputs_to_query(item.inputs)
for metric_name in requested_metrics:
try:
score = self._judge_with_llm(model_wrapper, metric_name, query, item)
metrics.append(EvaluationMetric(name=metric_name, score=score))
except Exception:
logger.exception("Failed to compute metric %s for item %d", metric_name, item.index)
results.append(EvaluationItemResult(index=item.index, metrics=metrics))
return results
def _judge_with_llm(
self,
model_wrapper: DifyModelWrapper,
metric_name: str,
query: str,
item: EvaluationItemInput,
) -> float:
"""Use the LLM to judge a single metric for a single item."""
prompt = self._build_judge_prompt(metric_name, query, item)
response = model_wrapper.invoke(prompt)
return self._parse_score(response)
def _build_judge_prompt(self, metric_name: str, query: str, item: EvaluationItemInput) -> str:
"""Build a scoring prompt for the LLM judge."""
parts = [
f"Evaluate the following on the metric '{metric_name}' using a scale of 0.0 to 1.0.",
f"\nQuery: {query}",
]
if item.expected_output:
parts.append(f"\nExpected Output: {item.expected_output}")
if item.context:
parts.append(f"\nContext: {'; '.join(item.context)}")
parts.append(
"\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else."
)
return "\n".join(parts)
@staticmethod
def _parse_score(response: str) -> float:
"""Parse a float score from LLM response."""
cleaned = response.strip()
try:
score = float(cleaned)
return max(0.0, min(1.0, score))
except ValueError:
# Try to extract first number from response
import re
match = re.search(r"(\d+\.?\d*)", cleaned)
if match:
score = float(match.group(1))
return max(0.0, min(1.0, score))
return 0.0
@staticmethod
def _inputs_to_query(inputs: dict[str, Any]) -> str:
"""Convert input dict to a query string."""
if "query" in inputs:
return str(inputs["query"])
if "question" in inputs:
return str(inputs["question"])
# Fallback: concatenate all input values
return " ".join(str(v) for v in inputs.values())
@staticmethod
def _build_ragas_metrics(requested_metrics: list[str]) -> list[Any]:
"""Build RAGAS metric instances from metric names."""
try:
from ragas.metrics import (
AnswerCorrectness,
AnswerRelevancy,
AnswerSimilarity,
ContextPrecision,
ContextRecall,
ContextRelevancy,
Faithfulness,
)
metric_map: dict[str, Any] = {
"faithfulness": Faithfulness,
"answer_relevancy": AnswerRelevancy,
"answer_correctness": AnswerCorrectness,
"answer_similarity": AnswerSimilarity,
"context_precision": ContextPrecision,
"context_recall": ContextRecall,
"context_relevancy": ContextRelevancy,
}
metrics = []
for name in requested_metrics:
metric_class = metric_map.get(name)
if metric_class:
metrics.append(metric_class())
else:
logger.warning("Unknown RAGAS metric: %s", name)
return metrics
except ImportError:
logger.warning("RAGAS metrics not available")
return []

View File

@@ -0,0 +1,48 @@
import logging
from typing import Any
logger = logging.getLogger(__name__)
class DifyModelWrapper:
    """Wraps Dify's model invocation interface for use as an LLM judge.

    Holds the provider, model name and tenant that identify a model, and exposes
    :meth:`invoke`, which sends one text prompt and returns the model's reply content.
    The model instance is looked up on every call.
    """

    def __init__(self, model_provider: str, model_name: str, tenant_id: str):
        """Remember which tenant's model to invoke; no lookup happens here."""
        self.tenant_id = tenant_id
        self.model_provider = model_provider
        self.model_name = model_name

    def _get_model_instance(self) -> Any:
        """Resolve the LLM model instance for the stored tenant, provider and model name."""
        # Imported at call time rather than at module import.
        from core.model_manager import ModelManager
        from core.model_runtime.entities.model_entities import ModelType

        return ModelManager().get_model_instance(
            tenant_id=self.tenant_id,
            provider=self.model_provider,
            model_type=ModelType.LLM,
            model=self.model_name,
        )

    def invoke(self, prompt: str) -> str:
        """Invoke the model with a text prompt and return the text response.

        The prompt is sent as the user message after a fixed judge system message,
        without streaming, at temperature 0.0 and with at most 2048 output tokens.
        """
        from core.model_runtime.entities.message_entities import (
            SystemPromptMessage,
            UserPromptMessage,
        )

        llm = self._get_model_instance()
        messages = [
            SystemPromptMessage(content="You are an evaluation judge. Answer precisely and concisely."),
            UserPromptMessage(content=prompt),
        ]
        llm_result = llm.invoke_llm(
            prompt_messages=messages,
            model_parameters={"temperature": 0.0, "max_tokens": 2048},
            stream=False,
        )
        return llm_result.message.content

View File

@@ -0,0 +1,32 @@
from sqlalchemy import select
from sqlalchemy.orm import Session
from models import Account, App, TenantAccountJoin
def get_service_account_for_app(session: Session, app_id: str) -> Account:
    """Get the creator account for an app with tenant context set up.

    Looks up the app, then the account that created it, then that account's join row
    flagged ``current``, and sets the join row's tenant id on the account before
    returning it. (Per the original author's note, this mirrors
    ``BaseTraceInstance.get_service_account_with_tenant()``.)

    Raises:
        ValueError: when the app does not exist, has no creator, the creator account
            is missing, or the account has no current tenant.
    """
    app_stmt = select(App).where(App.id == app_id)
    app = session.scalar(app_stmt)
    if not app:
        raise ValueError(f"App with id {app_id} not found")
    if not app.created_by:
        raise ValueError(f"App with id {app_id} has no creator")

    account_stmt = select(Account).where(Account.id == app.created_by)
    account = session.scalar(account_stmt)
    if not account:
        raise ValueError(f"Creator account not found for app {app_id}")

    # The account's tenant is taken from its join row flagged as current.
    tenant_join_query = session.query(TenantAccountJoin).filter_by(account_id=account.id, current=True)
    current_tenant = tenant_join_query.first()
    if not current_tenant:
        raise ValueError(f"Current tenant not found for account {account.id}")

    account.set_tenant_id(current_tenant.tenant_id)
    return account

View File

@@ -0,0 +1,152 @@
import logging
from typing import Any, Mapping, Union
from sqlalchemy.orm import Session
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.evaluation_entity import (
EvaluationItemInput,
EvaluationItemResult,
)
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
from models.model import App, AppMode
logger = logging.getLogger(__name__)
class AgentEvaluationRunner(BaseEvaluationRunner):
"""Runner for agent evaluation: executes agent-type App, collects tool calls and final output."""
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
    """Pass the evaluation backend and the database session on to the base runner unchanged."""
    super().__init__(evaluation_instance, session)
def execute_target(
    self,
    tenant_id: str,
    target_id: str,
    target_type: str,
    item: EvaluationItemInput,
) -> EvaluationItemResult:
    """Execute agent app and collect response with tool call information.

    The app is loaded by ``target_id`` and run as its creator account in streaming
    mode; the stream is consumed into the final output and the tool calls, which are
    stored under ``metadata["tool_calls"]``.

    NOTE(review): ``tenant_id`` and ``target_type`` are not used in this method; the
    app is looked up by id alone - confirm the caller has already validated ownership.

    Raises:
        ValueError: when no app with ``target_id`` exists (or when the service
            account cannot be resolved).
    """
    from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
    from core.evaluation.runners import get_service_account_for_app
    from core.app.entities.app_invoke_entities import InvokeFrom

    app_query = self.session.query(App).filter_by(id=target_id)
    app = app_query.first()
    if not app:
        raise ValueError(f"App {target_id} not found")

    service_account = get_service_account_for_app(self.session, target_id)
    query = self._extract_query(item.inputs)
    generate_args: dict[str, Any] = {"inputs": item.inputs, "query": query}

    # Agent chat requires streaming - collect full response
    stream = AgentChatAppGenerator().generate(
        app_model=app,
        user=service_account,
        args=generate_args,
        invoke_from=InvokeFrom.SERVICE_API,
        streaming=True,
    )

    # Consume the stream to get the full response
    actual_output, tool_calls = self._consume_agent_stream(stream)
    return EvaluationItemResult(
        index=item.index,
        actual_output=actual_output,
        metadata={"tool_calls": tool_calls},
    )
def evaluate_metrics(
self,
items: list[EvaluationItemInput],
results: list[EvaluationItemResult],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
"""Compute agent evaluation metrics."""
result_by_index = {r.index: r for r in results}
merged_items = []
for item in items:
result = result_by_index.get(item.index)
context = []
if result and result.actual_output:
context.append(result.actual_output)
merged_items.append(
EvaluationItemInput(
index=item.index,
inputs=item.inputs,
expected_output=item.expected_output,
context=context + (item.context or []),
)
)
evaluated = self.evaluation_instance.evaluate_agent(
merged_items, metrics_config, model_provider, model_name, tenant_id
)
# Merge metrics back preserving metadata
eval_by_index = {r.index: r for r in evaluated}
final_results = []
for result in results:
if result.index in eval_by_index:
eval_result = eval_by_index[result.index]
final_results.append(
EvaluationItemResult(
index=result.index,
actual_output=result.actual_output,
metrics=eval_result.metrics,
metadata=result.metadata,
error=result.error,
)
)
else:
final_results.append(result)
return final_results
@staticmethod
def _extract_query(inputs: dict[str, Any]) -> str:
for key in ("query", "question", "input", "text"):
if key in inputs:
return str(inputs[key])
values = list(inputs.values())
return str(values[0]) if values else ""
@staticmethod
def _consume_agent_stream(response_generator: Any) -> tuple[str, list[dict]]:
"""Consume agent streaming response and extract final answer + tool calls."""
answer_parts: list[str] = []
tool_calls: list[dict] = []
try:
for chunk in response_generator:
if isinstance(chunk, Mapping):
event = chunk.get("event")
if event == "agent_thought":
thought = chunk.get("thought", "")
if thought:
answer_parts.append(thought)
tool = chunk.get("tool")
if tool:
tool_calls.append({
"tool": tool,
"tool_input": chunk.get("tool_input", ""),
})
elif event == "message":
answer = chunk.get("answer", "")
if answer:
answer_parts.append(answer)
elif isinstance(chunk, str):
answer_parts.append(chunk)
except Exception:
logger.exception("Error consuming agent stream")
return "".join(answer_parts), tool_calls

View File

@@ -0,0 +1,130 @@
import json
import logging
from abc import ABC, abstractmethod
from sqlalchemy.orm import Session
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.evaluation_entity import (
EvaluationItemInput,
EvaluationItemResult,
)
from libs.datetime_utils import naive_utc_now
from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus
logger = logging.getLogger(__name__)
class BaseEvaluationRunner(ABC):
    """Abstract base class for evaluation runners.

    Runners are responsible for executing the target (App/Snippet/Retrieval)
    to collect actual outputs, then delegating to the evaluation instance
    for metric computation.
    """

    def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
        self.evaluation_instance = evaluation_instance
        self.session = session

    @abstractmethod
    def execute_target(
        self,
        tenant_id: str,
        target_id: str,
        target_type: str,
        item: EvaluationItemInput,
    ) -> EvaluationItemResult:
        """Execute the evaluation target for a single item and return the result with actual_output populated."""
        ...

    @abstractmethod
    def evaluate_metrics(
        self,
        items: list[EvaluationItemInput],
        results: list[EvaluationItemResult],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
        tenant_id: str,
    ) -> list[EvaluationItemResult]:
        """Compute evaluation metrics on the collected results."""
        ...

    def run(
        self,
        evaluation_run_id: str,
        tenant_id: str,
        target_id: str,
        target_type: str,
        items: list[EvaluationItemInput],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
    ) -> list[EvaluationItemResult]:
        """Orchestrate target execution + metric evaluation for all items.

        Phases:
            1. Mark the run as RUNNING, then call ``execute_target`` per item.
               A failing item is recorded as a result carrying ``error`` and
               counted in ``failed_items``; it does not abort the run.
            2. Call ``evaluate_metrics`` on the items that executed without
               error and merge the evaluated results back by index. A failure
               here is logged and the un-scored results are kept.
            3. Persist one ``EvaluationRunItem`` per result.

        Returns:
            One result per input item, in input order.

        Raises:
            ValueError: If no ``EvaluationRun`` with ``evaluation_run_id`` exists.
        """
        evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first()
        if not evaluation_run:
            raise ValueError(f"EvaluationRun {evaluation_run_id} not found")

        # Update status to running
        evaluation_run.status = EvaluationRunStatus.RUNNING
        evaluation_run.started_at = naive_utc_now()
        self.session.commit()

        results: list[EvaluationItemResult] = []

        # Phase 1: Execute target for each item
        for item in items:
            try:
                result = self.execute_target(tenant_id, target_id, target_type, item)
                results.append(result)
                evaluation_run.completed_items += 1
            except Exception as e:
                logger.exception("Failed to execute target for item %d", item.index)
                results.append(
                    EvaluationItemResult(
                        index=item.index,
                        error=str(e),
                    )
                )
                evaluation_run.failed_items += 1
            # Persist progress counters after every item.
            self.session.commit()

        # Phase 2: Compute metrics on successful results
        # ``results`` is positionally aligned with ``items`` (one append per item above).
        successful_items = [item for item, result in zip(items, results) if result.error is None]
        successful_results = [r for r in results if r.error is None]
        if successful_items and successful_results:
            try:
                evaluated_results = self.evaluate_metrics(
                    successful_items, successful_results, metrics_config, model_provider, model_name, tenant_id
                )
                # Merge evaluated metrics back into results
                evaluated_by_index = {r.index: r for r in evaluated_results}
                for i, result in enumerate(results):
                    if result.index in evaluated_by_index:
                        results[i] = evaluated_by_index[result.index]
            except Exception:
                logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)

        # Phase 3: Persist individual items
        # Index the inputs once instead of scanning ``items`` for every result.
        # ``setdefault`` keeps the first item per index, matching a linear scan.
        items_by_index: dict = {}
        for item in items:
            items_by_index.setdefault(item.index, item)

        for result in results:
            item_input = items_by_index.get(result.index)
            run_item = EvaluationRunItem(
                evaluation_run_id=evaluation_run_id,
                item_index=result.index,
                inputs=json.dumps(item_input.inputs) if item_input else None,
                expected_output=item_input.expected_output if item_input else None,
                context=json.dumps(item_input.context) if item_input and item_input.context else None,
                actual_output=result.actual_output,
                metrics=json.dumps([m.model_dump() for m in result.metrics]) if result.metrics else None,
                metadata_json=json.dumps(result.metadata) if result.metadata else None,
                error=result.error,
                overall_score=result.overall_score,
            )
            self.session.add(run_item)
        self.session.commit()
        return results

View File

@@ -0,0 +1,152 @@
import logging
from typing import Any, Mapping, Union
from sqlalchemy.orm import Session
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.evaluation_entity import (
EvaluationItemInput,
EvaluationItemResult,
)
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
from models.model import App, AppMode
logger = logging.getLogger(__name__)
class LLMEvaluationRunner(BaseEvaluationRunner):
    """Runner for LLM evaluation: executes App to get responses, then evaluates."""

    def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
        super().__init__(evaluation_instance, session)

    def execute_target(
        self,
        tenant_id: str,
        target_id: str,
        target_type: str,
        item: EvaluationItemInput,
    ) -> EvaluationItemResult:
        """Execute the App/Snippet with the given inputs and collect the response.

        Workflow and advanced-chat apps are run through ``WorkflowAppGenerator``
        using the app's published workflow; completion apps are run through
        ``CompletionAppGenerator``. Both are invoked in blocking mode.

        Raises:
            ValueError: If the app does not exist, has no published workflow
                (workflow modes), or has an app mode this runner does not
                handle.
        """
        from core.app.apps.completion.app_generator import CompletionAppGenerator
        from core.app.apps.workflow.app_generator import WorkflowAppGenerator
        from core.evaluation.runners import get_service_account_for_app
        from core.app.entities.app_invoke_entities import InvokeFrom
        from services.workflow_service import WorkflowService

        app = self.session.query(App).filter_by(id=target_id).first()
        if not app:
            raise ValueError(f"App {target_id} not found")

        # Get a service account for invocation
        service_account = get_service_account_for_app(self.session, target_id)
        app_mode = AppMode.value_of(app.mode)

        # Build args from evaluation item inputs
        args: dict[str, Any] = {
            "inputs": item.inputs,
        }
        # For completion/chat modes, first text input becomes query
        # NOTE(review): CHAT gets a query here but is rejected as unsupported below.
        if app_mode in (AppMode.COMPLETION, AppMode.CHAT):
            query = self._extract_query(item.inputs)
            args["query"] = query

        if app_mode in (AppMode.WORKFLOW, AppMode.ADVANCED_CHAT):
            workflow_service = WorkflowService()
            workflow = workflow_service.get_published_workflow(app_model=app)
            if not workflow:
                raise ValueError(f"No published workflow found for app {target_id}")
            generator = WorkflowAppGenerator()
            response: Mapping[str, Any] = generator.generate(
                app_model=app,
                workflow=workflow,
                user=service_account,
                args=args,
                invoke_from=InvokeFrom.SERVICE_API,
                streaming=False,
            )
        elif app_mode == AppMode.COMPLETION:
            generator = CompletionAppGenerator()
            response = generator.generate(
                app_model=app,
                user=service_account,
                args=args,
                invoke_from=InvokeFrom.SERVICE_API,
                streaming=False,
            )
        else:
            raise ValueError(f"Unsupported app mode for LLM evaluation: {app_mode}")

        actual_output = self._extract_output(response)
        return EvaluationItemResult(
            index=item.index,
            actual_output=actual_output,
        )

    def evaluate_metrics(
        self,
        items: list[EvaluationItemInput],
        results: list[EvaluationItemResult],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
        tenant_id: str,
    ) -> list[EvaluationItemResult]:
        """Use the evaluation instance to compute LLM metrics.

        The metrics returned by ``evaluation_instance.evaluate_llm`` are merged
        back into the original results so that ``actual_output``, ``metadata``
        and ``error`` collected during execution are preserved. Results without
        a matching evaluated entry are returned unchanged.
        """
        # Merge actual_output into items for evaluation
        merged_items = self._merge_results_into_items(items, results)
        evaluated = self.evaluation_instance.evaluate_llm(
            merged_items, metrics_config, model_provider, model_name, tenant_id
        )

        # Merge metrics back preserving the executed output
        eval_by_index = {r.index: r for r in evaluated}
        final_results: list[EvaluationItemResult] = []
        for result in results:
            if result.index not in eval_by_index:
                final_results.append(result)
                continue
            eval_result = eval_by_index[result.index]
            final_results.append(
                EvaluationItemResult(
                    index=result.index,
                    actual_output=result.actual_output,
                    metrics=eval_result.metrics,
                    metadata=result.metadata,
                    error=result.error,
                )
            )
        return final_results

    @staticmethod
    def _extract_query(inputs: dict[str, Any]) -> str:
        """Extract query from inputs.

        Prefers the keys ``query``, ``question``, ``input``, ``text`` (in that
        order); otherwise falls back to the first value, or ``""`` when empty.
        """
        for key in ("query", "question", "input", "text"):
            if key in inputs:
                return str(inputs[key])
        values = list(inputs.values())
        return str(values[0]) if values else ""

    @staticmethod
    def _extract_output(response: Union[Mapping[str, Any], Any]) -> str:
        """Extract text output from app response.

        For a mapping with a mapping under ``data``, the first value of
        ``data["outputs"]`` is used (``""`` when there are none). Otherwise the
        ``answer`` or ``text`` key is used. Anything else is stringified.
        """
        if isinstance(response, Mapping):
            # Workflow response
            if "data" in response and isinstance(response["data"], Mapping):
                outputs = response["data"].get("outputs", {})
                if isinstance(outputs, Mapping):
                    values = list(outputs.values())
                    return str(values[0]) if values else ""
                return str(outputs)
            # Completion response
            if "answer" in response:
                return str(response["answer"])
            if "text" in response:
                return str(response["text"])
        return str(response)

    @staticmethod
    def _merge_results_into_items(
        items: list[EvaluationItemInput],
        results: list[EvaluationItemResult],
    ) -> list[EvaluationItemInput]:
        """Create new items with the actual output prepended to their context.

        Items whose result is missing or has an empty ``actual_output`` are
        passed through unchanged.
        """
        result_by_index = {r.index: r for r in results}
        merged = []
        for item in items:
            result = result_by_index.get(item.index)
            if result and result.actual_output:
                merged.append(
                    EvaluationItemInput(
                        index=item.index,
                        inputs=item.inputs,
                        expected_output=item.expected_output,
                        context=[result.actual_output] + (item.context or []),
                    )
                )
            else:
                merged.append(item)
        return merged

View File

@@ -0,0 +1,111 @@
import logging
from typing import Any
from sqlalchemy.orm import Session
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.evaluation_entity import (
EvaluationItemInput,
EvaluationItemResult,
)
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
logger = logging.getLogger(__name__)
class RetrievalEvaluationRunner(BaseEvaluationRunner):
    """Runner for retrieval evaluation: performs knowledge base retrieval, then evaluates."""

    def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
        super().__init__(evaluation_instance, session)

    def execute_target(
        self,
        tenant_id: str,
        target_id: str,
        target_type: str,
        item: EvaluationItemInput,
    ) -> EvaluationItemResult:
        """Run knowledge retrieval for one item and return the retrieved texts.

        ``actual_output`` is the retrieved contents joined by blank lines and
        ``metadata["retrieved_contexts"]`` holds them as a list. When the
        structured retrieval API is unavailable (``ImportError`` or
        ``AttributeError``), a warning is logged and no contexts are returned.
        """
        from core.rag.retrieval.dataset_retrieval import DatasetRetrieval

        query_text = self._extract_query(item.inputs)
        retriever = DatasetRetrieval()

        # Use knowledge_retrieval for structured results
        try:
            from core.rag.retrieval.dataset_retrieval import KnowledgeRetrievalRequest

            retrieval_request = KnowledgeRetrievalRequest(
                query=query_text,
                app_id=target_id,
                tenant_id=tenant_id,
            )
            hits = retriever.knowledge_retrieval(retrieval_request)
            contexts = [hit.content for hit in hits if hit.content]
        except (ImportError, AttributeError):
            logger.warning("KnowledgeRetrievalRequest not available, using simple retrieval")
            contexts = []

        # Joining an empty list already yields "".
        joined_output = "\n\n".join(contexts)
        return EvaluationItemResult(
            index=item.index,
            actual_output=joined_output,
            metadata={"retrieved_contexts": contexts},
        )

    def evaluate_metrics(
        self,
        items: list[EvaluationItemInput],
        results: list[EvaluationItemResult],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
        tenant_id: str,
    ) -> list[EvaluationItemResult]:
        """Score retrieval results via ``evaluation_instance.evaluate_retrieval``.

        Each item is rebuilt with the retrieved contexts as its ``context``;
        the resulting metrics are attached to the original results, keeping
        their ``actual_output``, ``metadata`` and ``error``.
        """
        # Rebuild items so their context is what retrieval actually returned
        produced_by_index = {r.index: r for r in results}
        merged_items = []
        for entry in items:
            produced = produced_by_index.get(entry.index)
            if produced:
                retrieved = produced.metadata.get("retrieved_contexts", [])
            else:
                retrieved = []
            merged_items.append(
                EvaluationItemInput(
                    index=entry.index,
                    inputs=entry.inputs,
                    expected_output=entry.expected_output,
                    context=retrieved,
                )
            )

        scored = self.evaluation_instance.evaluate_retrieval(
            merged_items, metrics_config, model_provider, model_name, tenant_id
        )

        # Attach metrics to the original results (keep actual_output and metadata)
        scored_by_index = {r.index: r for r in scored}
        final_results = []
        for original in results:
            if original.index not in scored_by_index:
                final_results.append(original)
                continue
            final_results.append(
                EvaluationItemResult(
                    index=original.index,
                    actual_output=original.actual_output,
                    metrics=scored_by_index[original.index].metrics,
                    metadata=original.metadata,
                    error=original.error,
                )
            )
        return final_results

    @staticmethod
    def _extract_query(inputs: dict[str, Any]) -> str:
        """Return the query text: a well-known key if present, else the first value, else ``""``."""
        preferred_key = next((k for k in ("query", "question", "input", "text") if k in inputs), None)
        if preferred_key is not None:
            return str(inputs[preferred_key])
        for first_value in inputs.values():
            return str(first_value)
        return ""

View File

@@ -0,0 +1,133 @@
import logging
from typing import Any, Mapping
from sqlalchemy.orm import Session
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.evaluation_entity import (
EvaluationItemInput,
EvaluationItemResult,
)
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
from models.model import App
logger = logging.getLogger(__name__)
class WorkflowEvaluationRunner(BaseEvaluationRunner):
    """Runner for workflow evaluation: executes workflow App in non-streaming mode."""

    def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
        super().__init__(evaluation_instance, session)

    def execute_target(
        self,
        tenant_id: str,
        target_id: str,
        target_type: str,
        item: EvaluationItemInput,
    ) -> EvaluationItemResult:
        """Run the app's published workflow for one item and collect its outputs.

        Raises:
            ValueError: If the app does not exist or has no published workflow.
        """
        from core.app.apps.workflow.app_generator import WorkflowAppGenerator
        from core.evaluation.runners import get_service_account_for_app
        from core.app.entities.app_invoke_entities import InvokeFrom
        from services.workflow_service import WorkflowService

        app = self.session.query(App).filter_by(id=target_id).first()
        if not app:
            raise ValueError(f"App {target_id} not found")

        service_account = get_service_account_for_app(self.session, target_id)

        published_workflow = WorkflowService().get_published_workflow(app_model=app)
        if not published_workflow:
            raise ValueError(f"No published workflow found for app {target_id}")

        invoke_args: dict[str, Any] = {"inputs": item.inputs}
        response: Mapping[str, Any] = WorkflowAppGenerator().generate(
            app_model=app,
            workflow=published_workflow,
            user=service_account,
            args=invoke_args,
            invoke_from=InvokeFrom.SERVICE_API,
            streaming=False,
        )

        output_text = self._extract_output(response)
        trace = self._extract_node_executions(response)
        return EvaluationItemResult(
            index=item.index,
            actual_output=output_text,
            metadata={"node_executions": trace},
        )

    def evaluate_metrics(
        self,
        items: list[EvaluationItemInput],
        results: list[EvaluationItemResult],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
        tenant_id: str,
    ) -> list[EvaluationItemResult]:
        """Score workflow results end-to-end via ``evaluation_instance.evaluate_workflow``.

        The workflow's actual output is prepended to each item's context for
        scoring; the resulting metrics are attached to the original results,
        keeping their ``actual_output``, ``metadata`` and ``error``.
        """
        produced_by_index = {r.index: r for r in results}
        merged_items = []
        for entry in items:
            produced = produced_by_index.get(entry.index)
            leading = [produced.actual_output] if produced and produced.actual_output else []
            merged_items.append(
                EvaluationItemInput(
                    index=entry.index,
                    inputs=entry.inputs,
                    expected_output=entry.expected_output,
                    context=leading + (entry.context or []),
                )
            )

        scored = self.evaluation_instance.evaluate_workflow(
            merged_items, metrics_config, model_provider, model_name, tenant_id
        )

        # Attach metrics to the original results, keeping their metadata
        scored_by_index = {r.index: r for r in scored}
        final_results = []
        for original in results:
            if original.index not in scored_by_index:
                final_results.append(original)
                continue
            final_results.append(
                EvaluationItemResult(
                    index=original.index,
                    actual_output=original.actual_output,
                    metrics=scored_by_index[original.index].metrics,
                    metadata=original.metadata,
                    error=original.error,
                )
            )
        return final_results

    @staticmethod
    def _extract_output(response: Mapping[str, Any]) -> str:
        """Return the first workflow output as text, or the stringified response.

        When ``response["data"]`` is a mapping, the first value of its
        ``outputs`` mapping is used (``""`` when empty); non-mapping
        ``outputs`` are stringified as-is.
        """
        if not ("data" in response and isinstance(response["data"], Mapping)):
            return str(response)

        outputs = response["data"].get("outputs", {})
        if not isinstance(outputs, Mapping):
            return str(outputs)

        for first_value in outputs.values():
            return str(first_value)
        return ""

    @staticmethod
    def _extract_node_executions(response: Mapping[str, Any]) -> list[dict]:
        """Return ``response["data"]["node_executions"]`` when present, else ``[]``."""
        payload = response.get("data", {})
        if not isinstance(payload, Mapping):
            return []
        return payload.get("node_executions", [])

View File

@@ -0,0 +1,45 @@
from flask_restx import fields
from fields.member_fields import simple_account_fields
from libs.helper import TimestampField
# Columns that appear, in this order, at the start of both the list and the
# full snippet representations.
_snippet_common_fields = {
    "id": fields.String,
    "name": fields.String,
    "description": fields.String,
    "type": fields.String,
    "version": fields.Integer,
    "use_count": fields.Integer,
    "is_published": fields.Boolean,
    "icon_info": fields.Raw,
}

# Snippet list item fields (lightweight for list display)
snippet_list_fields = {
    **_snippet_common_fields,
    "created_at": TimestampField,
    "updated_at": TimestampField,
}

# Full snippet fields (includes creator info and graph data)
snippet_fields = {
    **_snippet_common_fields,
    "graph": fields.Raw(attribute="graph_dict"),
    "input_fields": fields.Raw(attribute="input_fields_list"),
    "created_by": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True),
    "created_at": TimestampField,
    "updated_by": fields.Nested(simple_account_fields, attribute="updated_by_account", allow_null=True),
    "updated_at": TimestampField,
}

# Pagination response fields
snippet_pagination_fields = {
    "data": fields.List(fields.Nested(snippet_list_fields)),
    "page": fields.Integer,
    "limit": fields.Integer,
    "total": fields.Integer,
    "has_more": fields.Boolean,
}

View File

@@ -0,0 +1,83 @@
"""add_customized_snippets_table
Revision ID: 1c05e80d2380
Revises: 788d3099ae3a
Create Date: 2026-01-29 12:00:00.000000
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
import models as models
def _is_pg(conn):
    """Return True when the connection's dialect name is ``"postgresql"``."""
    dialect_name = conn.dialect.name
    return dialect_name == "postgresql"
# revision identifiers, used by Alembic.
# ``revision`` names this migration; ``down_revision`` is the migration it revises.
revision = "1c05e80d2380"
down_revision = "788d3099ae3a"
branch_labels = None
depends_on = None
def upgrade():
    """Create the ``customized_snippets`` table and its tenant index.

    The table definition differs by dialect: PostgreSQL gets a server-side
    ``uuidv7()`` default for ``id``, ``Text`` columns and ``JSONB`` for
    ``icon_info``; every other dialect uses the project's ``LongText`` /
    ``AdjustedJSON`` types and no server-side ``id`` default.
    """
    conn = op.get_bind()
    if _is_pg(conn):
        # PostgreSQL variant.
        # NOTE(review): assumes a ``uuidv7()`` SQL function exists in the database — TODO confirm.
        op.create_table(
            "customized_snippets",
            sa.Column("id", models.types.StringUUID(), server_default=sa.text("uuidv7()"), nullable=False),
            sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
            sa.Column("name", sa.String(length=255), nullable=False),
            sa.Column("description", sa.Text(), nullable=True),
            sa.Column("type", sa.String(length=50), server_default=sa.text("'node'"), nullable=False),
            sa.Column("workflow_id", models.types.StringUUID(), nullable=True),
            sa.Column("is_published", sa.Boolean(), server_default=sa.text("false"), nullable=False),
            sa.Column("version", sa.Integer(), server_default=sa.text("1"), nullable=False),
            sa.Column("use_count", sa.Integer(), server_default=sa.text("0"), nullable=False),
            sa.Column("icon_info", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
            sa.Column("graph", sa.Text(), nullable=True),
            sa.Column("input_fields", sa.Text(), nullable=True),
            sa.Column("created_by", models.types.StringUUID(), nullable=True),
            sa.Column("created_at", sa.DateTime(), server_default=sa.text("CURRENT_TIMESTAMP"), nullable=False),
            sa.Column("updated_by", models.types.StringUUID(), nullable=True),
            sa.Column("updated_at", sa.DateTime(), server_default=sa.text("CURRENT_TIMESTAMP"), nullable=False),
            sa.PrimaryKeyConstraint("id", name="customized_snippet_pkey"),
            # Snippet names are unique within a tenant.
            sa.UniqueConstraint("tenant_id", "name", name="customized_snippet_tenant_name_key"),
        )
    else:
        # Non-PostgreSQL variant: same columns, portable column types, and
        # no server-side default for ``id``.
        op.create_table(
            "customized_snippets",
            sa.Column("id", models.types.StringUUID(), nullable=False),
            sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
            sa.Column("name", sa.String(length=255), nullable=False),
            sa.Column("description", models.types.LongText(), nullable=True),
            sa.Column("type", sa.String(length=50), server_default=sa.text("'node'"), nullable=False),
            sa.Column("workflow_id", models.types.StringUUID(), nullable=True),
            sa.Column("is_published", sa.Boolean(), server_default=sa.text("false"), nullable=False),
            sa.Column("version", sa.Integer(), server_default=sa.text("1"), nullable=False),
            sa.Column("use_count", sa.Integer(), server_default=sa.text("0"), nullable=False),
            sa.Column("icon_info", models.types.AdjustedJSON(astext_type=sa.Text()), nullable=True),
            sa.Column("graph", models.types.LongText(), nullable=True),
            sa.Column("input_fields", models.types.LongText(), nullable=True),
            sa.Column("created_by", models.types.StringUUID(), nullable=True),
            sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
            sa.Column("updated_by", models.types.StringUUID(), nullable=True),
            sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
            sa.PrimaryKeyConstraint("id", name="customized_snippet_pkey"),
            # Snippet names are unique within a tenant.
            sa.UniqueConstraint("tenant_id", "name", name="customized_snippet_tenant_name_key"),
        )
    # The tenant lookup index is created for both dialects.
    with op.batch_alter_table("customized_snippets", schema=None) as batch_op:
        batch_op.create_index("customized_snippet_tenant_idx", ["tenant_id"], unique=False)
def downgrade():
    """Reverse ``upgrade``: drop the tenant index, then the ``customized_snippets`` table."""
    with op.batch_alter_table("customized_snippets", schema=None) as batch_op:
        batch_op.drop_index("customized_snippet_tenant_idx")
    op.drop_table("customized_snippets")

View File

@@ -0,0 +1,113 @@
"""add_evaluation_tables
Revision ID: a1b2c3d4e5f6
Revises: 1c05e80d2380
Create Date: 2026-03-03 00:01:00.000000
"""
import sqlalchemy as sa
from alembic import op
import models as models
# revision identifiers, used by Alembic.
# ``revision`` names this migration; ``down_revision`` is the migration it revises.
revision = "a1b2c3d4e5f6"
down_revision = "1c05e80d2380"
branch_labels = None
depends_on = None
def upgrade():
    """Create the evaluation tables and their indexes.

    Creates, in order: ``evaluation_configurations``, ``evaluation_runs`` and
    ``evaluation_run_items``. No foreign keys are declared between them; the
    ``*_id`` columns are plain UUID columns. None of the ``id`` columns has a
    server-side default.
    """
    # evaluation_configurations: one row per (tenant, target_type, target_id)
    op.create_table(
        "evaluation_configurations",
        sa.Column("id", models.types.StringUUID(), nullable=False),
        sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
        sa.Column("target_type", sa.String(length=20), nullable=False),
        sa.Column("target_id", models.types.StringUUID(), nullable=False),
        sa.Column("evaluation_model_provider", sa.String(length=255), nullable=True),
        sa.Column("evaluation_model", sa.String(length=255), nullable=True),
        sa.Column("metrics_config", models.types.LongText(), nullable=True),
        sa.Column("judgement_conditions", models.types.LongText(), nullable=True),
        sa.Column("created_by", models.types.StringUUID(), nullable=False),
        sa.Column("updated_by", models.types.StringUUID(), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.PrimaryKeyConstraint("id", name="evaluation_configuration_pkey"),
        sa.UniqueConstraint("tenant_id", "target_type", "target_id", name="evaluation_configuration_unique"),
    )
    # NOTE(review): this non-unique index covers the same columns as the
    # unique constraint above — confirm both are wanted.
    with op.batch_alter_table("evaluation_configurations", schema=None) as batch_op:
        batch_op.create_index(
            "evaluation_configuration_target_idx", ["tenant_id", "target_type", "target_id"], unique=False
        )

    # evaluation_runs: one row per run, with status and item counters
    op.create_table(
        "evaluation_runs",
        sa.Column("id", models.types.StringUUID(), nullable=False),
        sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
        sa.Column("target_type", sa.String(length=20), nullable=False),
        sa.Column("target_id", models.types.StringUUID(), nullable=False),
        sa.Column("evaluation_config_id", models.types.StringUUID(), nullable=False),
        sa.Column("status", sa.String(length=20), nullable=False, server_default=sa.text("'pending'")),
        sa.Column("dataset_file_id", models.types.StringUUID(), nullable=True),
        sa.Column("result_file_id", models.types.StringUUID(), nullable=True),
        sa.Column("total_items", sa.Integer(), nullable=False, server_default=sa.text("0")),
        sa.Column("completed_items", sa.Integer(), nullable=False, server_default=sa.text("0")),
        sa.Column("failed_items", sa.Integer(), nullable=False, server_default=sa.text("0")),
        sa.Column("metrics_summary", models.types.LongText(), nullable=True),
        sa.Column("error", sa.Text(), nullable=True),
        sa.Column("celery_task_id", sa.String(length=255), nullable=True),
        sa.Column("created_by", models.types.StringUUID(), nullable=False),
        sa.Column("started_at", sa.DateTime(), nullable=True),
        sa.Column("completed_at", sa.DateTime(), nullable=True),
        sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.PrimaryKeyConstraint("id", name="evaluation_run_pkey"),
    )
    # Indexes for looking runs up by target and by status within a tenant.
    with op.batch_alter_table("evaluation_runs", schema=None) as batch_op:
        batch_op.create_index(
            "evaluation_run_target_idx", ["tenant_id", "target_type", "target_id"], unique=False
        )
        batch_op.create_index("evaluation_run_status_idx", ["tenant_id", "status"], unique=False)

    # evaluation_run_items: one row per evaluated item of a run
    op.create_table(
        "evaluation_run_items",
        sa.Column("id", models.types.StringUUID(), nullable=False),
        sa.Column("evaluation_run_id", models.types.StringUUID(), nullable=False),
        sa.Column("item_index", sa.Integer(), nullable=False),
        sa.Column("inputs", models.types.LongText(), nullable=True),
        sa.Column("expected_output", models.types.LongText(), nullable=True),
        sa.Column("context", models.types.LongText(), nullable=True),
        sa.Column("actual_output", models.types.LongText(), nullable=True),
        sa.Column("metrics", models.types.LongText(), nullable=True),
        sa.Column("metadata_json", models.types.LongText(), nullable=True),
        sa.Column("error", sa.Text(), nullable=True),
        sa.Column("overall_score", sa.Float(), nullable=True),
        sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.PrimaryKeyConstraint("id", name="evaluation_run_item_pkey"),
    )
    # NOTE(review): the single-column index is a prefix of the composite
    # index below — confirm both are wanted.
    with op.batch_alter_table("evaluation_run_items", schema=None) as batch_op:
        batch_op.create_index("evaluation_run_item_run_idx", ["evaluation_run_id"], unique=False)
        batch_op.create_index(
            "evaluation_run_item_index_idx", ["evaluation_run_id", "item_index"], unique=False
        )
def downgrade():
    """Reverse ``upgrade``: drop indexes and tables in the opposite order of creation."""
    # evaluation_run_items
    with op.batch_alter_table("evaluation_run_items", schema=None) as batch_op:
        batch_op.drop_index("evaluation_run_item_index_idx")
        batch_op.drop_index("evaluation_run_item_run_idx")
    op.drop_table("evaluation_run_items")

    # evaluation_runs
    with op.batch_alter_table("evaluation_runs", schema=None) as batch_op:
        batch_op.drop_index("evaluation_run_status_idx")
        batch_op.drop_index("evaluation_run_target_idx")
    op.drop_table("evaluation_runs")

    # evaluation_configurations
    with op.batch_alter_table("evaluation_configurations", schema=None) as batch_op:
        batch_op.drop_index("evaluation_configuration_target_idx")
    op.drop_table("evaluation_configurations")

View File

@@ -26,6 +26,13 @@ from .dataset import (
TidbAuthBinding,
Whitelist,
)
from .evaluation import (
EvaluationConfiguration,
EvaluationRun,
EvaluationRunItem,
EvaluationRunStatus,
EvaluationTargetType,
)
from .enums import (
AppTriggerStatus,
AppTriggerType,
@@ -81,6 +88,7 @@ from .provider import (
TenantDefaultModel,
TenantPreferredModelProvider,
)
from .snippet import CustomizedSnippet, SnippetType
from .source import DataSourceApiKeyAuthBinding, DataSourceOauthBinding
from .task import CeleryTask, CeleryTaskSet
from .tools import (
@@ -140,6 +148,7 @@ __all__ = [
"Conversation",
"ConversationVariable",
"CreatorUserRole",
"CustomizedSnippet",
"DataSourceApiKeyAuthBinding",
"DataSourceOauthBinding",
"Dataset",
@@ -156,6 +165,11 @@ __all__ = [
"Document",
"DocumentSegment",
"Embedding",
"EvaluationConfiguration",
"EvaluationRun",
"EvaluationRunItem",
"EvaluationRunStatus",
"EvaluationTargetType",
"EndUser",
"ExecutionExtraContent",
"ExporleBanner",
@@ -184,6 +198,7 @@ __all__ = [
"RecommendedApp",
"SavedMessage",
"Site",
"SnippetType",
"Tag",
"TagBinding",
"Tenant",

193
api/models/evaluation.py Normal file
View File

@@ -0,0 +1,193 @@
from __future__ import annotations
import json
from datetime import datetime
from enum import StrEnum
from typing import Any
import sqlalchemy as sa
from sqlalchemy import DateTime, Float, Integer, String, Text, func
from sqlalchemy.orm import Mapped, mapped_column
from libs.uuid_utils import uuidv7
from .base import Base
from .types import LongText, StringUUID
class EvaluationRunStatus(StrEnum):
    """Status values stored in ``EvaluationRun.status``."""

    # Default status of a newly created run.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class EvaluationTargetType(StrEnum):
    """Kinds of evaluation target (an App or a Snippet)."""

    APP = "app"
    SNIPPETS = "snippets"
class EvaluationConfiguration(Base):
    """Stores evaluation configuration for each target (App or Snippet).

    ``metrics_config`` and ``judgement_conditions`` are stored as JSON text;
    use the ``*_dict`` properties to read and write them as dictionaries.
    """

    __tablename__ = "evaluation_configurations"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="evaluation_configuration_pkey"),
        sa.Index("evaluation_configuration_target_idx", "tenant_id", "target_type", "target_id"),
        sa.UniqueConstraint("tenant_id", "target_type", "target_id", name="evaluation_configuration_unique"),
    )

    id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7()))
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    target_type: Mapped[str] = mapped_column(String(20), nullable=False)
    target_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    evaluation_model_provider: Mapped[str | None] = mapped_column(String(255), nullable=True)
    evaluation_model: Mapped[str | None] = mapped_column(String(255), nullable=True)
    # JSON-encoded text; see the ``*_dict`` properties below.
    metrics_config: Mapped[str | None] = mapped_column(LongText, nullable=True)
    judgement_conditions: Mapped[str | None] = mapped_column(LongText, nullable=True)
    created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
    updated_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
    )

    @property
    def metrics_config_dict(self) -> dict[str, Any]:
        """Decoded ``metrics_config``; an empty dict when nothing is stored."""
        raw = self.metrics_config
        return json.loads(raw) if raw else {}

    @metrics_config_dict.setter
    def metrics_config_dict(self, value: dict[str, Any]) -> None:
        encoded = json.dumps(value)
        self.metrics_config = encoded

    @property
    def judgement_conditions_dict(self) -> dict[str, Any]:
        """Decoded ``judgement_conditions``; an empty dict when nothing is stored."""
        raw = self.judgement_conditions
        return json.loads(raw) if raw else {}

    @judgement_conditions_dict.setter
    def judgement_conditions_dict(self, value: dict[str, Any]) -> None:
        encoded = json.dumps(value)
        self.judgement_conditions = encoded

    def __repr__(self) -> str:
        return f"<EvaluationConfiguration(id={self.id}, target={self.target_type}:{self.target_id})>"
class EvaluationRun(Base):
    """Stores each evaluation run record.

    A run references the target it evaluates, the configuration it was
    started with, item counters used for progress reporting, and an optional
    JSON-encoded metrics summary.
    """

    __tablename__ = "evaluation_runs"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="evaluation_run_pkey"),
        sa.Index("evaluation_run_target_idx", "tenant_id", "target_type", "target_id"),
        sa.Index("evaluation_run_status_idx", "tenant_id", "status"),
    )

    id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7()))
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    target_type: Mapped[str] = mapped_column(String(20), nullable=False)
    target_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    evaluation_config_id: Mapped[str] = mapped_column(StringUUID, nullable=False)

    status: Mapped[str] = mapped_column(
        String(20), nullable=False, default=EvaluationRunStatus.PENDING
    )

    dataset_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
    result_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)

    # Item counters; see ``progress``.
    total_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    completed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    failed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

    # JSON-encoded text; see ``metrics_summary_dict``.
    metrics_summary: Mapped[str | None] = mapped_column(LongText, nullable=True)
    error: Mapped[str | None] = mapped_column(Text, nullable=True)
    celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)

    created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
    started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
    )

    @property
    def metrics_summary_dict(self) -> dict[str, Any]:
        """Return ``metrics_summary`` decoded from JSON, or ``{}`` when unset/empty."""
        if self.metrics_summary:
            return json.loads(self.metrics_summary)
        return {}

    @metrics_summary_dict.setter
    def metrics_summary_dict(self, value: dict[str, Any]) -> None:
        """Store *value* in ``metrics_summary`` as JSON text."""
        self.metrics_summary = json.dumps(value)

    @property
    def progress(self) -> float:
        """Return the fraction of items processed (completed + failed) over total.

        Returns ``0.0`` when there are no items. Counters that are still
        ``None`` are treated as zero: column ``default`` values are only
        applied when the row is inserted, so an instance that has not been
        flushed yet may not have them populated.
        """
        total = self.total_items or 0
        if total == 0:
            return 0.0
        processed = (self.completed_items or 0) + (self.failed_items or 0)
        return processed / total

    def __repr__(self) -> str:
        return f"<EvaluationRun(id={self.id}, status={self.status})>"
class EvaluationRunItem(Base):
    """One row of an evaluation run together with its result.

    ``inputs``, ``metrics`` and ``metadata_json`` hold JSON text and are
    exposed in decoded form through read-only properties.
    """

    __tablename__ = "evaluation_run_items"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="evaluation_run_item_pkey"),
        sa.Index("evaluation_run_item_run_idx", "evaluation_run_id"),
        sa.Index("evaluation_run_item_index_idx", "evaluation_run_id", "item_index"),
    )

    id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7()))
    evaluation_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    item_index: Mapped[int] = mapped_column(Integer, nullable=False)

    # Dataset side of the row.
    inputs: Mapped[str | None] = mapped_column(LongText, nullable=True)
    expected_output: Mapped[str | None] = mapped_column(LongText, nullable=True)
    context: Mapped[str | None] = mapped_column(LongText, nullable=True)

    # Result side of the row.
    actual_output: Mapped[str | None] = mapped_column(LongText, nullable=True)
    metrics: Mapped[str | None] = mapped_column(LongText, nullable=True)
    metadata_json: Mapped[str | None] = mapped_column(LongText, nullable=True)
    error: Mapped[str | None] = mapped_column(Text, nullable=True)
    overall_score: Mapped[float | None] = mapped_column(Float, nullable=True)

    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.current_timestamp(),
    )

    @property
    def inputs_dict(self) -> dict[str, Any]:
        """Return ``inputs`` decoded from JSON, or ``{}`` when unset/empty."""
        raw = self.inputs
        return json.loads(raw) if raw else {}

    @property
    def metrics_list(self) -> list[dict[str, Any]]:
        """Return ``metrics`` decoded from JSON, or ``[]`` when unset/empty."""
        raw = self.metrics
        return json.loads(raw) if raw else []

    @property
    def metadata_dict(self) -> dict[str, Any]:
        """Return ``metadata_json`` decoded from JSON, or ``{}`` when unset/empty."""
        raw = self.metadata_json
        return json.loads(raw) if raw else {}

    def __repr__(self) -> str:
        return (
            f"<EvaluationRunItem(id={self.id}, run={self.evaluation_run_id}, index={self.item_index})>"
        )

101
api/models/snippet.py Normal file
View File

@@ -0,0 +1,101 @@
import json
from datetime import datetime
from enum import StrEnum
from typing import Any
import sqlalchemy as sa
from sqlalchemy import DateTime, String, func
from sqlalchemy.orm import Mapped, mapped_column
from libs.uuid_utils import uuidv7
from .account import Account
from .base import Base
from .engine import db
from .types import AdjustedJSON, LongText, StringUUID
class SnippetType(StrEnum):
"""Snippet Type Enum"""
NODE = "node"
GROUP = "group"
class CustomizedSnippet(Base):
    """
    Customized Snippet Model

    A reusable workflow component (a node or a node group) that can be shared
    across applications within a workspace.
    """

    __tablename__ = "customized_snippets"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="customized_snippet_pkey"),
        sa.Index("customized_snippet_tenant_idx", "tenant_id"),
        # Snippet names are unique within a tenant.
        sa.UniqueConstraint("tenant_id", "name", name="customized_snippet_tenant_name_key"),
    )

    id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7()))
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    description: Mapped[str | None] = mapped_column(LongText, nullable=True)
    type: Mapped[str] = mapped_column(String(50), nullable=False, server_default=sa.text("'node'"))

    # Workflow reference for published version
    workflow_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)

    # State flags
    is_published: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("false"))
    version: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default=sa.text("1"))
    use_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default=sa.text("0"))

    # Visual customization
    icon_info: Mapped[dict | None] = mapped_column(AdjustedJSON, nullable=True)

    # Snippet configuration (stored as JSON text)
    input_fields: Mapped[str | None] = mapped_column(LongText, nullable=True)

    # Audit fields
    created_by: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
    updated_by: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.current_timestamp(),
        onupdate=func.current_timestamp(),
    )

    @property
    def graph_dict(self) -> dict[str, Any]:
        """Return the graph of the workflow referenced by ``workflow_id``.

        Yields ``{}`` when no workflow is referenced, the workflow cannot be
        found, or its graph is empty. Performs a database lookup on access.
        """
        if not self.workflow_id:
            return {}

        # Imported lazily, as in the original code.
        from .workflow import Workflow

        workflow = db.session.get(Workflow, self.workflow_id)
        if not workflow or not workflow.graph:
            return {}
        return json.loads(workflow.graph)

    @property
    def input_fields_list(self) -> list[dict[str, Any]]:
        """Return ``input_fields`` decoded from JSON, or ``[]`` when unset/empty."""
        raw = self.input_fields
        if not raw:
            return []
        return json.loads(raw)

    @property
    def created_by_account(self) -> Account | None:
        """Look up the account referenced by ``created_by`` (``None`` if unset)."""
        creator_id = self.created_by
        return db.session.get(Account, creator_id) if creator_id else None

    @property
    def updated_by_account(self) -> Account | None:
        """Look up the account referenced by ``updated_by`` (``None`` if unset)."""
        updater_id = self.updated_by
        return db.session.get(Account, updater_id) if updater_id else None

    @property
    def version_str(self) -> str:
        """Return ``version`` rendered as a string for API responses."""
        return f"{self.version}"

View File

@@ -67,6 +67,7 @@ class WorkflowType(StrEnum):
WORKFLOW = "workflow"
CHAT = "chat"
RAG_PIPELINE = "rag-pipeline"
SNIPPET = "snippet"
@classmethod
def value_of(cls, value: str) -> "WorkflowType":

View File

@@ -199,6 +199,12 @@ storage = [
############################################################
tools = ["cloudscraper~=1.2.71", "nltk~=3.9.1"]
############################################################
# [ Evaluation ] dependency group
# Required for evaluation frameworks
############################################################
evaluation = ["ragas>=0.2.0"]
############################################################
# [ VDB ] dependency group
# Required by vector store clients

View File

@@ -0,0 +1,21 @@
from services.errors.base import BaseServiceError
class EvaluationFrameworkNotConfiguredError(BaseServiceError):
    """Signals that no evaluation framework is configured."""

    def __init__(self, description: str | None = None):
        # Fall back to the default message when no (or an empty) description is given.
        message = description or "Evaluation framework is not configured. Set EVALUATION_FRAMEWORK env var."
        super().__init__(message)
class EvaluationNotFoundError(BaseServiceError):
    """Signals that a requested evaluation resource does not exist."""

    def __init__(self, description: str | None = None):
        # Fall back to the default message when no (or an empty) description is given.
        message = description or "Evaluation not found."
        super().__init__(message)
class EvaluationDatasetInvalidError(BaseServiceError):
    """Signals that an evaluation dataset failed validation."""

    def __init__(self, description: str | None = None):
        # Fall back to the default message when no (or an empty) description is given.
        message = description or "Evaluation dataset is invalid."
        super().__init__(message)
class EvaluationMaxConcurrentRunsError(BaseServiceError):
    """Signals that the concurrent evaluation run limit has been reached."""

    def __init__(self, description: str | None = None):
        # Fall back to the default message when no (or an empty) description is given.
        message = description or "Maximum number of concurrent evaluation runs reached."
        super().__init__(message)

View File

@@ -0,0 +1,460 @@
import io
import json
import logging
from typing import Any, Union
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from sqlalchemy.orm import Session
from configs import dify_config
from core.evaluation.entities.evaluation_entity import (
EvaluationCategory,
EvaluationItemInput,
EvaluationRunData,
)
from core.evaluation.evaluation_manager import EvaluationManager
from models.evaluation import (
EvaluationConfiguration,
EvaluationRun,
EvaluationRunItem,
EvaluationRunStatus,
)
from models.model import App, AppMode
from models.snippet import CustomizedSnippet
from services.errors.evaluation import (
EvaluationDatasetInvalidError,
EvaluationFrameworkNotConfiguredError,
EvaluationMaxConcurrentRunsError,
EvaluationNotFoundError,
)
from services.snippet_service import SnippetService
from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
class EvaluationService:
    """
    Service for evaluation-related operations.

    Covers generation of evaluation dataset templates, storage of evaluation
    configuration, the evaluation run lifecycle (start / list / inspect /
    cancel) and parsing of uploaded XLSX datasets.
    """

    # Excluded app modes that don't support evaluation templates
    EXCLUDED_APP_MODES = {AppMode.RAG_PIPELINE}

    @classmethod
    def generate_dataset_template(
        cls,
        target: Union[App, CustomizedSnippet],
        target_type: str,
    ) -> tuple[bytes, str]:
        """
        Generate evaluation dataset template as XLSX bytes.

        Creates an XLSX file with headers based on the evaluation target's input parameters.
        The first column is index, followed by input parameter columns.

        :param target: App or CustomizedSnippet instance
        :param target_type: Target type string ("app" or "snippet")
        :return: Tuple of (xlsx_content_bytes, filename)
        :raises ValueError: If target type is not supported or app mode is excluded
        """
        # NOTE(review): EvaluationTargetType.SNIPPETS has the value "snippets",
        # while this method accepts "snippet" — confirm which one callers send.
        if target_type == "app":
            if not isinstance(target, App):
                raise ValueError("Invalid target: expected App instance")
            if AppMode.value_of(target.mode) in cls.EXCLUDED_APP_MODES:
                raise ValueError(f"App mode '{target.mode}' does not support evaluation templates")
            input_fields = cls._get_app_input_fields(target)
        elif target_type == "snippet":
            if not isinstance(target, CustomizedSnippet):
                raise ValueError("Invalid target: expected CustomizedSnippet instance")
            input_fields = cls._get_snippet_input_fields(target)
        else:
            raise ValueError(f"Unsupported target type: {target_type}")

        # Generate XLSX template
        xlsx_content = cls._generate_xlsx_template(input_fields, target.name)

        # Build filename; names longer than 10 characters are truncated.
        truncated_name = target.name[:10] + "..." if len(target.name) > 10 else target.name
        filename = f"{truncated_name}-evaluation-dataset.xlsx"
        return xlsx_content, filename

    @classmethod
    def _get_app_input_fields(cls, app: App) -> list[dict]:
        """
        Get input fields from App's workflow.

        Prefers the published workflow and falls back to the draft workflow.

        :param app: App instance
        :return: List of input field definitions (empty if the app has no workflow)
        """
        workflow_service = WorkflowService()
        workflow = workflow_service.get_published_workflow(app_model=app)
        if not workflow:
            workflow = workflow_service.get_draft_workflow(app_model=app)
        if not workflow:
            return []
        return workflow.user_input_form()

    @classmethod
    def _get_snippet_input_fields(cls, snippet: CustomizedSnippet) -> list[dict]:
        """
        Get input fields from Snippet.

        Tries to get from snippet's own input_fields first,
        then falls back to workflow's user_input_form (published, then draft).

        :param snippet: CustomizedSnippet instance
        :return: List of input field definitions
        """
        # Try snippet's own input_fields first
        input_fields = snippet.input_fields_list
        if input_fields:
            return input_fields

        # Fallback to workflow's user_input_form
        snippet_service = SnippetService()
        workflow = snippet_service.get_published_workflow(snippet=snippet)
        if not workflow:
            workflow = snippet_service.get_draft_workflow(snippet=snippet)
        if workflow:
            return workflow.user_input_form()
        return []

    @classmethod
    def _generate_xlsx_template(cls, input_fields: list[dict], target_name: str) -> bytes:
        """
        Generate XLSX template file content.

        Creates a workbook with:
        - First row as header row with "index" and input field names
        - Styled header with background color and borders
        - One bordered data row whose index cell is pre-filled with 1

        :param input_fields: List of input field definitions
        :param target_name: Name of the target (currently unused; the sheet
            always gets a fixed title)
        :return: XLSX file content as bytes
        """
        wb = Workbook()
        ws = wb.active
        ws.title = "Evaluation Dataset"

        header_font = Font(bold=True, color="FFFFFF")
        header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
        header_alignment = Alignment(horizontal="center", vertical="center")
        thin_border = Border(
            left=Side(style="thin"),
            right=Side(style="thin"),
            top=Side(style="thin"),
            bottom=Side(style="thin"),
        )

        # Build header row: "index" followed by each field's label (or variable name).
        # NOTE(review): _parse_dataset uses these header strings as input keys, so a
        # label that differs from the variable name ends up as the key — confirm
        # this is what the evaluation runtime expects.
        headers = ["index", *(field.get("label") or field.get("variable") for field in input_fields)]

        # Write header row
        for col_idx, header in enumerate(headers, start=1):
            cell = ws.cell(row=1, column=col_idx, value=header)
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = header_alignment
            cell.border = thin_border

        # Set column widths
        ws.column_dimensions["A"].width = 10  # index column
        for col_idx in range(2, len(headers) + 1):
            ws.column_dimensions[get_column_letter(col_idx)].width = 20

        # Add one empty row with row number for user reference
        for col_idx in range(1, len(headers) + 1):
            cell = ws.cell(row=2, column=col_idx, value="")
            cell.border = thin_border
            if col_idx == 1:
                cell.value = 1
                cell.alignment = Alignment(horizontal="center")

        # Save to bytes
        output = io.BytesIO()
        wb.save(output)
        return output.getvalue()

    # ---- Evaluation Configuration CRUD ----

    @classmethod
    def get_evaluation_config(
        cls,
        session: Session,
        tenant_id: str,
        target_type: str,
        target_id: str,
    ) -> EvaluationConfiguration | None:
        """Return the evaluation configuration of a target, or ``None`` if there is none."""
        return (
            session.query(EvaluationConfiguration)
            .filter_by(tenant_id=tenant_id, target_type=target_type, target_id=target_id)
            .first()
        )

    @classmethod
    def save_evaluation_config(
        cls,
        session: Session,
        tenant_id: str,
        target_type: str,
        target_id: str,
        account_id: str,
        data: dict[str, Any],
    ) -> EvaluationConfiguration:
        """
        Create or update the evaluation configuration of a target and commit.

        :param data: May contain "evaluation_model_provider", "evaluation_model",
            "metrics_config" and "judgement_conditions"; missing model fields are
            stored as ``None`` and missing JSON fields as ``{}``.
        :return: The refreshed configuration row
        """
        config = cls.get_evaluation_config(session, tenant_id, target_type, target_id)
        if config is None:
            config = EvaluationConfiguration(
                tenant_id=tenant_id,
                target_type=target_type,
                target_id=target_id,
                created_by=account_id,
                updated_by=account_id,
            )
            session.add(config)

        config.evaluation_model_provider = data.get("evaluation_model_provider")
        config.evaluation_model = data.get("evaluation_model")
        config.metrics_config = json.dumps(data.get("metrics_config", {}))
        config.judgement_conditions = json.dumps(data.get("judgement_conditions", {}))
        config.updated_by = account_id

        session.commit()
        session.refresh(config)
        return config

    # ---- Evaluation Run Management ----

    @classmethod
    def start_evaluation_run(
        cls,
        session: Session,
        tenant_id: str,
        target_type: str,
        target_id: str,
        account_id: str,
        dataset_file_content: bytes,
        evaluation_category: EvaluationCategory,
    ) -> EvaluationRun:
        """
        Validate dataset, create run record, dispatch Celery task.

        :raises EvaluationFrameworkNotConfiguredError: No evaluation framework is available
        :raises EvaluationNotFoundError: The target has no evaluation configuration
        :raises EvaluationMaxConcurrentRunsError: The tenant already has the maximum
            number of pending/running runs
        :raises EvaluationDatasetInvalidError: The dataset is malformed, empty or too large
        """
        # Check framework is configured
        evaluation_instance = EvaluationManager.get_evaluation_instance()
        if evaluation_instance is None:
            raise EvaluationFrameworkNotConfiguredError()

        # Check evaluation config exists
        config = cls.get_evaluation_config(session, tenant_id, target_type, target_id)
        if config is None:
            raise EvaluationNotFoundError("Evaluation configuration not found. Please configure evaluation first.")

        # Check concurrent run limit
        active_runs = (
            session.query(EvaluationRun)
            .filter_by(tenant_id=tenant_id)
            .filter(EvaluationRun.status.in_([EvaluationRunStatus.PENDING, EvaluationRunStatus.RUNNING]))
            .count()
        )
        max_concurrent = dify_config.EVALUATION_MAX_CONCURRENT_RUNS
        if active_runs >= max_concurrent:
            raise EvaluationMaxConcurrentRunsError(f"Maximum concurrent runs ({max_concurrent}) reached.")

        # Parse dataset
        items = cls._parse_dataset(dataset_file_content)
        max_rows = dify_config.EVALUATION_MAX_DATASET_ROWS
        if len(items) > max_rows:
            raise EvaluationDatasetInvalidError(f"Dataset has {len(items)} rows, max is {max_rows}.")

        # Create evaluation run
        evaluation_run = EvaluationRun(
            tenant_id=tenant_id,
            target_type=target_type,
            target_id=target_id,
            evaluation_config_id=config.id,
            status=EvaluationRunStatus.PENDING,
            total_items=len(items),
            created_by=account_id,
        )
        session.add(evaluation_run)
        session.commit()
        session.refresh(evaluation_run)

        # Build Celery task data
        run_data = EvaluationRunData(
            evaluation_run_id=evaluation_run.id,
            tenant_id=tenant_id,
            target_type=target_type,
            target_id=target_id,
            evaluation_category=evaluation_category,
            evaluation_model_provider=config.evaluation_model_provider or "",
            evaluation_model=config.evaluation_model or "",
            metrics_config=config.metrics_config_dict,
            items=items,
        )

        # Dispatch Celery task
        from tasks.evaluation_task import run_evaluation

        try:
            task = run_evaluation.delay(run_data.model_dump())
        except Exception as exc:
            # The run row is already committed as PENDING. If it stayed that way
            # it would count against the concurrent-run limit forever, so record
            # the failure before propagating the error.
            evaluation_run.status = EvaluationRunStatus.FAILED
            evaluation_run.error = f"Failed to dispatch evaluation task: {exc}"
            session.commit()
            raise

        evaluation_run.celery_task_id = task.id
        session.commit()
        return evaluation_run

    @classmethod
    def get_evaluation_runs(
        cls,
        session: Session,
        tenant_id: str,
        target_type: str,
        target_id: str,
        page: int = 1,
        page_size: int = 20,
    ) -> tuple[list[EvaluationRun], int]:
        """
        Query evaluation run history with pagination, newest first.

        :param page: 1-based page number
        :return: Tuple of (runs on the requested page, total number of runs)
        """
        query = (
            session.query(EvaluationRun)
            .filter_by(tenant_id=tenant_id, target_type=target_type, target_id=target_id)
            .order_by(EvaluationRun.created_at.desc())
        )
        total = query.count()
        runs = query.offset((page - 1) * page_size).limit(page_size).all()
        return runs, total

    @classmethod
    def get_evaluation_run_detail(
        cls,
        session: Session,
        tenant_id: str,
        run_id: str,
    ) -> EvaluationRun:
        """
        Return a single evaluation run belonging to the tenant.

        :raises EvaluationNotFoundError: No such run exists for this tenant
        """
        run = session.query(EvaluationRun).filter_by(id=run_id, tenant_id=tenant_id).first()
        if not run:
            raise EvaluationNotFoundError("Evaluation run not found.")
        return run

    @classmethod
    def get_evaluation_run_items(
        cls,
        session: Session,
        run_id: str,
        page: int = 1,
        page_size: int = 50,
    ) -> tuple[list[EvaluationRunItem], int]:
        """
        Query evaluation run items with pagination, ordered by item index.

        NOTE(review): this query is not scoped by tenant; presumably callers
        verify ownership via get_evaluation_run_detail first — confirm.

        :param page: 1-based page number
        :return: Tuple of (items on the requested page, total number of items)
        """
        query = (
            session.query(EvaluationRunItem)
            .filter_by(evaluation_run_id=run_id)
            .order_by(EvaluationRunItem.item_index.asc())
        )
        total = query.count()
        items = query.offset((page - 1) * page_size).limit(page_size).all()
        return items, total

    @classmethod
    def cancel_evaluation_run(
        cls,
        session: Session,
        tenant_id: str,
        run_id: str,
    ) -> EvaluationRun:
        """
        Mark a pending or running evaluation run as cancelled and revoke its task.

        Revocation is best effort: a failure to revoke is logged and the run is
        still marked as cancelled.

        :raises EvaluationNotFoundError: No such run exists for this tenant
        :raises ValueError: The run is not pending or running
        """
        run = cls.get_evaluation_run_detail(session, tenant_id, run_id)
        if run.status not in (EvaluationRunStatus.PENDING, EvaluationRunStatus.RUNNING):
            raise ValueError(f"Cannot cancel evaluation run in status: {run.status}")

        run.status = EvaluationRunStatus.CANCELLED

        # Revoke Celery task if one was dispatched
        if run.celery_task_id:
            try:
                from celery import current_app as celery_app

                celery_app.control.revoke(run.celery_task_id, terminate=True)
            except Exception:
                logger.exception("Failed to revoke Celery task %s", run.celery_task_id)

        session.commit()
        return run

    @classmethod
    def get_supported_metrics(cls, category: EvaluationCategory) -> list[str]:
        """Return the metric names reported by EvaluationManager for *category*."""
        return EvaluationManager.get_supported_metrics(category)

    # ---- Dataset Parsing ----

    @classmethod
    def _parse_dataset(cls, xlsx_content: bytes) -> list[EvaluationItemInput]:
        """
        Parse evaluation dataset from XLSX bytes.

        The first row is the header row and its first cell must be "index"
        (case-insensitive). Every following non-blank row becomes one item whose
        inputs are keyed by the header strings. Columns named "expected_output"
        and "context" are lifted out of the inputs; "context" is split on ";".
        Columns with a blank header are ignored.

        :param xlsx_content: Raw XLSX file content
        :return: Parsed dataset items
        :raises EvaluationDatasetInvalidError: The content is not a readable XLSX
            file, has no worksheet, has a bad header, or contains no data rows
        """
        import zipfile  # local import: only needed to recognise non-XLSX uploads

        try:
            wb = load_workbook(io.BytesIO(xlsx_content), read_only=True)
        except (zipfile.BadZipFile, KeyError) as exc:
            # BadZipFile: the bytes are not a zip archive at all.
            # KeyError: a zip archive that lacks the parts of an XLSX workbook.
            raise EvaluationDatasetInvalidError("Dataset file is not a valid XLSX file.") from exc

        # Read everything up front so the workbook is closed on every path,
        # including the validation errors raised below.
        try:
            ws = wb.active
            if ws is None:
                raise EvaluationDatasetInvalidError("XLSX file has no active worksheet.")
            rows = list(ws.iter_rows(values_only=True))
        finally:
            wb.close()

        if len(rows) < 2:
            raise EvaluationDatasetInvalidError("Dataset must have at least a header row and one data row.")

        headers = [str(h).strip() if h is not None else "" for h in rows[0]]
        if not headers or headers[0].lower() != "index":
            raise EvaluationDatasetInvalidError("First column header must be 'index'.")
        input_headers = headers[1:]  # Skip 'index'

        items: list[EvaluationItemInput] = []
        for row_idx, row in enumerate(rows[1:], start=1):
            values = list(row)
            if all(v is None or str(v).strip() == "" for v in values):
                continue  # Skip empty rows

            # Use the row's own index when it is an integer, else its position.
            try:
                index = int(values[0])
            except (TypeError, ValueError):
                index = row_idx

            inputs: dict[str, Any] = {}
            for col_idx, header in enumerate(input_headers, start=1):
                if not header:
                    continue  # Column without a name cannot be mapped to an input
                val = values[col_idx] if col_idx < len(values) else None
                inputs[header] = str(val) if val is not None else ""

            # Optional special columns
            expected_output = inputs.pop("expected_output", None)
            context_str = inputs.pop("context", None)
            context = context_str.split(";") if context_str else None

            items.append(
                EvaluationItemInput(
                    index=index,
                    inputs=inputs,
                    expected_output=expected_output,
                    context=context,
                )
            )

        if not items:
            raise EvaluationDatasetInvalidError("Dataset must have at least a header row and one data row.")
        return items

View File

@@ -0,0 +1,374 @@
"""
Service for generating snippet workflow executions.
Uses an adapter pattern to bridge CustomizedSnippet with the App-based
WorkflowAppGenerator. The adapter (_SnippetAsApp) provides the minimal App-like
interface needed by the generator, avoiding modifications to core workflow
infrastructure.
Key invariants:
- Snippets always run as WORKFLOW mode (not CHAT or ADVANCED_CHAT).
- The adapter maps snippet.id to app_id in workflow execution records.
- Snippet debugging has no rate limiting (max_active_requests = 0).
Supported execution modes:
- Full workflow run (generate): Runs the entire draft workflow as SSE stream.
- Single node run (run_draft_node): Synchronous single-step debugging for regular nodes.
- Single iteration run (generate_single_iteration): SSE stream for iteration container nodes.
- Single loop run (generate_single_loop): SSE stream for loop container nodes.
"""
import json
import logging
from collections.abc import Generator, Mapping, Sequence
from typing import Any, Union
from sqlalchemy.orm import make_transient
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.file import File
from factories import file_factory
from models import Account
from models.model import AppMode, EndUser
from models.snippet import CustomizedSnippet
from models.workflow import Workflow, WorkflowNodeExecutionModel
from services.snippet_service import SnippetService
from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
class _SnippetAsApp:
    """
    App-shaped view of a CustomizedSnippet.

    Exposes only the attributes that WorkflowAppGenerator,
    WorkflowAppConfigManager and WorkflowService.run_draft_workflow_node read
    from an App:

    - id: the snippet's id (stored as app_id in workflows table)
    - tenant_id: the snippet's tenant_id
    - mode: always AppMode.WORKFLOW, since snippets always run as workflows
    - max_active_requests: 0 (no limit) for snippet debugging
    - app_model_config_id: None (snippets don't have app model configs)
    """

    id: str
    tenant_id: str
    mode: str
    max_active_requests: int
    app_model_config_id: str | None

    def __init__(self, snippet: CustomizedSnippet) -> None:
        # Values copied from the snippet.
        self.id, self.tenant_id = snippet.id, snippet.tenant_id
        # Fixed values that do not depend on the snippet.
        self.mode = AppMode.WORKFLOW.value
        self.max_active_requests = 0
        self.app_model_config_id = None
class SnippetGenerateService:
"""
Service for running snippet workflow executions.
Adapts CustomizedSnippet to work with the existing App-based
WorkflowAppGenerator infrastructure, avoiding duplication of the
complex workflow execution pipeline.
"""
# Specific ID for the injected virtual Start node so it can be recognised
_VIRTUAL_START_NODE_ID = "__snippet_virtual_start__"
@classmethod
def generate(
cls,
snippet: CustomizedSnippet,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
"""
Run a snippet's draft workflow.
Retrieves the draft workflow, adapts the snippet to an App-like proxy,
then delegates execution to WorkflowAppGenerator.
If the workflow graph has no Start node, a virtual Start node is injected
in-memory so that:
1. Graph validation passes (root node must have execution_type=ROOT).
2. User inputs are processed into the variable pool by the StartNode logic.
:param snippet: CustomizedSnippet instance
:param user: Account or EndUser initiating the run
:param args: Workflow inputs (must include "inputs" key)
:param invoke_from: Source of invocation (typically DEBUGGER)
:param streaming: Whether to stream the response
:return: Blocking response mapping or SSE streaming generator
:raises ValueError: If the snippet has no draft workflow
"""
snippet_service = SnippetService()
workflow = snippet_service.get_draft_workflow(snippet=snippet)
if not workflow:
raise ValueError("Workflow not initialized")
# Inject a virtual Start node when the graph doesn't have one.
workflow = cls._ensure_start_node(workflow, snippet)
# Adapt snippet to App-like interface for WorkflowAppGenerator
app_proxy = _SnippetAsApp(snippet)
return WorkflowAppGenerator.convert_to_event_stream(
WorkflowAppGenerator().generate(
app_model=app_proxy, # type: ignore[arg-type]
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
call_depth=0,
)
)
@classmethod
def _ensure_start_node(cls, workflow: Workflow, snippet: CustomizedSnippet) -> Workflow:
"""
Return *workflow* with a Start node.
If the graph already contains a Start node, the original workflow is
returned unchanged. Otherwise a virtual Start node is injected and the
workflow object is detached from the SQLAlchemy session so the in-memory
change is never flushed to the database.
"""
graph_dict = workflow.graph_dict
nodes: list[dict[str, Any]] = graph_dict.get("nodes", [])
has_start = any(node.get("data", {}).get("type") == "start" for node in nodes)
if has_start:
return workflow
modified_graph = cls._inject_virtual_start_node(
graph_dict=graph_dict,
input_fields=snippet.input_fields_list,
)
# Detach from session to prevent accidental DB persistence of the
# modified graph. All attributes remain accessible for read.
make_transient(workflow)
workflow.graph = json.dumps(modified_graph)
return workflow
@classmethod
def _inject_virtual_start_node(
cls,
graph_dict: Mapping[str, Any],
input_fields: list[dict[str, Any]],
) -> dict[str, Any]:
"""
Build a new graph dict with a virtual Start node prepended.
The virtual Start node is wired to every existing node that has no
incoming edges (i.e. the current root candidates). This guarantees:
:param graph_dict: Original graph configuration.
:param input_fields: Snippet input field definitions from
``CustomizedSnippet.input_fields_list``.
:return: New graph dict containing the virtual Start node and edges.
"""
nodes: list[dict[str, Any]] = list(graph_dict.get("nodes", []))
edges: list[dict[str, Any]] = list(graph_dict.get("edges", []))
# Identify nodes with no incoming edges.
nodes_with_incoming: set[str] = set()
for edge in edges:
target = edge.get("target")
if isinstance(target, str):
nodes_with_incoming.add(target)
root_candidate_ids = [n["id"] for n in nodes if n["id"] not in nodes_with_incoming]
# Build Start node ``variables`` from snippet input fields.
start_variables: list[dict[str, Any]] = []
for field in input_fields:
var: dict[str, Any] = {
"variable": field.get("variable", ""),
"label": field.get("label", field.get("variable", "")),
"type": field.get("type", "text-input"),
"required": field.get("required", False),
"options": field.get("options", []),
}
if field.get("max_length") is not None:
var["max_length"] = field["max_length"]
start_variables.append(var)
virtual_start_node: dict[str, Any] = {
"id": cls._VIRTUAL_START_NODE_ID,
"data": {
"type": "start",
"title": "Start",
"variables": start_variables,
},
}
# Create edges from virtual Start to each root candidate.
new_edges: list[dict[str, Any]] = [
{
"source": cls._VIRTUAL_START_NODE_ID,
"sourceHandle": "source",
"target": root_id,
"targetHandle": "target",
}
for root_id in root_candidate_ids
]
return {
**graph_dict,
"nodes": [virtual_start_node, *nodes],
"edges": [*edges, *new_edges],
}
@classmethod
def run_draft_node(
cls,
snippet: CustomizedSnippet,
node_id: str,
user_inputs: Mapping[str, Any],
account: Account,
query: str = "",
files: Sequence[File] | None = None,
) -> WorkflowNodeExecutionModel:
"""
Run a single node in a snippet's draft workflow (single-step debugging).
Retrieves the draft workflow, adapts the snippet to an App-like proxy,
parses file inputs, then delegates to WorkflowService.run_draft_workflow_node.
:param snippet: CustomizedSnippet instance
:param node_id: ID of the node to run
:param user_inputs: User input values for the node
:param account: Account initiating the run
:param query: Optional query string
:param files: Optional parsed file objects
:return: WorkflowNodeExecutionModel with execution results
:raises ValueError: If the snippet has no draft workflow
"""
snippet_service = SnippetService()
draft_workflow = snippet_service.get_draft_workflow(snippet=snippet)
if not draft_workflow:
raise ValueError("Workflow not initialized")
app_proxy = _SnippetAsApp(snippet)
workflow_service = WorkflowService()
return workflow_service.run_draft_workflow_node(
app_model=app_proxy, # type: ignore[arg-type]
draft_workflow=draft_workflow,
node_id=node_id,
user_inputs=user_inputs,
account=account,
query=query,
files=files,
)
@classmethod
def generate_single_iteration(
cls,
snippet: CustomizedSnippet,
user: Union[Account, EndUser],
node_id: str,
args: Mapping[str, Any],
streaming: bool = True,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
"""
Run a single iteration node in a snippet's draft workflow.
Iteration nodes are container nodes that execute their sub-graph multiple
times, producing many events. Therefore, this uses the full WorkflowAppGenerator
pipeline with SSE streaming (unlike regular single-step node run).
:param snippet: CustomizedSnippet instance
:param user: Account or EndUser initiating the run
:param node_id: ID of the iteration node to run
:param args: Dict containing 'inputs' key with iteration input data
:param streaming: Whether to stream the response (should be True)
:return: SSE streaming generator
:raises ValueError: If the snippet has no draft workflow
"""
snippet_service = SnippetService()
workflow = snippet_service.get_draft_workflow(snippet=snippet)
if not workflow:
raise ValueError("Workflow not initialized")
app_proxy = _SnippetAsApp(snippet)
return WorkflowAppGenerator.convert_to_event_stream(
WorkflowAppGenerator().single_iteration_generate(
app_model=app_proxy, # type: ignore[arg-type]
workflow=workflow,
node_id=node_id,
user=user,
args=args,
streaming=streaming,
)
)
@classmethod
def generate_single_loop(
    cls,
    snippet: CustomizedSnippet,
    user: Union[Account, EndUser],
    node_id: str,
    args: Any,
    streaming: bool = True,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
    """
    Debug-run one loop node of the snippet's draft workflow.

    A loop node is a container that executes its sub-graph repeatedly and
    therefore emits many events, so the run goes through the full
    WorkflowAppGenerator pipeline and is returned as an event stream rather than
    through the regular single-step node run.

    :param snippet: CustomizedSnippet instance
    :param user: Account or EndUser initiating the run
    :param node_id: ID of the loop node to run
    :param args: Pydantic model with 'inputs' attribute containing loop input data
    :param streaming: Whether to stream the response (should be True)
    :return: SSE streaming generator
    :raises ValueError: If the snippet has no draft workflow
    """
    draft = SnippetService().get_draft_workflow(snippet=snippet)
    if not draft:
        raise ValueError("Workflow not initialized")

    # The generator expects an app model; the snippet is wrapped in a proxy to stand in for one.
    snippet_as_app = _SnippetAsApp(snippet)
    raw_events = WorkflowAppGenerator().single_loop_generate(
        app_model=snippet_as_app,  # type: ignore[arg-type]
        workflow=draft,
        node_id=node_id,
        user=user,
        args=args,  # type: ignore[arg-type]
        streaming=streaming,
    )
    return WorkflowAppGenerator.convert_to_event_stream(raw_events)
@staticmethod
def parse_files(workflow: Workflow, files: list[dict] | None = None) -> Sequence[File]:
    """
    Turn raw file mapping dicts into File objects using the workflow's upload config.

    :param workflow: Workflow instance whose ``features_dict`` supplies the file upload config
    :param files: Raw file mapping dicts; ``None`` is treated as an empty list
    :return: Parsed File objects, or an empty list when the workflow yields no upload config
    """
    raw_mappings = files if files else []
    upload_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
    if upload_config is not None:
        return file_factory.build_from_mappings(
            mappings=raw_mappings,
            tenant_id=workflow.tenant_id,
            config=upload_config,
        )
    # No upload configuration: nothing can be parsed.
    return []

View File

@@ -0,0 +1,566 @@
import json
import logging
from collections.abc import Mapping, Sequence
from datetime import UTC, datetime
from typing import Any
from sqlalchemy import func, select
from sqlalchemy.orm import Session, sessionmaker
from core.workflow.variables.variables import VariableBase
from core.workflow.enums import NodeType
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models import Account
from models.enums import WorkflowRunTriggeredFrom
from models.snippet import CustomizedSnippet, SnippetType
from models.workflow import (
Workflow,
WorkflowNodeExecutionModel,
WorkflowRun,
WorkflowType,
)
from repositories.factory import DifyAPIRepositoryFactory
from services.errors.app import WorkflowHashNotEqualError
logger = logging.getLogger(__name__)
class SnippetService:
    """Service for managing customized snippets."""

    def __init__(self, session_maker: sessionmaker | None = None):
        """
        Initialize SnippetService with repository dependencies.

        :param session_maker: Optional sessionmaker handed to the repository factory.
            When omitted, one bound to ``db.engine`` with ``expire_on_commit=False`` is created.
        """
        if session_maker is None:
            session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
        self._node_execution_service_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
            session_maker
        )
        self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)

    # --- CRUD Operations ---

    @staticmethod
    def get_snippets(
        *,
        tenant_id: str,
        page: int = 1,
        limit: int = 20,
        keyword: str | None = None,
    ) -> tuple[Sequence[CustomizedSnippet], int, bool]:
        """
        Get paginated list of snippets with optional search.

        :param tenant_id: Tenant ID
        :param page: Page number (1-indexed)
        :param limit: Number of items per page
        :param keyword: Optional search keyword for name/description
        :return: Tuple of (snippets list, total count, has_more flag)
        """
        filtered = select(CustomizedSnippet).where(CustomizedSnippet.tenant_id == tenant_id)
        if keyword:
            # The keyword is placed into the LIKE pattern as-is, so "%" and "_" in it act as wildcards.
            pattern = f"%{keyword}%"
            filtered = filtered.where(
                CustomizedSnippet.name.ilike(pattern) | CustomizedSnippet.description.ilike(pattern)
            )

        # Count on the unordered statement: ordering cannot change a row count.
        count_stmt = select(func.count()).select_from(filtered.subquery())
        total = db.session.scalar(count_stmt) or 0

        # Fetch one row more than requested to learn whether another page exists.
        page_stmt = (
            filtered.order_by(CustomizedSnippet.created_at.desc()).limit(limit + 1).offset((page - 1) * limit)
        )
        snippets = list(db.session.scalars(page_stmt).all())
        has_more = len(snippets) > limit
        if has_more:
            snippets = snippets[:-1]
        return snippets, total, has_more

    @staticmethod
    def get_snippet_by_id(
        *,
        snippet_id: str,
        tenant_id: str,
    ) -> CustomizedSnippet | None:
        """
        Get snippet by ID with tenant isolation.

        :param snippet_id: Snippet ID
        :param tenant_id: Tenant ID
        :return: CustomizedSnippet or None
        """
        return (
            db.session.query(CustomizedSnippet)
            .where(
                CustomizedSnippet.id == snippet_id,
                CustomizedSnippet.tenant_id == tenant_id,
            )
            .first()
        )

    @staticmethod
    def create_snippet(
        *,
        tenant_id: str,
        name: str,
        description: str | None,
        snippet_type: SnippetType,
        icon_info: dict | None,
        graph: dict | None,
        input_fields: list[dict] | None,
        account: Account,
    ) -> CustomizedSnippet:
        """
        Create a new snippet and commit it.

        :param tenant_id: Tenant ID
        :param name: Snippet name (must be unique per tenant)
        :param description: Snippet description (stored as "" when falsy)
        :param snippet_type: Type of snippet (node or group)
        :param icon_info: Icon information
        :param graph: Workflow graph structure (stored as JSON; falsy values are stored as None)
        :param input_fields: Input field definitions (stored as JSON; falsy values are stored as None)
        :param account: Creator account
        :return: Created CustomizedSnippet
        :raises ValueError: If name already exists
        """
        # Check if name already exists for this tenant
        existing = (
            db.session.query(CustomizedSnippet)
            .where(
                CustomizedSnippet.tenant_id == tenant_id,
                CustomizedSnippet.name == name,
            )
            .first()
        )
        if existing:
            raise ValueError(f"Snippet with name '{name}' already exists")

        snippet = CustomizedSnippet(
            tenant_id=tenant_id,
            name=name,
            description=description or "",
            type=snippet_type.value,
            icon_info=icon_info,
            graph=json.dumps(graph) if graph else None,
            input_fields=json.dumps(input_fields) if input_fields else None,
            created_by=account.id,
        )
        db.session.add(snippet)
        db.session.commit()
        return snippet

    @staticmethod
    def update_snippet(
        *,
        session: Session,
        snippet: CustomizedSnippet,
        account_id: str,
        data: dict,
    ) -> CustomizedSnippet:
        """
        Update snippet attributes.

        Only the keys "name", "description" and "icon_info" are read from ``data``.
        The snippet is added to ``session`` but this method does not commit.

        :param session: Database session
        :param snippet: Snippet to update
        :param account_id: ID of account making the update
        :param data: Dictionary of fields to update
        :return: Updated CustomizedSnippet
        :raises ValueError: If another snippet of the same tenant already uses the new name
        """
        if "name" in data:
            # Check if new name already exists for this tenant
            existing = (
                session.query(CustomizedSnippet)
                .where(
                    CustomizedSnippet.tenant_id == snippet.tenant_id,
                    CustomizedSnippet.name == data["name"],
                    CustomizedSnippet.id != snippet.id,
                )
                .first()
            )
            if existing:
                raise ValueError(f"Snippet with name '{data['name']}' already exists")
            snippet.name = data["name"]
        if "description" in data:
            snippet.description = data["description"]
        if "icon_info" in data:
            snippet.icon_info = data["icon_info"]

        snippet.updated_by = account_id
        snippet.updated_at = datetime.now(UTC).replace(tzinfo=None)
        session.add(snippet)
        return snippet

    @staticmethod
    def delete_snippet(
        *,
        session: Session,
        snippet: CustomizedSnippet,
    ) -> bool:
        """
        Delete a snippet.

        The deletion is registered on ``session``; this method does not commit.

        :param session: Database session
        :param snippet: Snippet to delete
        :return: True if deleted successfully
        """
        session.delete(snippet)
        return True

    # --- Workflow Operations ---

    def get_draft_workflow(self, snippet: CustomizedSnippet) -> Workflow | None:
        """
        Get draft workflow for snippet.

        :param snippet: CustomizedSnippet instance
        :return: Draft Workflow or None
        """
        workflow = (
            db.session.query(Workflow)
            .where(
                Workflow.tenant_id == snippet.tenant_id,
                Workflow.app_id == snippet.id,
                Workflow.type == WorkflowType.SNIPPET.value,
                Workflow.version == "draft",
            )
            .first()
        )
        return workflow

    def get_published_workflow(self, snippet: CustomizedSnippet) -> Workflow | None:
        """
        Get published workflow for snippet.

        :param snippet: CustomizedSnippet instance
        :return: Published Workflow, or None when the snippet has no ``workflow_id`` or no row matches
        """
        if not snippet.workflow_id:
            return None
        workflow = (
            db.session.query(Workflow)
            .where(
                Workflow.tenant_id == snippet.tenant_id,
                Workflow.app_id == snippet.id,
                Workflow.type == WorkflowType.SNIPPET.value,
                Workflow.id == snippet.workflow_id,
            )
            .first()
        )
        return workflow

    def sync_draft_workflow(
        self,
        *,
        snippet: CustomizedSnippet,
        graph: dict,
        unique_hash: str | None,
        account: Account,
        environment_variables: Sequence[VariableBase],
        conversation_variables: Sequence[VariableBase],
        input_variables: list[dict] | None = None,
    ) -> Workflow:
        """
        Sync draft workflow for snippet (create it if missing, otherwise update it) and commit.

        :param snippet: CustomizedSnippet instance
        :param graph: Workflow graph configuration
        :param unique_hash: Hash for conflict detection
        :param account: Account making the change
        :param environment_variables: Environment variables
        :param conversation_variables: Conversation variables
        :param input_variables: Input variables for snippet
        :return: Synced Workflow
        :raises WorkflowHashNotEqualError: If hash mismatch
        """
        workflow = self.get_draft_workflow(snippet=snippet)
        # The hash is only compared when a draft already exists.
        if workflow and workflow.unique_hash != unique_hash:
            raise WorkflowHashNotEqualError()

        # Create draft workflow if not found
        if not workflow:
            workflow = Workflow(
                tenant_id=snippet.tenant_id,
                app_id=snippet.id,
                features="{}",
                type=WorkflowType.SNIPPET.value,
                version="draft",
                graph=json.dumps(graph),
                created_by=account.id,
                environment_variables=environment_variables,
                conversation_variables=conversation_variables,
            )
            db.session.add(workflow)
            db.session.flush()
        else:
            # Update existing draft workflow
            workflow.graph = json.dumps(graph)
            workflow.updated_by = account.id
            workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
            workflow.environment_variables = environment_variables
            workflow.conversation_variables = conversation_variables

        # Update snippet's input_fields if provided
        if input_variables is not None:
            snippet.input_fields = json.dumps(input_variables)
            snippet.updated_by = account.id
            snippet.updated_at = datetime.now(UTC).replace(tzinfo=None)

        db.session.commit()
        return workflow

    def publish_workflow(
        self,
        *,
        session: Session,
        snippet: CustomizedSnippet,
        account: Account,
    ) -> Workflow:
        """
        Publish the draft workflow as a new version.

        The new workflow and the updated snippet are added to ``session``; this method
        does not commit.

        :param session: Database session
        :param snippet: CustomizedSnippet instance
        :param account: Account making the change
        :return: Published Workflow
        :raises ValueError: If no draft workflow exists
        """
        draft_workflow_stmt = select(Workflow).where(
            Workflow.tenant_id == snippet.tenant_id,
            Workflow.app_id == snippet.id,
            Workflow.type == WorkflowType.SNIPPET.value,
            Workflow.version == "draft",
        )
        draft_workflow = session.scalar(draft_workflow_stmt)
        if not draft_workflow:
            raise ValueError("No valid workflow found.")

        published_at = datetime.now(UTC).replace(tzinfo=None)

        # Create new published workflow; the version label is the publish timestamp.
        workflow = Workflow.new(
            tenant_id=snippet.tenant_id,
            app_id=snippet.id,
            type=draft_workflow.type,
            version=str(published_at),
            graph=draft_workflow.graph,
            features=draft_workflow.features,
            created_by=account.id,
            environment_variables=draft_workflow.environment_variables,
            conversation_variables=draft_workflow.conversation_variables,
            marked_name="",
            marked_comment="",
        )
        session.add(workflow)

        # Update snippet version
        snippet.version += 1
        snippet.is_published = True
        snippet.workflow_id = workflow.id
        snippet.updated_by = account.id
        # Keep updated_at in step with updated_by, as the other mutating methods do.
        snippet.updated_at = published_at
        session.add(snippet)
        return workflow

    def get_all_published_workflows(
        self,
        *,
        session: Session,
        snippet: CustomizedSnippet,
        page: int,
        limit: int,
    ) -> tuple[Sequence[Workflow], bool]:
        """
        Get all published workflow versions for snippet.

        :param session: Database session
        :param snippet: CustomizedSnippet instance
        :param page: Page number
        :param limit: Items per page
        :return: Tuple of (workflows list, has_more flag)
        """
        if not snippet.workflow_id:
            return [], False

        stmt = (
            select(Workflow)
            .where(
                # Tenant isolation, matching the other workflow queries of this service.
                Workflow.tenant_id == snippet.tenant_id,
                Workflow.app_id == snippet.id,
                Workflow.type == WorkflowType.SNIPPET.value,
                Workflow.version != "draft",
            )
            .order_by(Workflow.version.desc())
            # Fetch one row more than requested to learn whether another page exists.
            .limit(limit + 1)
            .offset((page - 1) * limit)
        )
        workflows = list(session.scalars(stmt).all())
        has_more = len(workflows) > limit
        if has_more:
            workflows = workflows[:-1]
        return workflows, has_more

    # --- Default Block Configs ---

    def get_default_block_configs(self) -> list[dict]:
        """
        Get default block configurations for all node types.

        Node classes whose ``get_default_config()`` returns a falsy value are skipped.

        :return: List of default configurations
        """
        default_block_configs: list[dict[str, Any]] = []
        for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values():
            node_class = node_class_mapping[LATEST_VERSION]
            default_config = node_class.get_default_config()
            if default_config:
                default_block_configs.append(dict(default_config))
        return default_block_configs

    def get_default_block_config(self, node_type: str, filters: dict | None = None) -> Mapping[str, object] | None:
        """
        Get default config for specific node type.

        :param node_type: Node type string
        :param filters: Optional filters
        :return: Default configuration or None
        """
        # NOTE(review): NodeType(node_type) is not guarded; presumably an unknown string raises
        # ValueError here instead of returning None - confirm callers expect that.
        node_type_enum = NodeType(node_type)
        if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
            return None
        node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
        default_config = node_class.get_default_config(filters=filters)
        if not default_config:
            return None
        return default_config

    # --- Workflow Run Operations ---

    def get_snippet_workflow_runs(
        self,
        *,
        snippet: CustomizedSnippet,
        args: dict,
    ) -> InfiniteScrollPagination:
        """
        Get paginated workflow runs for snippet.

        Only runs triggered from debugging are requested.

        :param snippet: CustomizedSnippet instance
        :param args: Request arguments (last_id, limit); limit defaults to 20
        :return: InfiniteScrollPagination result
        """
        limit = int(args.get("limit", 20))
        last_id = args.get("last_id")
        triggered_from_values = [
            WorkflowRunTriggeredFrom.DEBUGGING,
        ]
        return self._workflow_run_repo.get_paginated_workflow_runs(
            tenant_id=snippet.tenant_id,
            app_id=snippet.id,
            triggered_from=triggered_from_values,
            limit=limit,
            last_id=last_id,
        )

    def get_snippet_workflow_run(
        self,
        *,
        snippet: CustomizedSnippet,
        run_id: str,
    ) -> WorkflowRun | None:
        """
        Get workflow run details.

        :param snippet: CustomizedSnippet instance
        :param run_id: Workflow run ID
        :return: WorkflowRun or None
        """
        return self._workflow_run_repo.get_workflow_run_by_id(
            tenant_id=snippet.tenant_id,
            app_id=snippet.id,
            run_id=run_id,
        )

    def get_snippet_workflow_run_node_executions(
        self,
        *,
        snippet: CustomizedSnippet,
        run_id: str,
    ) -> Sequence[WorkflowNodeExecutionModel]:
        """
        Get workflow run node execution list.

        :param snippet: CustomizedSnippet instance
        :param run_id: Workflow run ID
        :return: List of WorkflowNodeExecutionModel; empty when the run is not found for this snippet
        """
        workflow_run = self.get_snippet_workflow_run(snippet=snippet, run_id=run_id)
        if not workflow_run:
            return []
        node_executions = self._node_execution_service_repo.get_executions_by_workflow_run(
            tenant_id=snippet.tenant_id,
            app_id=snippet.id,
            workflow_run_id=workflow_run.id,
        )
        return node_executions

    # --- Node Execution Operations ---

    def get_snippet_node_last_run(
        self,
        *,
        snippet: CustomizedSnippet,
        workflow: Workflow,
        node_id: str,
    ) -> WorkflowNodeExecutionModel | None:
        """
        Get the most recent execution for a specific node in a snippet workflow.

        :param snippet: CustomizedSnippet instance
        :param workflow: Workflow instance
        :param node_id: Node identifier
        :return: WorkflowNodeExecutionModel or None
        """
        return self._node_execution_service_repo.get_node_last_execution(
            tenant_id=snippet.tenant_id,
            app_id=snippet.id,
            workflow_id=workflow.id,
            node_id=node_id,
        )

    # --- Use Count ---

    @staticmethod
    def increment_use_count(
        *,
        session: Session,
        snippet: CustomizedSnippet,
    ) -> None:
        """
        Increment the use_count when snippet is used.

        The increment is applied to the loaded object and the snippet is added to
        ``session``; this method does not commit.

        :param session: Database session
        :param snippet: CustomizedSnippet instance
        """
        snippet.use_count += 1
        session.add(snippet)

View File

@@ -0,0 +1,309 @@
import io
import json
import logging
from typing import Any
from celery import shared_task
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from core.evaluation.entities.evaluation_entity import (
EvaluationCategory,
EvaluationItemResult,
EvaluationRunData,
)
from core.evaluation.evaluation_manager import EvaluationManager
from core.evaluation.runners.agent_evaluation_runner import AgentEvaluationRunner
from core.evaluation.runners.llm_evaluation_runner import LLMEvaluationRunner
from core.evaluation.runners.retrieval_evaluation_runner import RetrievalEvaluationRunner
from core.evaluation.runners.workflow_evaluation_runner import WorkflowEvaluationRunner
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.evaluation import EvaluationRun, EvaluationRunStatus
logger = logging.getLogger(__name__)
@shared_task(queue="evaluation")
def run_evaluation(run_data_dict: dict[str, Any]) -> None:
    """Celery task for running evaluations asynchronously.

    Workflow:
    1. Deserialize EvaluationRunData from ``run_data_dict``
    2. Open a Session on a dedicated engine connection
    3. Delegate to ``_execute_evaluation`` (runner selection, execution, summary,
       result XLSX, marking the run COMPLETED)
    4. On any exception: log it, roll the session back and mark the run FAILED
    5. Always close the session

    NOTE(review): nothing in this task sets the run status to RUNNING; presumably the
    dispatcher or the runner does - confirm.

    :param run_data_dict: Serialized EvaluationRunData payload
    """
    from sqlalchemy.orm import Session

    run_data = EvaluationRunData.model_validate(run_data_dict)

    with db.engine.connect() as connection:
        session = Session(bind=connection)
        try:
            _execute_evaluation(session, run_data)
        except Exception as e:
            logger.exception("Evaluation run %s failed", run_data.evaluation_run_id)
            # A failed flush/commit leaves the session unusable until it is rolled back;
            # without this the FAILED status below could not be written.
            try:
                session.rollback()
            except Exception:
                logger.exception("Failed to roll back session for run %s", run_data.evaluation_run_id)
            _mark_run_failed(session, run_data.evaluation_run_id, str(e))
        finally:
            session.close()
def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
    """Run one evaluation end to end and record its outcome on the EvaluationRun row.

    Returns early, after logging, when the run row is missing or already cancelled.

    :param session: Database session used for all reads and writes
    :param run_data: Deserialized description of the evaluation run
    :raises ValueError: If no evaluation framework instance is available
    """
    run_id = run_data.evaluation_run_id

    def _load_run() -> Any:
        # Fetch the row by primary key; None when it does not exist.
        return session.query(EvaluationRun).filter_by(id=run_id).first()

    run_record = _load_run()
    if not run_record:
        logger.error("EvaluationRun %s not found", run_id)
        return

    # Nothing to do for a run that was cancelled before the worker picked it up.
    if run_record.status == EvaluationRunStatus.CANCELLED:
        logger.info("EvaluationRun %s was cancelled", run_id)
        return

    framework = EvaluationManager.get_evaluation_instance()
    if framework is None:
        raise ValueError("Evaluation framework not configured")

    # The runner is chosen by evaluation category.
    runner = _create_runner(run_data.evaluation_category, framework, session)
    item_results = runner.run(
        evaluation_run_id=run_id,
        tenant_id=run_data.tenant_id,
        target_id=run_data.target_id,
        target_type=run_data.target_type,
        items=run_data.items,
        metrics_config=run_data.metrics_config,
        model_provider=run_data.evaluation_model_provider,
        model_name=run_data.evaluation_model,
    )

    # Aggregate scores, render the spreadsheet and persist it.
    summary = _compute_metrics_summary(item_results)
    workbook_bytes = _generate_result_xlsx(run_data.items, item_results)
    stored_file_id = _store_result_file(run_data.tenant_id, run_id, workbook_bytes, session)

    # Re-read the row before writing the final state.
    run_record = _load_run()
    if run_record:
        run_record.status = EvaluationRunStatus.COMPLETED
        run_record.completed_at = naive_utc_now()
        run_record.metrics_summary = json.dumps(summary)
        if stored_file_id:
            run_record.result_file_id = stored_file_id
        session.commit()

    logger.info("Evaluation run %s completed successfully", run_id)
def _create_runner(
    category: EvaluationCategory,
    evaluation_instance: Any,
    session: Any,
) -> Any:
    """Instantiate the runner class that corresponds to ``category``.

    :param category: Evaluation category selecting the runner class
    :param evaluation_instance: Evaluation framework instance handed to the runner
    :param session: Database session handed to the runner
    :return: A runner constructed with ``(evaluation_instance, session)``
    :raises ValueError: If the category matches none of the known categories
    """
    if category == EvaluationCategory.LLM:
        runner_cls = LLMEvaluationRunner
    elif category == EvaluationCategory.RETRIEVAL:
        runner_cls = RetrievalEvaluationRunner
    elif category == EvaluationCategory.AGENT:
        runner_cls = AgentEvaluationRunner
    elif category == EvaluationCategory.WORKFLOW:
        runner_cls = WorkflowEvaluationRunner
    else:
        raise ValueError(f"Unknown evaluation category: {category}")
    return runner_cls(evaluation_instance, session)
def _mark_run_failed(session: Any, run_id: str, error: str) -> None:
    """Record a failure on the EvaluationRun row; never raises.

    Any exception raised while reading or writing the row is logged and swallowed.

    :param session: Database session
    :param run_id: ID of the evaluation run to mark
    :param error: Error text; only its first 2000 characters are stored
    """
    try:
        failed_run = session.query(EvaluationRun).filter_by(id=run_id).first()
        if not failed_run:
            return
        failed_run.status = EvaluationRunStatus.FAILED
        failed_run.error = error[:2000]  # Truncate error
        failed_run.completed_at = naive_utc_now()
        session.commit()
    except Exception:
        logger.exception("Failed to mark run %s as failed", run_id)
def _compute_metrics_summary(results: list[EvaluationItemResult]) -> dict[str, Any]:
"""Compute average scores per metric across all results."""
metric_scores: dict[str, list[float]] = {}
for result in results:
if result.error:
continue
for metric in result.metrics:
if metric.name not in metric_scores:
metric_scores[metric.name] = []
metric_scores[metric.name].append(metric.score)
summary: dict[str, Any] = {}
for name, scores in metric_scores.items():
summary[name] = {
"average": sum(scores) / len(scores) if scores else 0.0,
"min": min(scores) if scores else 0.0,
"max": max(scores) if scores else 0.0,
"count": len(scores),
}
# Overall average
all_scores = [s for scores in metric_scores.values() for s in scores]
summary["_overall"] = {
"average": sum(all_scores) / len(all_scores) if all_scores else 0.0,
"total_items": len(results),
"successful_items": sum(1 for r in results if r.error is None),
"failed_items": sum(1 for r in results if r.error is not None),
}
return summary
def _generate_result_xlsx(
    items: list[Any],
    results: list[EvaluationItemResult],
) -> bytes:
    """Render the evaluation outcome as an XLSX workbook and return its bytes.

    The single sheet has one row per item: index, every input key seen across the
    items, expected and actual output, one column per metric name seen across the
    results, overall score and error. Results are matched to items by ``index``;
    an item without a result gets empty result cells.

    :param items: Evaluation items exposing ``index``, ``inputs`` and ``expected_output``
    :param results: Item results exposing ``index``, ``actual_output``, ``metrics``,
        ``overall_score`` and ``error``
    :return: Serialized workbook content
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "Evaluation Results"

    edge = Side(style="thin")
    cell_border = Border(left=edge, right=edge, top=edge, bottom=edge)
    title_font = Font(bold=True, color="FFFFFF")
    title_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    title_alignment = Alignment(horizontal="center", vertical="center")

    # Unique names in first-seen order (dict keys preserve insertion order).
    metric_names: list[str] = list(dict.fromkeys(m.name for r in results for m in r.metrics))
    input_keys: list[str] = list(dict.fromkeys(key for item in items for key in item.inputs))

    headers = ["index", *input_keys, "expected_output", "actual_output", *metric_names, "overall_score", "error"]

    # Header row
    for column, title in enumerate(headers, start=1):
        header_cell = sheet.cell(row=1, column=column, value=title)
        header_cell.font = title_font
        header_cell.fill = title_fill
        header_cell.alignment = title_alignment
        header_cell.border = cell_border

    # Narrow index column, uniform width for everything else.
    sheet.column_dimensions["A"].width = 10
    for column in range(2, len(headers) + 1):
        sheet.column_dimensions[get_column_letter(column)].width = 25

    results_by_index = {r.index: r for r in results}

    # Data rows start below the header.
    for row_number, item in enumerate(items, start=2):
        outcome = results_by_index.get(item.index)
        scores = {m.name: m.score for m in outcome.metrics} if outcome else {}

        row_values: list[Any] = [item.index]
        row_values.extend(str(item.inputs.get(key, "")) for key in input_keys)
        row_values.append(item.expected_output or "")
        row_values.append(outcome.actual_output if outcome else "")
        for metric_name in metric_names:
            score = scores.get(metric_name)
            row_values.append(score if score is not None else "")
        row_values.append(outcome.overall_score if outcome else "")
        row_values.append(outcome.error if outcome else "")

        for column, value in enumerate(row_values, start=1):
            sheet.cell(row=row_number, column=column, value=value).border = cell_border

    buffer = io.BytesIO()
    workbook.save(buffer)
    return buffer.getvalue()
def _store_result_file(
    tenant_id: str,
    run_id: str,
    xlsx_content: bytes,
    session: Any,
) -> str | None:
    """Store result XLSX file and return the UploadFile ID.

    Best effort: any failure is logged and reported as ``None``. The session is
    rolled back on failure because the caller keeps using it afterwards, and a
    session whose flush/commit failed cannot be used until rolled back.

    :param tenant_id: Tenant that owns the result file
    :param run_id: Evaluation run ID; its first 8 characters go into the file name
    :param xlsx_content: Serialized workbook content
    :param session: Database session used to persist the UploadFile row
    :return: ID of the created UploadFile, or None when storing failed
    """
    try:
        # Imported lazily inside the try so that an import failure is also best effort.
        from extensions.ext_storage import storage
        from libs.uuid_utils import uuidv7
        from models.model import UploadFile

        file_id = str(uuidv7())
        filename = f"evaluation-result-{run_id[:8]}.xlsx"
        storage_key = f"evaluation_results/{tenant_id}/{file_id}.xlsx"
        storage.save(storage_key, xlsx_content)

        # NOTE(review): created_by="system" and storage_type="evaluation_result" are
        # free-form strings here; verify they fit the UploadFile column types.
        upload_file = UploadFile(
            id=file_id,
            tenant_id=tenant_id,
            storage_type="evaluation_result",
            key=storage_key,
            name=filename,
            size=len(xlsx_content),
            extension="xlsx",
            mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            created_by_role="account",
            created_by="system",
        )
        session.add(upload_file)
        session.commit()
        return file_id
    except Exception:
        logger.exception("Failed to store result file for run %s", run_id)
        try:
            session.rollback()
        except Exception:
            logger.exception("Failed to roll back session after result file error for run %s", run_id)
        return None