Mirror of https://github.com/langgenius/dify.git (synced 2026-01-30 03:54:26 +00:00)

Compare commits: main...feat/otel- (7 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 8764e401ea |  |
|  | 775b28d3c9 |  |
|  | fbbe8a1be9 |  |
|  | 6405a30278 |  |
|  | 6cc38c61cc |  |
|  | df30383fe9 |  |
|  | a8e3c5701d |  |
@@ -81,6 +81,7 @@ def initialize_extensions(app: DifyApp):
         ext_commands,
         ext_compress,
         ext_database,
+        ext_enterprise_telemetry,
         ext_fastopenapi,
         ext_forward_refs,
         ext_hosting_provider,
@@ -131,6 +132,7 @@ def initialize_extensions(app: DifyApp):
         ext_commands,
         ext_fastopenapi,
         ext_otel,
+        ext_enterprise_telemetry,
         ext_request_logging,
         ext_session_factory,
     ]
@@ -8,7 +8,7 @@ from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, Settings
 from libs.file_utils import search_file_upwards
 
 from .deploy import DeploymentConfig
-from .enterprise import EnterpriseFeatureConfig
+from .enterprise import EnterpriseFeatureConfig, EnterpriseTelemetryConfig
 from .extra import ExtraServiceConfig
 from .feature import FeatureConfig
 from .middleware import MiddlewareConfig
@@ -73,6 +73,8 @@ class DifyConfig(
     # Enterprise feature configs
     # **Before using, please contact business@dify.ai by email to inquire about licensing matters.**
     EnterpriseFeatureConfig,
+    # Enterprise telemetry configs
+    EnterpriseTelemetryConfig,
 ):
     model_config = SettingsConfigDict(
         # read from dotenv format config file
@@ -18,3 +18,39 @@ class EnterpriseFeatureConfig(BaseSettings):
         description="Allow customization of the enterprise logo.",
         default=False,
     )
+
+
+class EnterpriseTelemetryConfig(BaseSettings):
+    """
+    Configuration for enterprise telemetry.
+    """
+
+    ENTERPRISE_TELEMETRY_ENABLED: bool = Field(
+        description="Enable enterprise telemetry collection (also requires ENTERPRISE_ENABLED=true).",
+        default=False,
+    )
+
+    ENTERPRISE_OTLP_ENDPOINT: str = Field(
+        description="Enterprise OTEL collector endpoint.",
+        default="",
+    )
+
+    ENTERPRISE_OTLP_HEADERS: str = Field(
+        description="Auth headers for OTLP export (key=value,key2=value2).",
+        default="",
+    )
+
+    ENTERPRISE_INCLUDE_CONTENT: bool = Field(
+        description="Include input/output content in traces (privacy toggle).",
+        default=True,
+    )
+
+    ENTERPRISE_SERVICE_NAME: str = Field(
+        description="Service name for OTEL resource.",
+        default="dify",
+    )
+
+    ENTERPRISE_OTEL_SAMPLING_RATE: float = Field(
+        description="Sampling rate for enterprise traces (0.0 to 1.0, default 1.0 = 100%).",
+        default=1.0,
+    )
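These six fields are the whole operator surface for the feature. A hedged sketch of how they might be set, assuming pydantic-settings picks them up from the environment as the config tests at the end of this diff do; the endpoint and token are placeholders:

```python
# Hypothetical operator configuration. Note the gate in exporter.py checks BOTH
# ENTERPRISE_ENABLED and ENTERPRISE_TELEMETRY_ENABLED before anything activates.
import os

os.environ.update(
    {
        "ENTERPRISE_ENABLED": "true",                                 # licensing gate
        "ENTERPRISE_TELEMETRY_ENABLED": "true",                       # telemetry gate
        "ENTERPRISE_OTLP_ENDPOINT": "https://otel.example.com:4318",  # placeholder collector
        "ENTERPRISE_OTLP_HEADERS": "Authorization=Bearer tok",        # placeholder auth
        "ENTERPRISE_INCLUDE_CONTENT": "false",                        # privacy: drop inputs/outputs
        "ENTERPRISE_OTEL_SAMPLING_RATE": "0.5",                       # trace half of the runs
    }
)
# DifyConfig reads these at construction time (the config tests below also set
# the DB_* variables that DifyConfig requires).
```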
@@ -147,9 +147,12 @@ class WorkflowAppGenerator(BaseAppGenerator):
 
         inputs: Mapping[str, Any] = args["inputs"]
 
-        extras = {
+        extras: dict[str, Any] = {
             **extract_external_trace_id_from_args(args),
         }
+        parent_trace_context = args.get("_parent_trace_context")
+        if parent_trace_context:
+            extras["parent_trace_context"] = parent_trace_context
         workflow_run_id = str(uuid.uuid4())
         # FIXME (Yeuoly): we need to remove the SKIP_PREPARE_USER_INPUTS_KEY from the args
         # trigger shouldn't prepare user inputs
@@ -373,6 +373,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
 
         self._workflow_node_execution_repository.save(domain_execution)
         self._workflow_node_execution_repository.save_execution_data(domain_execution)
+        self._enqueue_node_trace_task(domain_execution)
 
     def _fail_running_node_executions(self, *, error_message: str) -> None:
         now = naive_utc_now()
@@ -390,8 +391,10 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
 
         conversation_id = self._system_variables().get(SystemVariableKey.CONVERSATION_ID.value)
         external_trace_id = None
+        parent_trace_context = None
         if isinstance(self._application_generate_entity, (WorkflowAppGenerateEntity, AdvancedChatAppGenerateEntity)):
             external_trace_id = self._application_generate_entity.extras.get("external_trace_id")
+            parent_trace_context = self._application_generate_entity.extras.get("parent_trace_context")
 
         trace_task = TraceTask(
             TraceTaskName.WORKFLOW_TRACE,
@@ -399,6 +402,55 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
             conversation_id=conversation_id,
             user_id=self._trace_manager.user_id,
             external_trace_id=external_trace_id,
+            parent_trace_context=parent_trace_context,
         )
         self._trace_manager.add_trace_task(trace_task)
+
+    def _enqueue_node_trace_task(self, domain_execution: WorkflowNodeExecution) -> None:
+        if not self._trace_manager:
+            return
+
+        from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
+
+        if not is_enterprise_telemetry_enabled():
+            return
+
+        execution = self._get_workflow_execution()
+        meta = domain_execution.metadata or {}
+
+        node_data: dict[str, Any] = {
+            "workflow_id": domain_execution.workflow_id,
+            "workflow_execution_id": execution.id_,
+            "tenant_id": self._application_generate_entity.app_config.tenant_id,
+            "app_id": self._application_generate_entity.app_config.app_id,
+            "node_execution_id": domain_execution.id,
+            "node_id": domain_execution.node_id,
+            "node_type": str(domain_execution.node_type.value),
+            "title": domain_execution.title,
+            "status": str(domain_execution.status.value),
+            "error": domain_execution.error,
+            "elapsed_time": domain_execution.elapsed_time,
+            "index": domain_execution.index,
+            "predecessor_node_id": domain_execution.predecessor_node_id,
+            "total_tokens": meta.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS, 0),
+            "total_price": meta.get(WorkflowNodeExecutionMetadataKey.TOTAL_PRICE, 0.0),
+            "currency": meta.get(WorkflowNodeExecutionMetadataKey.CURRENCY),
+            "tool_name": (meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO) or {}).get("tool_name")
+            if isinstance(meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO), dict)
+            else None,
+            "iteration_id": meta.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID),
+            "iteration_index": meta.get(WorkflowNodeExecutionMetadataKey.ITERATION_INDEX),
+            "loop_id": meta.get(WorkflowNodeExecutionMetadataKey.LOOP_ID),
+            "loop_index": meta.get(WorkflowNodeExecutionMetadataKey.LOOP_INDEX),
+            "parallel_id": meta.get(WorkflowNodeExecutionMetadataKey.PARALLEL_ID),
+            "node_inputs": dict(domain_execution.inputs) if domain_execution.inputs else None,
+            "node_outputs": dict(domain_execution.outputs) if domain_execution.outputs else None,
+            "process_data": dict(domain_execution.process_data) if domain_execution.process_data else None,
+        }
+
+        trace_task = TraceTask(
+            TraceTaskName.NODE_EXECUTION_TRACE,
+            node_execution_data=node_data,
+        )
+        self._trace_manager.add_trace_task(trace_task)
@@ -114,6 +114,46 @@ class GenerateNameTraceInfo(BaseTraceInfo):
     tenant_id: str
 
 
+class WorkflowNodeTraceInfo(BaseTraceInfo):
+    workflow_id: str
+    workflow_run_id: str
+    tenant_id: str
+    node_execution_id: str
+    node_id: str
+    node_type: str
+    title: str
+
+    status: str
+    error: str | None = None
+    elapsed_time: float
+
+    index: int
+    predecessor_node_id: str | None = None
+
+    total_tokens: int = 0
+    total_price: float = 0.0
+    currency: str | None = None
+
+    model_provider: str | None = None
+    model_name: str | None = None
+    prompt_tokens: int | None = None
+    completion_tokens: int | None = None
+
+    tool_name: str | None = None
+
+    iteration_id: str | None = None
+    iteration_index: int | None = None
+    loop_id: str | None = None
+    loop_index: int | None = None
+    parallel_id: str | None = None
+
+    node_inputs: Mapping[str, Any] | None = None
+    node_outputs: Mapping[str, Any] | None = None
+    process_data: Mapping[str, Any] | None = None
+
+    model_config = ConfigDict(protected_namespaces=())
+
+
 class TaskData(BaseModel):
     app_id: str
     trace_info_type: str
@@ -128,6 +168,7 @@ trace_info_info_map = {
     "DatasetRetrievalTraceInfo": DatasetRetrievalTraceInfo,
     "ToolTraceInfo": ToolTraceInfo,
     "GenerateNameTraceInfo": GenerateNameTraceInfo,
+    "WorkflowNodeTraceInfo": WorkflowNodeTraceInfo,
 }
@@ -141,3 +182,4 @@ class TraceTaskName(StrEnum):
     TOOL_TRACE = "tool"
     GENERATE_NAME_TRACE = "generate_conversation_name"
     DATASOURCE_TRACE = "datasource"
+    NODE_EXECUTION_TRACE = "node_execution"
@@ -28,6 +28,7 @@ from core.ops.entities.trace_entity import (
     TaskData,
     ToolTraceInfo,
     TraceTaskName,
+    WorkflowNodeTraceInfo,
     WorkflowTraceInfo,
 )
 from core.ops.utils import get_message_data
@@ -528,6 +529,7 @@ class TraceTask:
             TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
                 conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
             ),
+            TraceTaskName.NODE_EXECUTION_TRACE: lambda: self.node_execution_trace(**self.kwargs),
         }
 
         return preprocess_map.get(self.trace_type, lambda: None)()
@@ -583,7 +585,7 @@ class TraceTask:
         )
         message_id = session.scalar(message_data_stmt)
 
-        metadata = {
+        metadata: dict[str, Any] = {
             "workflow_id": workflow_id,
             "conversation_id": conversation_id,
             "workflow_run_id": workflow_run_id,
@@ -598,6 +600,10 @@ class TraceTask:
             "app_id": workflow_run.app_id,
         }
 
+        parent_trace_context = self.kwargs.get("parent_trace_context")
+        if parent_trace_context:
+            metadata["parent_trace_context"] = parent_trace_context
+
         workflow_trace_info = WorkflowTraceInfo(
             trace_id=self.trace_id,
             workflow_data=workflow_run.to_dict(),
@@ -905,6 +911,53 @@ class TraceTask:
 
         return generate_name_trace_info
 
+    def node_execution_trace(self, **kwargs) -> WorkflowNodeTraceInfo | dict:
+        """
+        Trace for a single node execution. Used for enterprise telemetry.
+        """
+        node_data: dict = kwargs.get("node_execution_data", {})
+        if not node_data:
+            return {}
+
+        metadata = {
+            "tenant_id": node_data.get("tenant_id"),
+            "app_id": node_data.get("app_id"),
+        }
+
+        return WorkflowNodeTraceInfo(
+            trace_id=self.trace_id,
+            message_id=None,
+            metadata=metadata,
+            workflow_id=node_data.get("workflow_id", ""),
+            workflow_run_id=node_data.get("workflow_execution_id", ""),
+            tenant_id=node_data.get("tenant_id", ""),
+            node_execution_id=node_data.get("node_execution_id", ""),
+            node_id=node_data.get("node_id", ""),
+            node_type=node_data.get("node_type", ""),
+            title=node_data.get("title", ""),
+            status=node_data.get("status", ""),
+            error=node_data.get("error"),
+            elapsed_time=node_data.get("elapsed_time", 0.0),
+            index=node_data.get("index", 0),
+            predecessor_node_id=node_data.get("predecessor_node_id"),
+            total_tokens=node_data.get("total_tokens", 0),
+            total_price=node_data.get("total_price", 0.0),
+            currency=node_data.get("currency"),
+            model_provider=node_data.get("model_provider"),
+            model_name=node_data.get("model_name"),
+            prompt_tokens=node_data.get("prompt_tokens"),
+            completion_tokens=node_data.get("completion_tokens"),
+            tool_name=node_data.get("tool_name"),
+            iteration_id=node_data.get("iteration_id"),
+            iteration_index=node_data.get("iteration_index"),
+            loop_id=node_data.get("loop_id"),
+            loop_index=node_data.get("loop_index"),
+            parallel_id=node_data.get("parallel_id"),
+            node_inputs=node_data.get("node_inputs"),
+            node_outputs=node_data.get("node_outputs"),
+            process_data=node_data.get("process_data"),
+        )
+
     def _extract_streaming_metrics(self, message_data) -> dict:
         if not message_data.message_metadata:
             return {}
@@ -938,13 +991,17 @@ class TraceQueueManager:
         self.user_id = user_id
         self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
         self.flask_app = current_app._get_current_object()  # type: ignore
+
+        from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
+
+        self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled()
         if trace_manager_timer is None:
             self.start_timer()
 
     def add_trace_task(self, trace_task: TraceTask):
         global trace_manager_timer, trace_manager_queue
         try:
-            if self.trace_instance:
+            if self._enterprise_telemetry_enabled or self.trace_instance:
                 trace_task.app_id = self.app_id
                 trace_manager_queue.put(trace_task)
         except Exception:
@@ -50,6 +50,7 @@ class WorkflowTool(Tool):
         self.workflow_call_depth = workflow_call_depth
         self.label = label
         self._latest_usage = LLMUsage.empty_usage()
+        self.parent_trace_context: dict[str, str] | None = None
 
         super().__init__(entity=entity, runtime=runtime)
 
@@ -90,11 +91,15 @@ class WorkflowTool(Tool):
 
         self._latest_usage = LLMUsage.empty_usage()
 
+        args: dict[str, Any] = {"inputs": tool_parameters, "files": files}
+        if self.parent_trace_context:
+            args["_parent_trace_context"] = self.parent_trace_context
+
         result = generator.generate(
             app_model=app,
             workflow=workflow,
             user=user,
-            args={"inputs": tool_parameters, "files": files},
+            args=args,
             invoke_from=self.runtime.invoke_from,
             streaming=False,
             call_depth=self.workflow_call_depth + 1,
@@ -105,6 +105,17 @@ class ToolNode(Node[ToolNodeData]):
         # get conversation id
         conversation_id = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID])
 
+        from core.tools.workflow_as_tool.tool import WorkflowTool
+
+        if isinstance(tool_runtime, WorkflowTool):
+            workflow_run_id_var = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.WORKFLOW_RUN_ID])
+            tool_runtime.parent_trace_context = {
+                "trace_id": str(workflow_run_id_var.text) if workflow_run_id_var else "",
+                "parent_node_execution_id": self.execution_id,
+                "parent_workflow_run_id": str(workflow_run_id_var.text) if workflow_run_id_var else "",
+                "parent_app_id": self.app_id,
+            }
+
         try:
             message_stream = ToolEngine.generic_invoke(
                 tool=tool_runtime,
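For orientation, a hedged sketch of the propagation chain the hunks above establish; the key names come from the diff, the values are hypothetical:

```python
# 1. ToolNode stamps the context onto the WorkflowTool before invocation:
parent_trace_context = {
    "trace_id": "run-123",                   # parent workflow run id
    "parent_node_execution_id": "exec-456",  # the tool node's own execution id
    "parent_workflow_run_id": "run-123",
    "parent_app_id": "app-789",
}

# 2. WorkflowTool forwards it under a private args key:
args = {"inputs": {}, "files": [], "_parent_trace_context": parent_trace_context}

# 3. WorkflowAppGenerator.generate copies it into extras["parent_trace_context"],
#    and WorkflowPersistenceLayer attaches it to the child run's TraceTask, so the
#    child workflow's span can point back at its parent (dify.parent.* attributes).
```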
api/enterprise/__init__.py (new file, 0 lines)
api/enterprise/telemetry/__init__.py (new file, 0 lines)

api/enterprise/telemetry/enterprise_trace.py (new file, 336 lines)
@@ -0,0 +1,336 @@
"""Enterprise trace handler — duck-typed, NOT a BaseTraceInstance subclass.

Invoked directly in the Celery task, not through OpsTraceManager dispatch.
Only requires a matching ``trace(trace_info)`` method signature.
"""

from __future__ import annotations

import json
import logging
from typing import Any

from core.ops.entities.trace_entity import (
    BaseTraceInfo,
    DatasetRetrievalTraceInfo,
    GenerateNameTraceInfo,
    MessageTraceInfo,
    ModerationTraceInfo,
    SuggestedQuestionTraceInfo,
    ToolTraceInfo,
    WorkflowNodeTraceInfo,
    WorkflowTraceInfo,
)
from enterprise.telemetry.entities import (
    EnterpriseTelemetryCounter,
    EnterpriseTelemetryHistogram,
    EnterpriseTelemetrySpan,
)

logger = logging.getLogger(__name__)


class EnterpriseDataTrace:
    """Duck-typed enterprise trace handler.

    Each ``*_trace`` method emits spans and metrics to the EnterpriseExporter.
    Content-gated attributes are only included when ``include_content`` is True.
    """

    def __init__(self) -> None:
        from extensions.ext_enterprise_telemetry import get_enterprise_exporter

        exporter = get_enterprise_exporter()
        if exporter is None:
            raise RuntimeError("EnterpriseDataTrace instantiated but exporter is not initialized")
        self._exporter = exporter

    def trace(self, trace_info: BaseTraceInfo) -> None:
        if isinstance(trace_info, WorkflowTraceInfo):
            self._workflow_trace(trace_info)
        elif isinstance(trace_info, MessageTraceInfo):
            self._message_trace(trace_info)
        elif isinstance(trace_info, ToolTraceInfo):
            self._tool_trace(trace_info)
        elif isinstance(trace_info, WorkflowNodeTraceInfo):
            self._node_execution_trace(trace_info)
        elif isinstance(trace_info, ModerationTraceInfo):
            self._moderation_trace(trace_info)
        elif isinstance(trace_info, SuggestedQuestionTraceInfo):
            self._suggested_question_trace(trace_info)
        elif isinstance(trace_info, DatasetRetrievalTraceInfo):
            self._dataset_retrieval_trace(trace_info)
        elif isinstance(trace_info, GenerateNameTraceInfo):
            self._generate_name_trace(trace_info)

    def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]:
        return {
            "dify.trace_id": trace_info.trace_id,
            "dify.tenant_id": trace_info.metadata.get("tenant_id"),
            "dify.app_id": trace_info.metadata.get("app_id"),
            "gen_ai.user.id": trace_info.metadata.get("user_id"),
            "dify.message.id": trace_info.message_id,
        }

    def _maybe_json(self, value: Any) -> str | None:
        if value is None:
            return None
        if isinstance(value, str):
            return value
        try:
            return json.dumps(value, default=str)
        except (TypeError, ValueError):
            return str(value)

    def _workflow_trace(self, info: WorkflowTraceInfo) -> None:
        attrs = self._common_attrs(info)
        attrs.update(
            {
                "dify.workflow.id": info.workflow_id,
                "dify.workflow.run_id": info.workflow_run_id,
                "dify.workflow.status": info.workflow_run_status,
                "dify.workflow.elapsed_time": info.workflow_run_elapsed_time,
                "dify.workflow.version": info.workflow_run_version,
                "dify.workflow.error": info.error,
                "gen_ai.usage.total_tokens": info.total_tokens,
                "dify.invoke_from": info.metadata.get("triggered_from"),
                "dify.conversation.id": info.conversation_id,
            }
        )

        parent_ctx = info.metadata.get("parent_trace_context")
        if parent_ctx and isinstance(parent_ctx, dict):
            attrs["dify.parent.trace_id"] = parent_ctx.get("trace_id")
            attrs["dify.parent.node.execution_id"] = parent_ctx.get("parent_node_execution_id")
            attrs["dify.parent.workflow.run_id"] = parent_ctx.get("parent_workflow_run_id")
            attrs["dify.parent.app.id"] = parent_ctx.get("parent_app_id")

        if self._exporter.include_content:
            attrs["dify.workflow.inputs"] = self._maybe_json(info.workflow_run_inputs)
            attrs["dify.workflow.outputs"] = self._maybe_json(info.workflow_run_outputs)
            attrs["dify.workflow.query"] = info.query

        self._exporter.export_span(EnterpriseTelemetrySpan.WORKFLOW_RUN, attrs)

        labels = {
            "tenant_id": info.tenant_id,
            "app_id": info.metadata.get("app_id", ""),
        }
        self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.REQUESTS,
            1,
            {**labels, "type": "workflow", "status": info.workflow_run_status},
        )
        self._exporter.record_histogram(
            EnterpriseTelemetryHistogram.WORKFLOW_DURATION,
            float(info.workflow_run_elapsed_time),
            {**labels, "status": info.workflow_run_status},
        )

        if info.error:
            self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "workflow"})

    def _message_trace(self, info: MessageTraceInfo) -> None:
        attrs = self._common_attrs(info)
        attrs.update(
            {
                "dify.conversation.id": info.metadata.get("conversation_id"),
                "dify.conversation.mode": info.conversation_mode,
                "gen_ai.provider.name": info.metadata.get("ls_provider"),
                "gen_ai.request.model": info.metadata.get("ls_model_name"),
                "gen_ai.usage.input_tokens": info.message_tokens,
                "gen_ai.usage.output_tokens": info.answer_tokens,
                "gen_ai.usage.total_tokens": info.total_tokens,
                "dify.message.status": info.metadata.get("status"),
                "dify.message.error": info.error,
                "dify.message.from_source": info.metadata.get("from_source"),
                "dify.message.from_end_user_id": info.metadata.get("from_end_user_id"),
                "dify.message.from_account_id": info.metadata.get("from_account_id"),
                "dify.streaming": info.is_streaming_request,
                "dify.message.time_to_first_token": info.gen_ai_server_time_to_first_token,
                "dify.message.streaming_duration": info.llm_streaming_time_to_generate,
            }
        )

        if self._exporter.include_content:
            attrs["dify.message.inputs"] = self._maybe_json(info.inputs)
            attrs["dify.message.outputs"] = self._maybe_json(info.outputs)

        self._exporter.export_span(EnterpriseTelemetrySpan.MESSAGE_RUN, attrs)

        labels = {
            "tenant_id": info.metadata.get("tenant_id", ""),
            "app_id": info.metadata.get("app_id", ""),
            "model_provider": info.metadata.get("ls_provider", ""),
            "model_name": info.metadata.get("ls_model_name", ""),
        }
        self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.REQUESTS,
            1,
            {**labels, "type": "message", "status": info.metadata.get("status", "")},
        )

        if info.start_time and info.end_time:
            duration = (info.end_time - info.start_time).total_seconds()
            self._exporter.record_histogram(EnterpriseTelemetryHistogram.MESSAGE_DURATION, duration, labels)

        if info.gen_ai_server_time_to_first_token is not None:
            self._exporter.record_histogram(
                EnterpriseTelemetryHistogram.MESSAGE_TTFT, info.gen_ai_server_time_to_first_token, labels
            )

        if info.error:
            self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "message"})

    def _node_execution_trace(self, info: WorkflowNodeTraceInfo) -> None:
        attrs = {
            "dify.trace_id": info.trace_id,
            "dify.tenant_id": info.tenant_id,
            "dify.workflow.id": info.workflow_id,
            "dify.workflow.run_id": info.workflow_run_id,
            "dify.node.execution_id": info.node_execution_id,
            "dify.node.id": info.node_id,
            "dify.node.type": info.node_type,
            "dify.node.title": info.title,
            "dify.node.status": info.status,
            "dify.node.error": info.error,
            "dify.node.elapsed_time": info.elapsed_time,
            "dify.node.index": info.index,
            "dify.node.predecessor_node_id": info.predecessor_node_id,
            "gen_ai.usage.total_tokens": info.total_tokens,
            "dify.node.total_price": info.total_price,
            "dify.node.currency": info.currency,
            "gen_ai.provider.name": info.model_provider,
            "gen_ai.request.model": info.model_name,
            "gen_ai.usage.input_tokens": info.prompt_tokens,
            "gen_ai.usage.output_tokens": info.completion_tokens,
            "gen_ai.tool.name": info.tool_name,
            "dify.node.iteration_id": info.iteration_id,
            "dify.node.iteration_index": info.iteration_index,
            "dify.node.loop_id": info.loop_id,
            "dify.node.loop_index": info.loop_index,
            "dify.node.parallel_id": info.parallel_id,
        }

        if self._exporter.include_content:
            attrs["dify.node.inputs"] = self._maybe_json(info.node_inputs)
            attrs["dify.node.outputs"] = self._maybe_json(info.node_outputs)
            attrs["dify.node.process_data"] = self._maybe_json(info.process_data)

        self._exporter.export_span(EnterpriseTelemetrySpan.NODE_EXECUTION, attrs)

        labels = {
            "tenant_id": info.tenant_id,
            "app_id": info.metadata.get("app_id", ""),
            "node_type": info.node_type,
            "model_provider": info.model_provider or "",
        }
        if info.total_tokens:
            token_labels = {**labels, "model_name": info.model_name or ""}
            self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels)
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "node", "status": info.status}
        )
        self._exporter.record_histogram(EnterpriseTelemetryHistogram.NODE_DURATION, info.elapsed_time, labels)

        if info.error:
            self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "node"})

    def _tool_trace(self, info: ToolTraceInfo) -> None:
        attrs = self._common_attrs(info)
        attrs.update(
            {
                "gen_ai.tool.name": info.tool_name,
                "dify.tool.time_cost": info.time_cost,
                "dify.tool.error": info.error,
            }
        )

        if self._exporter.include_content:
            attrs["dify.tool.inputs"] = self._maybe_json(info.tool_inputs)
            attrs["dify.tool.outputs"] = info.tool_outputs
            attrs["dify.tool.parameters"] = self._maybe_json(info.tool_parameters)
            attrs["dify.tool.config"] = self._maybe_json(info.tool_config)

        self._exporter.export_span(EnterpriseTelemetrySpan.TOOL_EXECUTION, attrs)

        labels = {
            "tenant_id": info.metadata.get("tenant_id", ""),
            "app_id": info.metadata.get("app_id", ""),
            "tool_name": info.tool_name,
        }
        self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "tool"})
        self._exporter.record_histogram(EnterpriseTelemetryHistogram.TOOL_DURATION, float(info.time_cost), labels)

        if info.error:
            self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "tool"})

    def _moderation_trace(self, info: ModerationTraceInfo) -> None:
        attrs = self._common_attrs(info)
        attrs.update(
            {
                "dify.moderation.flagged": info.flagged,
                "dify.moderation.action": info.action,
                "dify.moderation.preset_response": info.preset_response,
            }
        )

        if self._exporter.include_content:
            attrs["dify.moderation.query"] = info.query

        self._exporter.export_span(EnterpriseTelemetrySpan.MODERATION_CHECK, attrs)

        labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
        self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "moderation"})

    def _suggested_question_trace(self, info: SuggestedQuestionTraceInfo) -> None:
        attrs = self._common_attrs(info)
        attrs.update(
            {
                "gen_ai.usage.total_tokens": info.total_tokens,
                "dify.suggested_question.status": info.status,
                "dify.suggested_question.error": info.error,
                "gen_ai.provider.name": info.model_provider,
                "gen_ai.request.model": info.model_id,
                "dify.suggested_question.count": len(info.suggested_question),
            }
        )

        if self._exporter.include_content:
            attrs["dify.suggested_question.questions"] = self._maybe_json(info.suggested_question)

        self._exporter.export_span(EnterpriseTelemetrySpan.SUGGESTED_QUESTION_GENERATION, attrs)

        labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "suggested_question"}
        )

    def _dataset_retrieval_trace(self, info: DatasetRetrievalTraceInfo) -> None:
        attrs = self._common_attrs(info)
        attrs["dify.dataset.error"] = info.error

        if self._exporter.include_content:
            attrs["dify.dataset.documents"] = self._maybe_json(info.documents)

        self._exporter.export_span(EnterpriseTelemetrySpan.DATASET_RETRIEVAL, attrs)

        labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "dataset_retrieval"}
        )

    def _generate_name_trace(self, info: GenerateNameTraceInfo) -> None:
        attrs = self._common_attrs(info)
        attrs["dify.conversation.id"] = info.conversation_id

        if self._exporter.include_content:
            attrs["dify.generate_name.inputs"] = self._maybe_json(info.inputs)
            attrs["dify.generate_name.outputs"] = self._maybe_json(info.outputs)

        self._exporter.export_span(EnterpriseTelemetrySpan.GENERATE_NAME_EXECUTION, attrs)

        labels = {"tenant_id": info.tenant_id, "app_id": info.metadata.get("app_id", "")}
        self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "generate_name"})
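Because the handler is duck-typed rather than a BaseTraceInstance subclass, anything exposing a compatible trace(trace_info) method can stand in for it, which is exactly what the unit tests below do with mocked exporters. A hypothetical minimal stand-in:

```python
from core.ops.entities.trace_entity import BaseTraceInfo


class NoopEnterpriseTrace:
    """Hypothetical test double satisfying the same trace() contract."""

    def trace(self, trace_info: BaseTraceInfo) -> None:
        pass  # accept and drop every trace
```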
api/enterprise/telemetry/entities/__init__.py (new file, 38 lines)
@@ -0,0 +1,38 @@
from enum import StrEnum


class EnterpriseTelemetrySpan(StrEnum):
    WORKFLOW_RUN = "dify.workflow.run"
    NODE_EXECUTION = "dify.node.execution"
    MESSAGE_RUN = "dify.message.run"
    TOOL_EXECUTION = "dify.tool.execution"
    DATASET_RETRIEVAL = "dify.dataset.retrieval"
    MODERATION_CHECK = "dify.moderation.check"
    SUGGESTED_QUESTION_GENERATION = "dify.suggested_question.generation"
    GENERATE_NAME_EXECUTION = "dify.generate_name.execution"
    APP_CREATED = "dify.app.created"
    APP_DELETED = "dify.app.deleted"
    APP_UPDATED = "dify.app.updated"
    FEEDBACK_CREATED = "dify.feedback.created"


class EnterpriseTelemetryCounter(StrEnum):
    TOKENS = "tokens"
    REQUESTS = "requests"
    ERRORS = "errors"
    FEEDBACK = "feedback"


class EnterpriseTelemetryHistogram(StrEnum):
    WORKFLOW_DURATION = "workflow_duration"
    NODE_DURATION = "node_duration"
    MESSAGE_DURATION = "message_duration"
    MESSAGE_TTFT = "message_ttft"
    TOOL_DURATION = "tool_duration"


__all__ = [
    "EnterpriseTelemetryCounter",
    "EnterpriseTelemetryHistogram",
    "EnterpriseTelemetrySpan",
]
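Since these enums derive from StrEnum, each member is itself a str, so span and metric names can be handed straight to OTEL APIs that expect plain strings. A quick illustration:

```python
from enterprise.telemetry.entities import EnterpriseTelemetrySpan

# StrEnum members compare equal to (and are instances of) their string values:
assert EnterpriseTelemetrySpan.WORKFLOW_RUN == "dify.workflow.run"
assert isinstance(EnterpriseTelemetrySpan.NODE_EXECUTION, str)
```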
api/enterprise/telemetry/event_handlers.py (new file, 118 lines)
@@ -0,0 +1,118 @@
"""Blinker signal handlers for enterprise telemetry.

Registered at import time via ``@signal.connect`` decorators.
Import must happen during ``ext_enterprise_telemetry.init_app()`` to ensure handlers fire.
"""

from __future__ import annotations

import logging

from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetrySpan
from events.app_event import app_was_created, app_was_deleted, app_was_updated
from events.feedback_event import feedback_was_created

logger = logging.getLogger(__name__)

# Export handlers to mark them as intentionally public (accessed via Blinker decorators)
__all__ = [
    "_handle_app_created",
    "_handle_app_deleted",
    "_handle_app_updated",
    "_handle_feedback_created",
]


@app_was_created.connect
def _handle_app_created(sender: object, **kwargs: object) -> None:
    from extensions.ext_enterprise_telemetry import get_enterprise_exporter

    exporter = get_enterprise_exporter()
    if not exporter:
        return
    exporter.export_span(
        EnterpriseTelemetrySpan.APP_CREATED,
        {
            "dify.app.id": getattr(sender, "id", None),
            "dify.tenant_id": getattr(sender, "tenant_id", None),
            "dify.app.mode": getattr(sender, "mode", None),
        },
    )
    exporter.increment_counter(
        EnterpriseTelemetryCounter.REQUESTS,
        1,
        {
            "type": "app.created",
            "tenant_id": getattr(sender, "tenant_id", ""),
        },
    )


@app_was_deleted.connect
def _handle_app_deleted(sender: object, **kwargs: object) -> None:
    from extensions.ext_enterprise_telemetry import get_enterprise_exporter

    exporter = get_enterprise_exporter()
    if not exporter:
        return
    exporter.export_span(
        EnterpriseTelemetrySpan.APP_DELETED,
        {
            "dify.app.id": getattr(sender, "id", None),
            "dify.tenant_id": getattr(sender, "tenant_id", None),
        },
    )
    exporter.increment_counter(
        EnterpriseTelemetryCounter.REQUESTS,
        1,
        {
            "type": "app.deleted",
            "tenant_id": getattr(sender, "tenant_id", ""),
        },
    )


@app_was_updated.connect
def _handle_app_updated(sender: object, **kwargs: object) -> None:
    from extensions.ext_enterprise_telemetry import get_enterprise_exporter

    exporter = get_enterprise_exporter()
    if not exporter:
        return
    exporter.export_span(
        EnterpriseTelemetrySpan.APP_UPDATED,
        {
            "dify.app.id": getattr(sender, "id", None),
            "dify.tenant_id": getattr(sender, "tenant_id", None),
        },
    )


@feedback_was_created.connect
def _handle_feedback_created(sender: object, **kwargs: object) -> None:
    from extensions.ext_enterprise_telemetry import get_enterprise_exporter

    exporter = get_enterprise_exporter()
    if not exporter:
        return

    include_content = exporter.include_content
    attrs: dict = {
        "dify.message.id": getattr(sender, "message_id", None),
        "dify.tenant_id": kwargs.get("tenant_id"),
        "dify.feedback.rating": getattr(sender, "rating", None),
        "dify.feedback.from_source": getattr(sender, "from_source", None),
    }
    if include_content:
        attrs["dify.feedback.content"] = getattr(sender, "content", None)

    exporter.export_span(EnterpriseTelemetrySpan.FEEDBACK_CREATED, attrs)
    exporter.increment_counter(
        EnterpriseTelemetryCounter.FEEDBACK,
        1,
        {
            "tenant_id": str(kwargs.get("tenant_id", "")),
            "app_id": str(getattr(sender, "app_id", "")),
            "rating": str(getattr(sender, "rating", "")),
        },
    )
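The handlers depend on blinker's import-time registration: connecting is a side effect of defining the decorated function, which is why ext_enterprise_telemetry.init_app() imports this module explicitly. A self-contained sketch of the mechanism, with a hypothetical signal name:

```python
from blinker import signal

demo_event = signal("demo-event")  # hypothetical signal for illustration


@demo_event.connect  # registration happens when this module is imported
def _on_demo(sender, **kwargs):
    print("received", sender, kwargs.get("tenant_id"))


# Any later sender reaches the handler with no further wiring:
demo_event.send("some-sender", tenant_id="t-1")
```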
api/enterprise/telemetry/exporter.py (new file, 106 lines)
@@ -0,0 +1,106 @@
"""Enterprise OTEL exporter — shared by EnterpriseDataTrace, event handlers, and direct instrumentation.

Uses its own TracerProvider (configurable sampling, separate from ext_otel.py infrastructure)
and the global MeterProvider (shared with ext_otel.py — both target the same collector).

Initialized once during Flask extension init (single-threaded via ext_enterprise_telemetry.py).
Accessed via ``ext_enterprise_telemetry.get_enterprise_exporter()`` from any thread/process.
"""

import logging

from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
from opentelemetry.semconv.resource import ResourceAttributes

from configs import dify_config
from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetryHistogram

logger = logging.getLogger(__name__)


def is_enterprise_telemetry_enabled() -> bool:
    return bool(dify_config.ENTERPRISE_ENABLED and dify_config.ENTERPRISE_TELEMETRY_ENABLED)


def _parse_otlp_headers(raw: str) -> dict[str, str]:
    """Parse ``key=value,key2=value2`` into a dict."""
    if not raw:
        return {}
    headers: dict[str, str] = {}
    for pair in raw.split(","):
        if "=" not in pair:
            continue
        k, v = pair.split("=", 1)
        headers[k.strip()] = v.strip()
    return headers


class EnterpriseExporter:
    """Shared OTEL exporter for all enterprise telemetry.

    ``export_span`` creates standalone point-in-time spans (not distributed traces).
    ``increment_counter`` / ``record_histogram`` emit OTEL metrics at 100% accuracy.
    """

    def __init__(self, config: object) -> None:
        endpoint: str = getattr(config, "ENTERPRISE_OTLP_ENDPOINT", "")
        headers_raw: str = getattr(config, "ENTERPRISE_OTLP_HEADERS", "")
        service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify")
        sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0)
        self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True)

        resource = Resource(
            attributes={
                ResourceAttributes.SERVICE_NAME: service_name,
            }
        )
        sampler = ParentBasedTraceIdRatio(sampling_rate)
        self._tracer_provider = TracerProvider(resource=resource, sampler=sampler)

        headers = _parse_otlp_headers(headers_raw)
        trace_endpoint = f"{endpoint}/v1/traces" if endpoint else ""
        self._tracer_provider.add_span_processor(
            BatchSpanProcessor(OTLPSpanExporter(endpoint=trace_endpoint, headers=headers))
        )
        self._tracer = self._tracer_provider.get_tracer("dify.enterprise")

        meter = metrics.get_meter("dify.enterprise")
        self._counters = {
            EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"),
            EnterpriseTelemetryCounter.REQUESTS: meter.create_counter("dify.requests.total", unit="{request}"),
            EnterpriseTelemetryCounter.ERRORS: meter.create_counter("dify.errors.total", unit="{error}"),
            EnterpriseTelemetryCounter.FEEDBACK: meter.create_counter("dify.feedback.total", unit="{feedback}"),
        }
        self._histograms = {
            EnterpriseTelemetryHistogram.WORKFLOW_DURATION: meter.create_histogram("dify.workflow.duration", unit="s"),
            EnterpriseTelemetryHistogram.NODE_DURATION: meter.create_histogram("dify.node.duration", unit="s"),
            EnterpriseTelemetryHistogram.MESSAGE_DURATION: meter.create_histogram("dify.message.duration", unit="s"),
            EnterpriseTelemetryHistogram.MESSAGE_TTFT: meter.create_histogram(
                "dify.message.time_to_first_token", unit="s"
            ),
            EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
        }

    def export_span(self, name: str, attributes: dict) -> None:
        with self._tracer.start_as_current_span(name) as span:
            for key, value in attributes.items():
                if value is not None:
                    span.set_attribute(key, value)

    def increment_counter(self, name: EnterpriseTelemetryCounter, value: int, labels: dict) -> None:
        counter = self._counters.get(name)
        if counter:
            counter.add(value, labels)

    def record_histogram(self, name: EnterpriseTelemetryHistogram, value: float, labels: dict) -> None:
        histogram = self._histograms.get(name)
        if histogram:
            histogram.record(value, labels)

    def shutdown(self) -> None:
        self._tracer_provider.shutdown()
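A usage note on the header format: _parse_otlp_headers splits each pair on the first '=' only, so values containing '=' (base64 padding in tokens, for instance) survive intact, and malformed pairs are skipped rather than raising:

```python
from enterprise.telemetry.exporter import _parse_otlp_headers

assert _parse_otlp_headers("Authorization=Bearer abc==,X-Org=acme") == {
    "Authorization": "Bearer abc==",
    "X-Org": "acme",
}
assert _parse_otlp_headers("oops,k=v") == {"k": "v"}  # "oops" has no '=' and is dropped
```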
@@ -3,6 +3,12 @@ from blinker import signal
 
 # sender: app
 app_was_created = signal("app-was-created")
 
+# sender: app
+app_was_deleted = signal("app-was-deleted")
+
+# sender: app
+app_was_updated = signal("app-was-updated")
+
 # sender: app, kwargs: app_model_config
 app_model_config_was_updated = signal("app-model-config-was-updated")
api/events/feedback_event.py (new file, 4 lines)
@@ -0,0 +1,4 @@
from blinker import signal

# sender: MessageFeedback, kwargs: tenant_id
feedback_was_created = signal("feedback-was-created")
api/extensions/ext_enterprise_telemetry.py (new file, 48 lines)
@@ -0,0 +1,48 @@
"""Flask extension for enterprise telemetry lifecycle management.

Initializes the EnterpriseExporter singleton during ``create_app()`` (single-threaded),
registers blinker event handlers, and hooks atexit for graceful shutdown.

Skipped entirely when ``ENTERPRISE_ENABLED`` and ``ENTERPRISE_TELEMETRY_ENABLED`` are false (``is_enabled()`` gate).
"""

from __future__ import annotations

import atexit
import logging
from typing import TYPE_CHECKING

from configs import dify_config

if TYPE_CHECKING:
    from dify_app import DifyApp
    from enterprise.telemetry.exporter import EnterpriseExporter

logger = logging.getLogger(__name__)

_exporter: EnterpriseExporter | None = None


def is_enabled() -> bool:
    return bool(dify_config.ENTERPRISE_ENABLED and dify_config.ENTERPRISE_TELEMETRY_ENABLED)


def init_app(app: DifyApp) -> None:
    global _exporter

    if not is_enabled():
        return

    from enterprise.telemetry.exporter import EnterpriseExporter

    _exporter = EnterpriseExporter(dify_config)
    atexit.register(_exporter.shutdown)

    # Import to trigger @signal.connect decorator registration
    import enterprise.telemetry.event_handlers  # noqa: F401  # type: ignore[reportUnusedImport]

    logger.info("Enterprise telemetry initialized")


def get_enterprise_exporter() -> EnterpriseExporter | None:
    return _exporter
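Since init_app() returns early when the feature flags are off, get_enterprise_exporter() can legitimately return None at any call site, and every consumer in this diff guards on that. The caller-side pattern, with illustrative attribute values:

```python
from enterprise.telemetry.entities import EnterpriseTelemetrySpan
from extensions.ext_enterprise_telemetry import get_enterprise_exporter

exporter = get_enterprise_exporter()
if exporter is not None:  # None whenever the telemetry flags are off
    exporter.export_span(
        EnterpriseTelemetrySpan.APP_CREATED,
        {"dify.app.id": "app-1", "dify.tenant_id": "t-1"},  # illustrative values
    )
```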
@@ -14,7 +14,7 @@ from core.model_runtime.entities.model_entities import ModelPropertyKey, ModelTy
 from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
 from core.tools.tool_manager import ToolManager
 from core.tools.utils.configuration import ToolParameterConfigurationManager
-from events.app_event import app_was_created
+from events.app_event import app_was_created, app_was_deleted
 from extensions.ext_database import db
 from libs.datetime_utils import naive_utc_now
 from libs.login import current_user
@@ -340,6 +340,8 @@ class AppService:
         db.session.delete(app)
         db.session.commit()
 
+        app_was_deleted.send(app)
+
         # clean up web app settings
         if FeatureService.get_system_features().webapp_auth.enabled:
             EnterpriseService.WebAppAuth.cleanup_webapp(app.id)
@@ -10,6 +10,7 @@ from core.model_runtime.entities.model_entities import ModelType
 from core.ops.entities.trace_entity import TraceTaskName
 from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
 from core.ops.utils import measure_time
+from events.feedback_event import feedback_was_created
 from extensions.ext_database import db
 from libs.infinite_scroll_pagination import InfiniteScrollPagination
 from models import Account
@@ -179,6 +180,9 @@ class MessageService:
 
         db.session.commit()
 
+        if feedback and rating:
+            feedback_was_created.send(feedback, tenant_id=app_model.tenant_id)
+
         return feedback
 
     @classmethod
@@ -39,12 +39,25 @@ def process_trace_tasks(file_info):
         trace_info["documents"] = [Document.model_validate(doc) for doc in trace_info["documents"]]
 
     try:
+        trace_type = trace_info_info_map.get(trace_info_type)
+        if trace_type:
+            trace_info = trace_type(**trace_info)
+
+        # process enterprise trace separately
+        from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
+
+        if is_enterprise_telemetry_enabled():
+            from enterprise.telemetry.enterprise_trace import EnterpriseDataTrace
+
+            try:
+                EnterpriseDataTrace().trace(trace_info)
+            except Exception:
+                logger.warning("Enterprise trace failed for app_id: %s", app_id, exc_info=True)
+
         if trace_instance:
             with current_app.app_context():
-                trace_type = trace_info_info_map.get(trace_info_type)
-                if trace_type:
-                    trace_info = trace_type(**trace_info)
                 trace_instance.trace(trace_info)
 
         logger.info("Processing trace tasks success, app_id: %s", app_id)
     except Exception as e:
        logger.info("error:\n\n\n%s\n\n\n\n", e)
api/tests/integration_tests/enterprise/__init__.py (new file, 0 lines)

@@ -0,0 +1,141 @@
"""Integration tests for enterprise telemetry end-to-end flow."""

from unittest.mock import MagicMock, patch

from core.ops.entities.trace_entity import TraceTaskName, WorkflowNodeTraceInfo, WorkflowTraceInfo
from core.ops.ops_trace_manager import TraceTask


class TestTraceTaskExecution:
    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_node_execution_trace_construction(self, mock_get_exporter):
        exporter = MagicMock()
        mock_get_exporter.return_value = exporter

        node_data = {
            "tenant_id": "tenant-1",
            "app_id": "app-1",
            "workflow_id": "wf-1",
            "workflow_execution_id": "run-1",
            "node_execution_id": "ne-1",
            "node_id": "node-1",
            "node_type": "llm",
            "title": "LLM Node",
            "status": "succeeded",
            "error": None,
            "elapsed_time": 0.5,
            "index": 1,
            "predecessor_node_id": None,
            "total_tokens": 100,
            "total_price": 0.002,
            "currency": "USD",
            "model_provider": "openai",
            "model_name": "gpt-4",
            "prompt_tokens": 60,
            "completion_tokens": 40,
            "tool_name": None,
            "iteration_id": None,
            "iteration_index": None,
            "loop_id": None,
            "loop_index": None,
            "parallel_id": None,
            "node_inputs": {"prompt": "test"},
            "node_outputs": {"response": "result"},
            "process_data": {"usage": {"total": 100}},
        }

        trace_task = TraceTask(
            trace_type=TraceTaskName.NODE_EXECUTION_TRACE,
            node_execution_data=node_data,
        )

        trace_info = trace_task.execute()

        assert isinstance(trace_info, WorkflowNodeTraceInfo)
        assert trace_info.node_id == "node-1"
        assert trace_info.node_type == "llm"
        assert trace_info.total_tokens == 100
        assert trace_info.model_provider == "openai"


class TestHierarchyPropagation:
    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_parent_context_included_in_workflow_trace(self, mock_get_exporter):
        exporter = MagicMock()
        mock_get_exporter.return_value = exporter

        parent_context = {
            "trace_id": "parent-trace-1",
            "parent_node_execution_id": "parent-ne-1",
            "parent_workflow_run_id": "parent-run-1",
            "parent_app_id": "parent-app-1",
        }

        workflow_trace_info = WorkflowTraceInfo(
            trace_id="child-trace-1",
            workflow_id="child-wf-1",
            workflow_run_id="child-run-1",
            tenant_id="tenant-1",
            workflow_run_elapsed_time=1.0,
            workflow_run_status="succeeded",
            workflow_run_inputs={},
            workflow_run_outputs={},
            workflow_run_version="1.0",
            total_tokens=50,
            file_list=[],
            query="",
            metadata={
                "tenant_id": "tenant-1",
                "app_id": "child-app-1",
                "parent_trace_context": parent_context,
            },
        )

        assert workflow_trace_info.metadata["parent_trace_context"] == parent_context
        assert workflow_trace_info.metadata["parent_trace_context"]["trace_id"] == "parent-trace-1"

    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_enterprise_trace_emits_parent_attributes(self, mock_get_exporter):
        from enterprise.telemetry.enterprise_trace import EnterpriseDataTrace

        exporter = MagicMock()
        exporter.include_content = True
        mock_get_exporter.return_value = exporter

        parent_context = {
            "trace_id": "parent-trace-2",
            "parent_node_execution_id": "parent-ne-2",
            "parent_workflow_run_id": "parent-run-2",
            "parent_app_id": "parent-app-2",
        }

        workflow_trace_info = WorkflowTraceInfo(
            trace_id="child-trace-2",
            workflow_id="child-wf-2",
            workflow_run_id="child-run-2",
            tenant_id="tenant-2",
            workflow_run_elapsed_time=0.8,
            workflow_run_status="succeeded",
            workflow_run_inputs={},
            workflow_run_outputs={},
            workflow_run_version="1.0",
            total_tokens=30,
            file_list=[],
            query="",
            metadata={
                "tenant_id": "tenant-2",
                "app_id": "child-app-2",
                "parent_trace_context": parent_context,
            },
        )

        trace = EnterpriseDataTrace()
        trace.trace(workflow_trace_info)

        exporter.export_span.assert_called_once()
        _, span_attrs = exporter.export_span.call_args[0]

        assert span_attrs["dify.parent.trace_id"] == "parent-trace-2"
        assert span_attrs["dify.parent.node.execution_id"] == "parent-ne-2"
        assert span_attrs["dify.parent.workflow.run_id"] == "parent-run-2"
        assert span_attrs["dify.parent.app.id"] == "parent-app-2"
@@ -0,0 +1,63 @@
"""Tests for enterprise telemetry configuration fields."""

import os

import pytest

from configs.app_config import DifyConfig


def test_enterprise_telemetry_config_defaults(monkeypatch: pytest.MonkeyPatch):
    """Enterprise telemetry fields have correct defaults when not set."""
    os.environ.clear()

    monkeypatch.setenv("DB_TYPE", "postgresql")
    monkeypatch.setenv("DB_USERNAME", "postgres")
    monkeypatch.setenv("DB_PASSWORD", "postgres")
    monkeypatch.setenv("DB_HOST", "localhost")
    monkeypatch.setenv("DB_PORT", "5432")
    monkeypatch.setenv("DB_DATABASE", "dify")

    config = DifyConfig()

    # Existing enterprise field
    assert config.ENTERPRISE_ENABLED is False

    # New telemetry fields — all off / empty by default
    assert config.ENTERPRISE_TELEMETRY_ENABLED is False
    assert config.ENTERPRISE_OTLP_ENDPOINT == ""
    assert config.ENTERPRISE_OTLP_HEADERS == ""
    assert config.ENTERPRISE_INCLUDE_CONTENT is True
    assert config.ENTERPRISE_SERVICE_NAME == "dify"
    assert config.ENTERPRISE_OTEL_SAMPLING_RATE == 1.0


def test_enterprise_telemetry_config_from_env(monkeypatch: pytest.MonkeyPatch):
    """Enterprise telemetry fields are populated from environment variables."""
    os.environ.clear()

    monkeypatch.setenv("DB_TYPE", "postgresql")
    monkeypatch.setenv("DB_USERNAME", "postgres")
    monkeypatch.setenv("DB_PASSWORD", "postgres")
    monkeypatch.setenv("DB_HOST", "localhost")
    monkeypatch.setenv("DB_PORT", "5432")
    monkeypatch.setenv("DB_DATABASE", "dify")

    # Set enterprise telemetry env vars
    monkeypatch.setenv("ENTERPRISE_ENABLED", "true")
    monkeypatch.setenv("ENTERPRISE_TELEMETRY_ENABLED", "true")
    monkeypatch.setenv("ENTERPRISE_OTLP_ENDPOINT", "https://otel.example.com:4318")
    monkeypatch.setenv("ENTERPRISE_OTLP_HEADERS", "Authorization=Bearer tok,X-Org=acme")
    monkeypatch.setenv("ENTERPRISE_INCLUDE_CONTENT", "false")
    monkeypatch.setenv("ENTERPRISE_SERVICE_NAME", "dify-prod")
    monkeypatch.setenv("ENTERPRISE_OTEL_SAMPLING_RATE", "0.5")

    config = DifyConfig()

    assert config.ENTERPRISE_ENABLED is True
    assert config.ENTERPRISE_TELEMETRY_ENABLED is True
    assert config.ENTERPRISE_OTLP_ENDPOINT == "https://otel.example.com:4318"
    assert config.ENTERPRISE_OTLP_HEADERS == "Authorization=Bearer tok,X-Org=acme"
    assert config.ENTERPRISE_INCLUDE_CONTENT is False
    assert config.ENTERPRISE_SERVICE_NAME == "dify-prod"
    assert config.ENTERPRISE_OTEL_SAMPLING_RATE == 0.5
0
api/tests/unit_tests/enterprise/__init__.py
Normal file
0
api/tests/unit_tests/enterprise/__init__.py
Normal file
@@ -0,0 +1,151 @@
from unittest.mock import MagicMock, patch

from core.ops.entities.trace_entity import WorkflowNodeTraceInfo, WorkflowTraceInfo
from enterprise.telemetry.enterprise_trace import EnterpriseDataTrace


def _make_exporter_mock(include_content: bool = True) -> MagicMock:
    exporter = MagicMock()
    exporter.include_content = include_content
    return exporter


class TestEnterpriseDataTrace:
    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_workflow_trace_emits_span_and_metrics(self, mock_get_exporter):
        exporter = _make_exporter_mock()
        mock_get_exporter.return_value = exporter

        info = WorkflowTraceInfo(
            trace_id="trace-1",
            workflow_id="wf-1",
            workflow_run_id="run-1",
            tenant_id="tenant-1",
            workflow_run_elapsed_time=1.5,
            workflow_run_status="succeeded",
            workflow_run_inputs={"q": "hello"},
            workflow_run_outputs={"a": "world"},
            workflow_run_version="1.0",
            total_tokens=100,
            file_list=[],
            query="hello",
            error=None,
            metadata={"tenant_id": "tenant-1", "app_id": "app-1", "triggered_from": "api", "user_id": "u1"},
        )

        trace = EnterpriseDataTrace()
        trace.trace(info)

        exporter.export_span.assert_called_once()
        span_name, span_attrs = exporter.export_span.call_args[0]
        assert span_name == "dify.workflow.run"
        assert span_attrs["dify.workflow.id"] == "wf-1"
        assert span_attrs["dify.workflow.status"] == "succeeded"
        assert span_attrs["gen_ai.usage.total_tokens"] == 100

        assert exporter.increment_counter.call_count >= 2
        assert exporter.record_histogram.call_count >= 1

    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_workflow_trace_privacy_gating(self, mock_get_exporter):
        exporter = _make_exporter_mock(include_content=False)
        mock_get_exporter.return_value = exporter

        info = WorkflowTraceInfo(
            trace_id="trace-2",
            workflow_id="wf-2",
            workflow_run_id="run-2",
            tenant_id="t-2",
            workflow_run_elapsed_time=0.5,
            workflow_run_status="succeeded",
            workflow_run_inputs={"secret": "data"},
            workflow_run_outputs={"answer": "hidden"},
            workflow_run_version="1.0",
            total_tokens=50,
            file_list=[],
            query="secret query",
            metadata={"tenant_id": "t-2", "app_id": "a-2"},
        )

        trace = EnterpriseDataTrace()
        trace.trace(info)

        _, span_attrs = exporter.export_span.call_args[0]
        assert "dify.workflow.inputs" not in span_attrs
        assert "dify.workflow.outputs" not in span_attrs
        assert "dify.workflow.query" not in span_attrs

    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_node_execution_trace(self, mock_get_exporter):
        exporter = _make_exporter_mock()
        mock_get_exporter.return_value = exporter

        info = WorkflowNodeTraceInfo(
            trace_id="trace-3",
            metadata={"app_id": "app-1"},
            workflow_id="wf-1",
            workflow_run_id="run-1",
            tenant_id="t-1",
            node_execution_id="ne-1",
            node_id="n-1",
            node_type="llm",
            title="LLM Node",
            status="succeeded",
            elapsed_time=0.3,
            index=1,
            total_tokens=50,
            model_provider="openai",
            model_name="gpt-4",
            prompt_tokens=30,
            completion_tokens=20,
        )

        trace = EnterpriseDataTrace()
        trace.trace(info)

        exporter.export_span.assert_called_once()
        span_name, span_attrs = exporter.export_span.call_args[0]
        assert span_name == "dify.node.execution"
        assert span_attrs["dify.node.type"] == "llm"
        assert span_attrs["gen_ai.provider.name"] == "openai"

    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_workflow_trace_with_parent_context(self, mock_get_exporter):
        exporter = _make_exporter_mock()
        mock_get_exporter.return_value = exporter

        parent_ctx = {
            "trace_id": "parent-trace",
            "parent_node_execution_id": "parent-ne",
            "parent_workflow_run_id": "parent-run",
            "parent_app_id": "parent-app",
        }

        info = WorkflowTraceInfo(
            trace_id="child-trace",
            workflow_id="child-wf",
            workflow_run_id="child-run",
            tenant_id="t-1",
            workflow_run_elapsed_time=0.5,
            workflow_run_status="succeeded",
            workflow_run_inputs={},
            workflow_run_outputs={},
            workflow_run_version="1.0",
            total_tokens=10,
            file_list=[],
            query="",
            metadata={
                "tenant_id": "t-1",
                "app_id": "child-app",
                "parent_trace_context": parent_ctx,
            },
        )

        trace = EnterpriseDataTrace()
        trace.trace(info)

        _, span_attrs = exporter.export_span.call_args[0]
        assert span_attrs["dify.parent.trace_id"] == "parent-trace"
        assert span_attrs["dify.parent.node.execution_id"] == "parent-ne"
        assert span_attrs["dify.parent.workflow.run_id"] == "parent-run"
        assert span_attrs["dify.parent.app.id"] == "parent-app"
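The mocks above pin down the entire surface the trace layer expects from an exporter. A minimal sketch of that implied interface as a typing.Protocol; the name EnterpriseExporterLike is hypothetical and only restates what the tests exercise:

# Interface sketch inferred from _make_exporter_mock and the call-site
# assertions above: an exporter needs exactly these four members.
from typing import Any, Protocol


class EnterpriseExporterLike(Protocol):
    # Privacy toggle consulted before attaching inputs/outputs/query.
    include_content: bool

    def export_span(self, name: str, attributes: dict[str, Any]) -> None: ...

    def increment_counter(self, name: str, value: int, labels: dict[str, str]) -> None: ...

    def record_histogram(self, name: str, value: float, labels: dict[str, str]) -> None: ...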
110  api/tests/unit_tests/enterprise/telemetry/test_event_handlers.py  Normal file
@@ -0,0 +1,110 @@
"""Tests for enterprise telemetry event handlers."""

from unittest.mock import MagicMock, patch

from events.app_event import app_was_created, app_was_deleted, app_was_updated
from events.feedback_event import feedback_was_created


class TestAppEventHandlers:
    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_app_created_handler_emits_span_and_counter(self, mock_get_exporter):
        exporter = MagicMock()
        mock_get_exporter.return_value = exporter

        app = MagicMock(id="app-1", tenant_id="tenant-1", mode="chat")
        app_was_created.send(app)

        exporter.export_span.assert_called_once()
        span_name, span_attrs = exporter.export_span.call_args[0]
        assert span_name == "dify.app.created"
        assert span_attrs["dify.app.id"] == "app-1"
        assert span_attrs["dify.tenant_id"] == "tenant-1"
        assert span_attrs["dify.app.mode"] == "chat"

        exporter.increment_counter.assert_called_once()
        counter_name, value, labels = exporter.increment_counter.call_args[0]
        assert counter_name == "requests"
        assert value == 1
        assert labels["type"] == "app.created"

    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_app_deleted_handler_emits_span_and_counter(self, mock_get_exporter):
        exporter = MagicMock()
        mock_get_exporter.return_value = exporter

        app = MagicMock(id="app-2", tenant_id="tenant-2")
        app_was_deleted.send(app)

        exporter.export_span.assert_called_once()
        span_name, span_attrs = exporter.export_span.call_args[0]
        assert span_name == "dify.app.deleted"
        assert span_attrs["dify.app.id"] == "app-2"

        exporter.increment_counter.assert_called_once()

    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_app_updated_handler_emits_span(self, mock_get_exporter):
        exporter = MagicMock()
        mock_get_exporter.return_value = exporter

        app = MagicMock(id="app-3", tenant_id="tenant-3")
        app_was_updated.send(app)

        exporter.export_span.assert_called_once()
        span_name, span_attrs = exporter.export_span.call_args[0]
        assert span_name == "dify.app.updated"

    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_handler_no_op_when_exporter_none(self, mock_get_exporter):
        mock_get_exporter.return_value = None

        app = MagicMock(id="app-4", tenant_id="tenant-4", mode="workflow")
        app_was_created.send(app)


class TestFeedbackEventHandler:
    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_feedback_created_handler_with_content(self, mock_get_exporter):
        exporter = MagicMock()
        exporter.include_content = True
        mock_get_exporter.return_value = exporter

        feedback = MagicMock(
            message_id="msg-1",
            rating="like",
            from_source="web",
            content="Great response!",
            app_id="app-1",
        )
        feedback_was_created.send(feedback, tenant_id="tenant-1")

        exporter.export_span.assert_called_once()
        _, span_attrs = exporter.export_span.call_args[0]
        assert span_attrs["dify.message.id"] == "msg-1"
        assert span_attrs["dify.feedback.rating"] == "like"
        assert span_attrs["dify.feedback.content"] == "Great response!"

        exporter.increment_counter.assert_called_once()
        counter_name, value, labels = exporter.increment_counter.call_args[0]
        assert counter_name == "feedback"
        assert labels["rating"] == "like"

    @patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter")
    def test_feedback_created_handler_privacy_gating(self, mock_get_exporter):
        exporter = MagicMock()
        exporter.include_content = False
        mock_get_exporter.return_value = exporter

        feedback = MagicMock(
            message_id="msg-2",
            rating="dislike",
            from_source="api",
            content="Sensitive feedback",
            app_id="app-2",
        )
        feedback_was_created.send(feedback, tenant_id="tenant-2")

        exporter.export_span.assert_called_once()
        _, span_attrs = exporter.export_span.call_args[0]
        assert "dify.feedback.content" not in span_attrs
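These tests imply a common handler shape: fetch the exporter, return early when telemetry is disabled, emit one span, and bump a counter keyed by event type. A hedged sketch of what such a blinker receiver could look like; the attribute names come from the assertions above, the function name is hypothetical, and the branch's real handlers may differ:

# Handler sketch (illustration, not the branch's implementation).
from events.app_event import app_was_created
from extensions.ext_enterprise_telemetry import get_enterprise_exporter


@app_was_created.connect
def _on_app_created(sender, **kwargs):
    exporter = get_enterprise_exporter()
    if exporter is None:  # telemetry disabled: handlers stay silent
        return
    exporter.export_span(
        "dify.app.created",
        {
            "dify.app.id": sender.id,
            "dify.tenant_id": sender.tenant_id,
            "dify.app.mode": sender.mode,
        },
    )
    exporter.increment_counter("requests", 1, {"type": "app.created"})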
105  api/tests/unit_tests/enterprise/telemetry/test_exporter.py  Normal file
@@ -0,0 +1,105 @@
from unittest.mock import MagicMock, patch

from enterprise.telemetry.exporter import EnterpriseExporter, _parse_otlp_headers, is_enterprise_telemetry_enabled


class _FakeConfig:
    ENTERPRISE_OTLP_ENDPOINT = "http://localhost:4318"
    ENTERPRISE_OTLP_HEADERS = "Authorization=Bearer tok,X-Org=acme"
    ENTERPRISE_SERVICE_NAME = "dify-test"
    ENTERPRISE_OTEL_SAMPLING_RATE = 1.0
    ENTERPRISE_INCLUDE_CONTENT = True


class TestParseOtlpHeaders:
    def test_empty_string(self):
        assert _parse_otlp_headers("") == {}

    def test_single_header(self):
        assert _parse_otlp_headers("Authorization=Bearer tok") == {"Authorization": "Bearer tok"}

    def test_multiple_headers(self):
        result = _parse_otlp_headers("Authorization=Bearer tok,X-Org=acme")
        assert result == {"Authorization": "Bearer tok", "X-Org": "acme"}

    def test_malformed_pair_skipped(self):
        result = _parse_otlp_headers("good=value,bad_no_equals")
        assert result == {"good": "value"}


class TestIsEnterpriseTelemetryEnabled:
    def test_disabled_when_enterprise_off(self, monkeypatch):
        monkeypatch.setattr(
            "enterprise.telemetry.exporter.dify_config",
            MagicMock(ENTERPRISE_ENABLED=False, ENTERPRISE_TELEMETRY_ENABLED=True),
        )
        assert is_enterprise_telemetry_enabled() is False

    def test_disabled_when_telemetry_off(self, monkeypatch):
        monkeypatch.setattr(
            "enterprise.telemetry.exporter.dify_config",
            MagicMock(ENTERPRISE_ENABLED=True, ENTERPRISE_TELEMETRY_ENABLED=False),
        )
        assert is_enterprise_telemetry_enabled() is False

    def test_enabled_when_both_on(self, monkeypatch):
        monkeypatch.setattr(
            "enterprise.telemetry.exporter.dify_config",
            MagicMock(ENTERPRISE_ENABLED=True, ENTERPRISE_TELEMETRY_ENABLED=True),
        )
        assert is_enterprise_telemetry_enabled() is True
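The behaviour of these two helpers is fully determined by the assertions above: pairs without "=" are skipped during header parsing, and telemetry requires both flags. A reference sketch under that reading; the real implementations live in enterprise/telemetry/exporter.py and are not shown in this diff, and the _sketch suffixes mark these as illustrations:

# Behaviour sketches matching the tests above (not the branch's source).
from configs import dify_config


def _parse_otlp_headers_sketch(raw: str) -> dict[str, str]:
    headers: dict[str, str] = {}
    for pair in raw.split(","):
        if "=" not in pair:
            continue  # skip malformed pairs such as "bad_no_equals"
        key, value = pair.split("=", 1)  # split once so values may contain "="
        headers[key] = value
    return headers


def is_enterprise_telemetry_enabled_sketch() -> bool:
    # Both the enterprise license flag and the telemetry flag must be on.
    return bool(dify_config.ENTERPRISE_ENABLED and dify_config.ENTERPRISE_TELEMETRY_ENABLED)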
class TestEnterpriseExporter:
    @patch("enterprise.telemetry.exporter.BatchSpanProcessor")
    @patch("enterprise.telemetry.exporter.OTLPSpanExporter")
    @patch("enterprise.telemetry.exporter.metrics")
    def test_init_creates_counters_and_histograms(self, mock_metrics, mock_otlp_exporter, mock_processor):
        mock_meter = MagicMock()
        mock_metrics.get_meter.return_value = mock_meter

        exporter = EnterpriseExporter(_FakeConfig())

        assert exporter.include_content is True
        mock_meter.create_counter.assert_any_call("dify.tokens.total", unit="{token}")
        mock_meter.create_counter.assert_any_call("dify.requests.total", unit="{request}")
        mock_meter.create_histogram.assert_any_call("dify.workflow.duration", unit="s")
        mock_meter.create_histogram.assert_any_call("dify.node.duration", unit="s")

    @patch("enterprise.telemetry.exporter.BatchSpanProcessor")
    @patch("enterprise.telemetry.exporter.OTLPSpanExporter")
    @patch("enterprise.telemetry.exporter.metrics")
    def test_export_span_sets_attributes(self, mock_metrics, mock_otlp_exporter, mock_processor):
        mock_metrics.get_meter.return_value = MagicMock()
        exporter = EnterpriseExporter(_FakeConfig())

        exporter.export_span("test.span", {"key1": "val1", "key2": None})

    @patch("enterprise.telemetry.exporter.BatchSpanProcessor")
    @patch("enterprise.telemetry.exporter.OTLPSpanExporter")
    @patch("enterprise.telemetry.exporter.metrics")
    def test_increment_counter_unknown_name_is_noop(self, mock_metrics, mock_otlp_exporter, mock_processor):
        mock_metrics.get_meter.return_value = MagicMock()
        exporter = EnterpriseExporter(_FakeConfig())
        exporter.increment_counter("nonexistent_counter", 1, {})

    @patch("enterprise.telemetry.exporter.BatchSpanProcessor")
    @patch("enterprise.telemetry.exporter.OTLPSpanExporter")
    @patch("enterprise.telemetry.exporter.metrics")
    def test_record_histogram_unknown_name_is_noop(self, mock_metrics, mock_otlp_exporter, mock_processor):
        mock_metrics.get_meter.return_value = MagicMock()
        exporter = EnterpriseExporter(_FakeConfig())
        exporter.record_histogram("nonexistent_histogram", 1.0, {})

    @patch("enterprise.telemetry.exporter.BatchSpanProcessor")
    @patch("enterprise.telemetry.exporter.OTLPSpanExporter")
    @patch("enterprise.telemetry.exporter.metrics")
    def test_increment_known_counter(self, mock_metrics, mock_otlp_exporter, mock_processor):
        mock_counter = MagicMock()
        mock_meter = MagicMock()
        mock_meter.create_counter.return_value = mock_counter
        mock_metrics.get_meter.return_value = mock_meter

        exporter = EnterpriseExporter(_FakeConfig())
        exporter.increment_counter("tokens", 100, {"tenant_id": "t1"})
        mock_counter.add.assert_called_with(100, {"tenant_id": "t1"})
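Taken together, these tests outline the shape of EnterpriseExporter: instruments created in the constructor with the exact names asserted above, silent no-ops for unknown metric names, and an export_span that tolerates None attribute values. A skeleton sketch under those assumptions; the short metric keys ("tokens", "requests") and histogram keys are inferred, not confirmed by the diff, and the OTLP exporter / BatchSpanProcessor wiring the tests patch out is omitted:

# Skeleton sketch of the exporter implied by the tests (illustration only).
from opentelemetry import metrics, trace


class EnterpriseExporterSketch:
    def __init__(self, config):
        self.include_content = config.ENTERPRISE_INCLUDE_CONTENT
        meter = metrics.get_meter("dify.enterprise")
        # Instrument names and units match the assert_any_call checks above;
        # the short lookup keys are an assumption.
        self._counters = {
            "tokens": meter.create_counter("dify.tokens.total", unit="{token}"),
            "requests": meter.create_counter("dify.requests.total", unit="{request}"),
        }
        self._histograms = {
            "workflow_duration": meter.create_histogram("dify.workflow.duration", unit="s"),
            "node_duration": meter.create_histogram("dify.node.duration", unit="s"),
        }

    def export_span(self, name, attributes):
        tracer = trace.get_tracer("dify.enterprise")
        with tracer.start_as_current_span(name) as span:
            for key, value in attributes.items():
                if value is not None:  # OTel attribute values must not be None
                    span.set_attribute(key, value)

    def increment_counter(self, name, value, labels):
        counter = self._counters.get(name)
        if counter is not None:  # unknown counter names are a silent no-op
            counter.add(value, labels)

    def record_histogram(self, name, value, labels):
        histogram = self._histograms.get(name)
        if histogram is not None:  # unknown histogram names are a silent no-op
            histogram.record(value, labels)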