mirror of
https://github.com/langgenius/dify.git
synced 2026-03-01 12:55:13 +00:00
refactor(telemetry): move gateway to core as stateless module-level functions
Move routing table, emit(), and is_enterprise_telemetry_enabled() from enterprise/telemetry/gateway.py into core/telemetry/gateway.py so both CE and EE share one code path. The ce_eligible flag in CASE_ROUTING controls which events flow in CE — flipping it is the only change needed to enable an event in community edition. - Delete enterprise/telemetry/gateway.py (class-based singleton) - Create core/telemetry/gateway.py (stateless functions, no shared state) - Simplify core/telemetry/__init__.py to thin facade over gateway - Remove TelemetryGateway class and get_gateway() from ext_enterprise_telemetry - Single-source is_enterprise_telemetry_enabled in core.telemetry.gateway - Fix pre-existing test bugs (missing dify.event.id in metric handler tests) - Update all imports and mock paths across 7 test files
This commit is contained in:
@@ -989,6 +989,9 @@ class TraceQueueManager:
|
||||
self.user_id = user_id
|
||||
self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
|
||||
self.flask_app = current_app._get_current_object() # type: ignore
|
||||
from core.telemetry.gateway import is_enterprise_telemetry_enabled
|
||||
|
||||
self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled()
|
||||
if trace_manager_timer is None:
|
||||
self.start_timer()
|
||||
|
||||
|
||||
43
api/core/telemetry/__init__.py
Normal file
43
api/core/telemetry/__init__.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""Telemetry facade.
|
||||
|
||||
Thin public API for emitting telemetry events. All routing logic
|
||||
lives in ``core.telemetry.gateway`` which is shared by both CE and EE.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
from core.telemetry.events import TelemetryContext, TelemetryEvent
|
||||
from core.telemetry.gateway import TRACE_TASK_TO_CASE
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager
|
||||
|
||||
|
||||
def emit(event: TelemetryEvent, trace_manager: TraceQueueManager | None = None) -> None:
|
||||
"""Emit a telemetry event.
|
||||
|
||||
Translates the ``TelemetryEvent`` (keyed by ``TraceTaskName``) into a
|
||||
``TelemetryCase`` and delegates to ``core.telemetry.gateway.emit()``.
|
||||
"""
|
||||
case = TRACE_TASK_TO_CASE.get(event.name)
|
||||
if case is None:
|
||||
return
|
||||
|
||||
context: dict[str, object] = {
|
||||
"tenant_id": event.context.tenant_id,
|
||||
"user_id": event.context.user_id,
|
||||
"app_id": event.context.app_id,
|
||||
}
|
||||
gateway_emit(case, context, event.payload, trace_manager)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"TelemetryContext",
|
||||
"TelemetryEvent",
|
||||
"TraceTaskName",
|
||||
"emit",
|
||||
]
|
||||
206
api/core/telemetry/gateway.py
Normal file
206
api/core/telemetry/gateway.py
Normal file
@@ -0,0 +1,206 @@
|
||||
"""Telemetry gateway — single routing layer for all editions.
|
||||
|
||||
Maps ``TelemetryCase`` → ``CaseRoute`` and dispatches events to either
|
||||
the CE/EE trace pipeline (``TraceQueueManager``) or the enterprise-only
|
||||
metric/log Celery queue.
|
||||
|
||||
This module lives in ``core/`` so both CE and EE share one routing table
|
||||
and one ``emit()`` entry point. No separate enterprise gateway module is
|
||||
needed — enterprise-specific dispatch (Celery task, payload offloading)
|
||||
is handled here behind lazy imports that no-op in CE.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase, TelemetryEnvelope
|
||||
from extensions.ext_storage import storage
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PAYLOAD_SIZE_THRESHOLD_BYTES = 1 * 1024 * 1024
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routing table — authoritative mapping for all editions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
CASE_TO_TRACE_TASK: dict[TelemetryCase, TraceTaskName] = {
|
||||
TelemetryCase.WORKFLOW_RUN: TraceTaskName.WORKFLOW_TRACE,
|
||||
TelemetryCase.MESSAGE_RUN: TraceTaskName.MESSAGE_TRACE,
|
||||
TelemetryCase.NODE_EXECUTION: TraceTaskName.NODE_EXECUTION_TRACE,
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION: TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
|
||||
TelemetryCase.PROMPT_GENERATION: TraceTaskName.PROMPT_GENERATION_TRACE,
|
||||
TelemetryCase.TOOL_EXECUTION: TraceTaskName.TOOL_TRACE,
|
||||
TelemetryCase.MODERATION_CHECK: TraceTaskName.MODERATION_TRACE,
|
||||
TelemetryCase.SUGGESTED_QUESTION: TraceTaskName.SUGGESTED_QUESTION_TRACE,
|
||||
TelemetryCase.DATASET_RETRIEVAL: TraceTaskName.DATASET_RETRIEVAL_TRACE,
|
||||
TelemetryCase.GENERATE_NAME: TraceTaskName.GENERATE_NAME_TRACE,
|
||||
}
|
||||
|
||||
TRACE_TASK_TO_CASE: dict[TraceTaskName, TelemetryCase] = {v: k for k, v in CASE_TO_TRACE_TASK.items()}
|
||||
|
||||
CASE_ROUTING: dict[TelemetryCase, CaseRoute] = {
|
||||
# TRACE — CE-eligible (flow in both CE and EE)
|
||||
TelemetryCase.WORKFLOW_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.MESSAGE_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.TOOL_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.MODERATION_CHECK: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.SUGGESTED_QUESTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.DATASET_RETRIEVAL: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.GENERATE_NAME: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
# TRACE — enterprise-only
|
||||
TelemetryCase.NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
|
||||
TelemetryCase.PROMPT_GENERATION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
|
||||
# METRIC_LOG — enterprise-only (signal-driven, not trace)
|
||||
TelemetryCase.APP_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.APP_UPDATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.APP_DELETED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.FEEDBACK_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def is_enterprise_telemetry_enabled() -> bool:
|
||||
try:
|
||||
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
|
||||
|
||||
return is_enterprise_telemetry_enabled()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _handle_payload_sizing(
|
||||
payload: dict[str, Any],
|
||||
tenant_id: str,
|
||||
event_id: str,
|
||||
) -> tuple[dict[str, Any], str | None]:
|
||||
"""Inline or offload payload based on size.
|
||||
|
||||
Returns ``(payload_for_envelope, storage_key | None)``. Payloads
|
||||
exceeding ``PAYLOAD_SIZE_THRESHOLD_BYTES`` are written to object
|
||||
storage and replaced with an empty dict in the envelope.
|
||||
"""
|
||||
try:
|
||||
payload_json = json.dumps(payload)
|
||||
payload_size = len(payload_json.encode("utf-8"))
|
||||
except (TypeError, ValueError):
|
||||
logger.warning("Failed to serialize payload for sizing: event_id=%s", event_id)
|
||||
return payload, None
|
||||
|
||||
if payload_size <= PAYLOAD_SIZE_THRESHOLD_BYTES:
|
||||
return payload, None
|
||||
|
||||
storage_key = f"telemetry/{tenant_id}/{event_id}.json"
|
||||
try:
|
||||
storage.save(storage_key, payload_json.encode("utf-8"))
|
||||
logger.debug("Stored large payload to storage: key=%s, size=%d", storage_key, payload_size)
|
||||
return {}, storage_key
|
||||
except Exception:
|
||||
logger.warning("Failed to store large payload, inlining instead: event_id=%s", event_id, exc_info=True)
|
||||
return payload, None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def emit(
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
trace_manager: TraceQueueManager | None = None,
|
||||
) -> None:
|
||||
"""Route a telemetry event to the correct pipeline.
|
||||
|
||||
TRACE events are enqueued into ``TraceQueueManager`` (works in both CE
|
||||
and EE). Enterprise-only traces are silently dropped when EE is
|
||||
disabled.
|
||||
|
||||
METRIC_LOG events are dispatched to the enterprise Celery queue;
|
||||
silently dropped when enterprise telemetry is unavailable.
|
||||
"""
|
||||
route = CASE_ROUTING.get(case)
|
||||
if route is None:
|
||||
logger.warning("Unknown telemetry case: %s, dropping event", case)
|
||||
return
|
||||
|
||||
if not route.ce_eligible and not is_enterprise_telemetry_enabled():
|
||||
logger.debug("Dropping EE-only event: case=%s (EE disabled)", case)
|
||||
return
|
||||
|
||||
if route.signal_type is SignalType.TRACE:
|
||||
_emit_trace(case, context, payload, trace_manager)
|
||||
else:
|
||||
_emit_metric_log(case, context, payload)
|
||||
|
||||
|
||||
def _emit_trace(
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
trace_manager: TraceQueueManager | None,
|
||||
) -> None:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager as LocalTraceQueueManager
|
||||
from core.ops.ops_trace_manager import TraceTask
|
||||
|
||||
trace_task_name = CASE_TO_TRACE_TASK.get(case)
|
||||
if trace_task_name is None:
|
||||
logger.warning("No TraceTaskName mapping for case: %s", case)
|
||||
return
|
||||
|
||||
queue_manager = trace_manager or LocalTraceQueueManager(
|
||||
app_id=context.get("app_id"),
|
||||
user_id=context.get("user_id"),
|
||||
)
|
||||
queue_manager.add_trace_task(TraceTask(trace_task_name, **payload))
|
||||
logger.debug("Enqueued trace task: case=%s, app_id=%s", case, context.get("app_id"))
|
||||
|
||||
|
||||
def _emit_metric_log(
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
) -> None:
|
||||
"""Build envelope and dispatch to enterprise Celery queue.
|
||||
|
||||
No-ops when the enterprise telemetry task is not importable (CE mode).
|
||||
"""
|
||||
try:
|
||||
from tasks.enterprise_telemetry_task import process_enterprise_telemetry
|
||||
except ImportError:
|
||||
logger.debug("Enterprise metric/log dispatch unavailable, dropping: case=%s", case)
|
||||
return
|
||||
|
||||
tenant_id = context.get("tenant_id", "")
|
||||
event_id = str(uuid.uuid4())
|
||||
|
||||
payload_for_envelope, payload_ref = _handle_payload_sizing(payload, tenant_id, event_id)
|
||||
|
||||
envelope = TelemetryEnvelope(
|
||||
case=case,
|
||||
tenant_id=tenant_id,
|
||||
event_id=event_id,
|
||||
payload=payload_for_envelope,
|
||||
metadata={"payload_ref": payload_ref} if payload_ref else None,
|
||||
)
|
||||
|
||||
process_enterprise_telemetry.delay(envelope.model_dump_json())
|
||||
logger.debug(
|
||||
"Enqueued metric/log event: case=%s, tenant_id=%s, event_id=%s",
|
||||
case,
|
||||
tenant_id,
|
||||
event_id,
|
||||
)
|
||||
84
api/enterprise/telemetry/event_handlers.py
Normal file
84
api/enterprise/telemetry/event_handlers.py
Normal file
@@ -0,0 +1,84 @@
|
||||
"""Blinker signal handlers for enterprise telemetry.
|
||||
|
||||
Registered at import time via ``@signal.connect`` decorators.
|
||||
Import must happen during ``ext_enterprise_telemetry.init_app()`` to
|
||||
ensure handlers fire. Each handler delegates to ``core.telemetry.gateway``
|
||||
which handles routing, EE-gating, and dispatch.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from events.app_event import app_was_created, app_was_deleted, app_was_updated
|
||||
from events.feedback_event import feedback_was_created
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = [
|
||||
"_handle_app_created",
|
||||
"_handle_app_deleted",
|
||||
"_handle_app_updated",
|
||||
"_handle_feedback_created",
|
||||
]
|
||||
|
||||
|
||||
@app_was_created.connect
|
||||
def _handle_app_created(sender: object, **kwargs: object) -> None:
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
gateway_emit(
|
||||
case=TelemetryCase.APP_CREATED,
|
||||
context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")},
|
||||
payload={
|
||||
"app_id": getattr(sender, "id", None),
|
||||
"mode": getattr(sender, "mode", None),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@app_was_deleted.connect
|
||||
def _handle_app_deleted(sender: object, **kwargs: object) -> None:
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
gateway_emit(
|
||||
case=TelemetryCase.APP_DELETED,
|
||||
context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")},
|
||||
payload={"app_id": getattr(sender, "id", None)},
|
||||
)
|
||||
|
||||
|
||||
@app_was_updated.connect
|
||||
def _handle_app_updated(sender: object, **kwargs: object) -> None:
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
gateway_emit(
|
||||
case=TelemetryCase.APP_UPDATED,
|
||||
context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")},
|
||||
payload={"app_id": getattr(sender, "id", None)},
|
||||
)
|
||||
|
||||
|
||||
@feedback_was_created.connect
|
||||
def _handle_feedback_created(sender: object, **kwargs: object) -> None:
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
tenant_id = str(kwargs.get("tenant_id", "") or "")
|
||||
gateway_emit(
|
||||
case=TelemetryCase.FEEDBACK_CREATED,
|
||||
context={"tenant_id": tenant_id},
|
||||
payload={
|
||||
"message_id": getattr(sender, "message_id", None),
|
||||
"app_id": getattr(sender, "app_id", None),
|
||||
"conversation_id": getattr(sender, "conversation_id", None),
|
||||
"from_end_user_id": getattr(sender, "from_end_user_id", None),
|
||||
"from_account_id": getattr(sender, "from_account_id", None),
|
||||
"rating": getattr(sender, "rating", None),
|
||||
"from_source": getattr(sender, "from_source", None),
|
||||
"content": getattr(sender, "content", None),
|
||||
},
|
||||
)
|
||||
50
api/extensions/ext_enterprise_telemetry.py
Normal file
50
api/extensions/ext_enterprise_telemetry.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Flask extension for enterprise telemetry lifecycle management.
|
||||
|
||||
Initializes the EnterpriseExporter singleton during ``create_app()``
|
||||
(single-threaded), registers blinker event handlers, and hooks atexit
|
||||
for graceful shutdown.
|
||||
|
||||
Skipped entirely when ``ENTERPRISE_ENABLED`` and ``ENTERPRISE_TELEMETRY_ENABLED``
|
||||
are false (``is_enabled()`` gate).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import atexit
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from configs import dify_config
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from dify_app import DifyApp
|
||||
from enterprise.telemetry.exporter import EnterpriseExporter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_exporter: EnterpriseExporter | None = None
|
||||
|
||||
|
||||
def is_enabled() -> bool:
|
||||
return bool(dify_config.ENTERPRISE_ENABLED and dify_config.ENTERPRISE_TELEMETRY_ENABLED)
|
||||
|
||||
|
||||
def init_app(app: DifyApp) -> None:
|
||||
global _exporter
|
||||
|
||||
if not is_enabled():
|
||||
return
|
||||
|
||||
from enterprise.telemetry.exporter import EnterpriseExporter
|
||||
|
||||
_exporter = EnterpriseExporter(dify_config)
|
||||
atexit.register(_exporter.shutdown)
|
||||
|
||||
# Import to trigger @signal.connect decorator registration
|
||||
import enterprise.telemetry.event_handlers # noqa: F401 # type: ignore[reportUnusedImport]
|
||||
|
||||
logger.info("Enterprise telemetry initialized")
|
||||
|
||||
|
||||
def get_enterprise_exporter() -> EnterpriseExporter | None:
|
||||
return _exporter
|
||||
200
api/tests/unit_tests/core/ops/test_trace_queue_manager.py
Normal file
200
api/tests/unit_tests/core/ops/test_trace_queue_manager.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""Unit tests for TraceQueueManager telemetry guard.
|
||||
|
||||
This test suite verifies that TraceQueueManager correctly drops trace tasks
|
||||
when telemetry is disabled, proving Bug 1 from code review is a false positive.
|
||||
|
||||
The guard logic moved from persistence.py to TraceQueueManager.add_trace_task()
|
||||
at line 1282 of ops_trace_manager.py:
|
||||
if self._enterprise_telemetry_enabled or self.trace_instance:
|
||||
trace_task.app_id = self.app_id
|
||||
trace_manager_queue.put(trace_task)
|
||||
|
||||
Tasks are only enqueued if EITHER:
|
||||
- Enterprise telemetry is enabled (_enterprise_telemetry_enabled=True), OR
|
||||
- A third-party trace instance (Langfuse, etc.) is configured
|
||||
|
||||
When BOTH are false, tasks are silently dropped (correct behavior).
|
||||
"""
|
||||
|
||||
import queue
|
||||
import sys
|
||||
import types
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def trace_queue_manager_and_task(monkeypatch):
|
||||
"""Fixture to provide TraceQueueManager and TraceTask with delayed imports."""
|
||||
module_name = "core.ops.ops_trace_manager"
|
||||
if module_name not in sys.modules:
|
||||
ops_stub = types.ModuleType(module_name)
|
||||
|
||||
class StubTraceTask:
|
||||
def __init__(self, trace_type):
|
||||
self.trace_type = trace_type
|
||||
self.app_id = None
|
||||
|
||||
class StubTraceQueueManager:
|
||||
def __init__(self, app_id=None):
|
||||
self.app_id = app_id
|
||||
from core.telemetry.gateway import is_enterprise_telemetry_enabled
|
||||
|
||||
self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled()
|
||||
self.trace_instance = StubOpsTraceManager.get_ops_trace_instance(app_id)
|
||||
|
||||
def add_trace_task(self, trace_task):
|
||||
if self._enterprise_telemetry_enabled or self.trace_instance:
|
||||
trace_task.app_id = self.app_id
|
||||
from core.ops.ops_trace_manager import trace_manager_queue
|
||||
|
||||
trace_manager_queue.put(trace_task)
|
||||
|
||||
class StubOpsTraceManager:
|
||||
@staticmethod
|
||||
def get_ops_trace_instance(app_id):
|
||||
return None
|
||||
|
||||
ops_stub.TraceQueueManager = StubTraceQueueManager
|
||||
ops_stub.TraceTask = StubTraceTask
|
||||
ops_stub.OpsTraceManager = StubOpsTraceManager
|
||||
ops_stub.trace_manager_queue = MagicMock(spec=queue.Queue)
|
||||
monkeypatch.setitem(sys.modules, module_name, ops_stub)
|
||||
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
|
||||
ops_module = __import__(module_name, fromlist=["TraceQueueManager", "TraceTask"])
|
||||
TraceQueueManager = ops_module.TraceQueueManager
|
||||
TraceTask = ops_module.TraceTask
|
||||
|
||||
return TraceQueueManager, TraceTask, TraceTaskName
|
||||
|
||||
|
||||
class TestTraceQueueManagerTelemetryGuard:
|
||||
"""Test TraceQueueManager's telemetry guard in add_trace_task()."""
|
||||
|
||||
def test_task_not_enqueued_when_telemetry_disabled_and_no_trace_instance(self, trace_queue_manager_and_task):
|
||||
"""Verify task is NOT enqueued when telemetry disabled and no trace instance.
|
||||
|
||||
This is the core guard: when _enterprise_telemetry_enabled=False AND
|
||||
trace_instance=None, the task should be silently dropped.
|
||||
"""
|
||||
TraceQueueManager, TraceTask, TraceTaskName = trace_queue_manager_and_task
|
||||
|
||||
mock_queue = MagicMock(spec=queue.Queue)
|
||||
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False),
|
||||
patch("core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=None),
|
||||
patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue),
|
||||
):
|
||||
manager = TraceQueueManager(app_id="test-app-id")
|
||||
manager.add_trace_task(trace_task)
|
||||
|
||||
mock_queue.put.assert_not_called()
|
||||
|
||||
def test_task_enqueued_when_telemetry_enabled(self, trace_queue_manager_and_task):
|
||||
"""Verify task IS enqueued when enterprise telemetry is enabled.
|
||||
|
||||
When _enterprise_telemetry_enabled=True, the task should be enqueued
|
||||
regardless of trace_instance state.
|
||||
"""
|
||||
TraceQueueManager, TraceTask, TraceTaskName = trace_queue_manager_and_task
|
||||
|
||||
mock_queue = MagicMock(spec=queue.Queue)
|
||||
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True),
|
||||
patch("core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=None),
|
||||
patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue),
|
||||
):
|
||||
manager = TraceQueueManager(app_id="test-app-id")
|
||||
manager.add_trace_task(trace_task)
|
||||
|
||||
mock_queue.put.assert_called_once()
|
||||
called_task = mock_queue.put.call_args[0][0]
|
||||
assert called_task.app_id == "test-app-id"
|
||||
|
||||
def test_task_enqueued_when_trace_instance_configured(self, trace_queue_manager_and_task):
|
||||
"""Verify task IS enqueued when third-party trace instance is configured.
|
||||
|
||||
When trace_instance is not None (e.g., Langfuse configured), the task
|
||||
should be enqueued even if enterprise telemetry is disabled.
|
||||
"""
|
||||
TraceQueueManager, TraceTask, TraceTaskName = trace_queue_manager_and_task
|
||||
|
||||
mock_queue = MagicMock(spec=queue.Queue)
|
||||
|
||||
mock_trace_instance = MagicMock()
|
||||
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False),
|
||||
patch(
|
||||
"core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=mock_trace_instance
|
||||
),
|
||||
patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue),
|
||||
):
|
||||
manager = TraceQueueManager(app_id="test-app-id")
|
||||
manager.add_trace_task(trace_task)
|
||||
|
||||
mock_queue.put.assert_called_once()
|
||||
called_task = mock_queue.put.call_args[0][0]
|
||||
assert called_task.app_id == "test-app-id"
|
||||
|
||||
def test_task_enqueued_when_both_telemetry_and_trace_instance_enabled(self, trace_queue_manager_and_task):
|
||||
"""Verify task IS enqueued when both telemetry and trace instance are enabled.
|
||||
|
||||
When both _enterprise_telemetry_enabled=True AND trace_instance is set,
|
||||
the task should definitely be enqueued.
|
||||
"""
|
||||
TraceQueueManager, TraceTask, TraceTaskName = trace_queue_manager_and_task
|
||||
|
||||
mock_queue = MagicMock(spec=queue.Queue)
|
||||
|
||||
mock_trace_instance = MagicMock()
|
||||
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True),
|
||||
patch(
|
||||
"core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=mock_trace_instance
|
||||
),
|
||||
patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue),
|
||||
):
|
||||
manager = TraceQueueManager(app_id="test-app-id")
|
||||
manager.add_trace_task(trace_task)
|
||||
|
||||
mock_queue.put.assert_called_once()
|
||||
called_task = mock_queue.put.call_args[0][0]
|
||||
assert called_task.app_id == "test-app-id"
|
||||
|
||||
def test_app_id_set_before_enqueue(self, trace_queue_manager_and_task):
|
||||
"""Verify app_id is set on the task before enqueuing.
|
||||
|
||||
The guard logic sets trace_task.app_id = self.app_id before calling
|
||||
trace_manager_queue.put(trace_task). This test verifies that behavior.
|
||||
"""
|
||||
TraceQueueManager, TraceTask, TraceTaskName = trace_queue_manager_and_task
|
||||
|
||||
mock_queue = MagicMock(spec=queue.Queue)
|
||||
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True),
|
||||
patch("core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=None),
|
||||
patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue),
|
||||
):
|
||||
manager = TraceQueueManager(app_id="expected-app-id")
|
||||
manager.add_trace_task(trace_task)
|
||||
|
||||
called_task = mock_queue.put.call_args[0][0]
|
||||
assert called_task.app_id == "expected-app-id"
|
||||
181
api/tests/unit_tests/core/telemetry/test_facade.py
Normal file
181
api/tests/unit_tests/core/telemetry/test_facade.py
Normal file
@@ -0,0 +1,181 @@
|
||||
"""Unit tests for core.telemetry.emit() routing and enterprise-only filtering."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
import sys
|
||||
import types
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
from core.telemetry.events import TelemetryContext, TelemetryEvent
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def telemetry_test_setup(monkeypatch):
|
||||
module_name = "core.ops.ops_trace_manager"
|
||||
ops_stub = types.ModuleType(module_name)
|
||||
|
||||
class StubTraceTask:
|
||||
def __init__(self, trace_type, **kwargs):
|
||||
self.trace_type = trace_type
|
||||
self.app_id = None
|
||||
self.kwargs = kwargs
|
||||
|
||||
class StubTraceQueueManager:
|
||||
def __init__(self, app_id=None, user_id=None):
|
||||
self.app_id = app_id
|
||||
self.user_id = user_id
|
||||
self.trace_instance = StubOpsTraceManager.get_ops_trace_instance(app_id)
|
||||
|
||||
def add_trace_task(self, trace_task):
|
||||
trace_task.app_id = self.app_id
|
||||
from core.ops.ops_trace_manager import trace_manager_queue
|
||||
|
||||
trace_manager_queue.put(trace_task)
|
||||
|
||||
class StubOpsTraceManager:
|
||||
@staticmethod
|
||||
def get_ops_trace_instance(app_id):
|
||||
return None
|
||||
|
||||
ops_stub.TraceQueueManager = StubTraceQueueManager
|
||||
ops_stub.TraceTask = StubTraceTask
|
||||
ops_stub.OpsTraceManager = StubOpsTraceManager
|
||||
ops_stub.trace_manager_queue = MagicMock(spec=queue.Queue)
|
||||
monkeypatch.setitem(sys.modules, module_name, ops_stub)
|
||||
|
||||
from core.telemetry import emit
|
||||
|
||||
return emit, ops_stub.trace_manager_queue
|
||||
|
||||
|
||||
class TestTelemetryEmit:
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_emit_enterprise_trace_creates_trace_task(self, _mock_ee, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
event = TelemetryEvent(
|
||||
name=TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
|
||||
context=TelemetryContext(
|
||||
tenant_id="test-tenant",
|
||||
user_id="test-user",
|
||||
app_id="test-app",
|
||||
),
|
||||
payload={"key": "value"},
|
||||
)
|
||||
|
||||
emit_fn(event)
|
||||
|
||||
mock_queue.put.assert_called_once()
|
||||
called_task = mock_queue.put.call_args[0][0]
|
||||
assert called_task.trace_type == TraceTaskName.DRAFT_NODE_EXECUTION_TRACE
|
||||
|
||||
def test_emit_community_trace_enqueued(self, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
event = TelemetryEvent(
|
||||
name=TraceTaskName.WORKFLOW_TRACE,
|
||||
context=TelemetryContext(
|
||||
tenant_id="test-tenant",
|
||||
user_id="test-user",
|
||||
app_id="test-app",
|
||||
),
|
||||
payload={},
|
||||
)
|
||||
|
||||
emit_fn(event)
|
||||
|
||||
mock_queue.put.assert_called_once()
|
||||
|
||||
def test_emit_enterprise_only_trace_dropped_when_ee_disabled(self, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
event = TelemetryEvent(
|
||||
name=TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
|
||||
context=TelemetryContext(
|
||||
tenant_id="test-tenant",
|
||||
user_id="test-user",
|
||||
app_id="test-app",
|
||||
),
|
||||
payload={},
|
||||
)
|
||||
|
||||
emit_fn(event)
|
||||
|
||||
mock_queue.put.assert_not_called()
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_emit_all_enterprise_only_traces_allowed_when_ee_enabled(self, _mock_ee, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
enterprise_only_traces = [
|
||||
TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
|
||||
TraceTaskName.NODE_EXECUTION_TRACE,
|
||||
TraceTaskName.PROMPT_GENERATION_TRACE,
|
||||
]
|
||||
|
||||
for trace_name in enterprise_only_traces:
|
||||
mock_queue.reset_mock()
|
||||
|
||||
event = TelemetryEvent(
|
||||
name=trace_name,
|
||||
context=TelemetryContext(
|
||||
tenant_id="test-tenant",
|
||||
user_id="test-user",
|
||||
app_id="test-app",
|
||||
),
|
||||
payload={},
|
||||
)
|
||||
|
||||
emit_fn(event)
|
||||
|
||||
mock_queue.put.assert_called_once()
|
||||
called_task = mock_queue.put.call_args[0][0]
|
||||
assert called_task.trace_type == trace_name
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_emit_passes_name_directly_to_trace_task(self, _mock_ee, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
event = TelemetryEvent(
|
||||
name=TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
|
||||
context=TelemetryContext(
|
||||
tenant_id="test-tenant",
|
||||
user_id="test-user",
|
||||
app_id="test-app",
|
||||
),
|
||||
payload={"extra": "data"},
|
||||
)
|
||||
|
||||
emit_fn(event)
|
||||
|
||||
mock_queue.put.assert_called_once()
|
||||
called_task = mock_queue.put.call_args[0][0]
|
||||
assert called_task.trace_type == TraceTaskName.DRAFT_NODE_EXECUTION_TRACE
|
||||
assert isinstance(called_task.trace_type, TraceTaskName)
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_emit_with_provided_trace_manager(self, _mock_ee, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
mock_trace_manager = MagicMock()
|
||||
mock_trace_manager.add_trace_task = MagicMock()
|
||||
|
||||
event = TelemetryEvent(
|
||||
name=TraceTaskName.NODE_EXECUTION_TRACE,
|
||||
context=TelemetryContext(
|
||||
tenant_id="test-tenant",
|
||||
user_id="test-user",
|
||||
app_id="test-app",
|
||||
),
|
||||
payload={},
|
||||
)
|
||||
|
||||
emit_fn(event, trace_manager=mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
called_task = mock_trace_manager.add_trace_task.call_args[0][0]
|
||||
assert called_task.trace_type == TraceTaskName.NODE_EXECUTION_TRACE
|
||||
225
api/tests/unit_tests/core/telemetry/test_gateway_integration.py
Normal file
225
api/tests/unit_tests/core/telemetry/test_gateway_integration.py
Normal file
@@ -0,0 +1,225 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from core.telemetry.gateway import emit, is_enterprise_telemetry_enabled
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
|
||||
class TestTelemetryCoreExports:
|
||||
def test_is_enterprise_telemetry_enabled_exported(self) -> None:
|
||||
from core.telemetry.gateway import is_enterprise_telemetry_enabled as exported_func
|
||||
|
||||
assert callable(exported_func)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_ops_trace_manager():
|
||||
mock_module = MagicMock()
|
||||
mock_trace_task_class = MagicMock()
|
||||
mock_trace_task_class.return_value = MagicMock()
|
||||
mock_module.TraceTask = mock_trace_task_class
|
||||
mock_module.TraceQueueManager = MagicMock()
|
||||
|
||||
mock_trace_entity = MagicMock()
|
||||
mock_trace_task_name = MagicMock()
|
||||
mock_trace_task_name.return_value = "workflow"
|
||||
mock_trace_entity.TraceTaskName = mock_trace_task_name
|
||||
|
||||
with (
|
||||
patch.dict(sys.modules, {"core.ops.ops_trace_manager": mock_module}),
|
||||
patch.dict(sys.modules, {"core.ops.entities.trace_entity": mock_trace_entity}),
|
||||
):
|
||||
yield mock_module, mock_trace_entity
|
||||
|
||||
|
||||
class TestGatewayIntegrationTraceRouting:
|
||||
@pytest.fixture
|
||||
def mock_trace_manager(self) -> MagicMock:
|
||||
return MagicMock()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_ce_eligible_trace_routed_to_trace_manager(
|
||||
self,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True):
|
||||
context = {"app_id": "app-123", "user_id": "user-456", "tenant_id": "tenant-789"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_ce_eligible_trace_routed_when_ee_disabled(
|
||||
self,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_enterprise_only_trace_dropped_when_ee_disabled(
|
||||
self,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_enterprise_only_trace_routed_when_ee_enabled(
|
||||
self,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
|
||||
class TestGatewayIntegrationMetricRouting:
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_metric_case_routes_to_celery_task(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
from enterprise.telemetry.contracts import TelemetryEnvelope
|
||||
|
||||
with patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") as mock_delay:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
payload = {"app_id": "app-abc", "name": "My App"}
|
||||
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
mock_delay.assert_called_once()
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
envelope = TelemetryEnvelope.model_validate_json(envelope_json)
|
||||
assert envelope.case == TelemetryCase.APP_CREATED
|
||||
assert envelope.tenant_id == "tenant-123"
|
||||
assert envelope.payload["app_id"] == "app-abc"
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_tool_execution_trace_routed(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
mock_trace_manager = MagicMock()
|
||||
context = {"tenant_id": "tenant-123", "app_id": "app-123"}
|
||||
payload = {"tool_name": "test_tool", "tool_inputs": {}, "tool_outputs": "result"}
|
||||
|
||||
emit(TelemetryCase.TOOL_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_moderation_check_trace_routed(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
mock_trace_manager = MagicMock()
|
||||
context = {"tenant_id": "tenant-123", "app_id": "app-123"}
|
||||
payload = {"message_id": "msg-123", "moderation_result": {"flagged": False}}
|
||||
|
||||
emit(TelemetryCase.MODERATION_CHECK, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
|
||||
class TestGatewayIntegrationCEEligibility:
|
||||
@pytest.fixture
|
||||
def mock_trace_manager(self) -> MagicMock:
|
||||
return MagicMock()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_workflow_run_is_ce_eligible(
|
||||
self,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_message_run_is_ce_eligible(
|
||||
self,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"message_id": "msg-abc", "conversation_id": "conv-123"}
|
||||
|
||||
emit(TelemetryCase.MESSAGE_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_node_execution_not_ce_eligible(
|
||||
self,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_draft_node_execution_not_ce_eligible(
|
||||
self,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_execution_data": {}}
|
||||
|
||||
emit(TelemetryCase.DRAFT_NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_prompt_generation_not_ce_eligible(
|
||||
self,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456", "tenant_id": "tenant-789"}
|
||||
payload = {"operation_type": "generate", "instruction": "test"}
|
||||
|
||||
emit(TelemetryCase.PROMPT_GENERATION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
|
||||
class TestIsEnterpriseTelemetryEnabled:
|
||||
def test_returns_false_when_exporter_import_fails(self) -> None:
|
||||
with patch.dict(sys.modules, {"enterprise.telemetry.exporter": None}):
|
||||
result = is_enterprise_telemetry_enabled()
|
||||
assert result is False
|
||||
|
||||
def test_function_is_callable(self) -> None:
|
||||
assert callable(is_enterprise_telemetry_enabled)
|
||||
259
api/tests/unit_tests/enterprise/telemetry/test_contracts.py
Normal file
259
api/tests/unit_tests/enterprise/telemetry/test_contracts.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""Unit tests for telemetry gateway contracts."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from pydantic import ValidationError
|
||||
|
||||
from core.telemetry.gateway import CASE_ROUTING
|
||||
from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase, TelemetryEnvelope
|
||||
|
||||
|
||||
class TestTelemetryCase:
|
||||
"""Tests for TelemetryCase enum."""
|
||||
|
||||
def test_all_cases_defined(self) -> None:
|
||||
"""Verify all 14 telemetry cases are defined."""
|
||||
expected_cases = {
|
||||
"WORKFLOW_RUN",
|
||||
"NODE_EXECUTION",
|
||||
"DRAFT_NODE_EXECUTION",
|
||||
"MESSAGE_RUN",
|
||||
"TOOL_EXECUTION",
|
||||
"MODERATION_CHECK",
|
||||
"SUGGESTED_QUESTION",
|
||||
"DATASET_RETRIEVAL",
|
||||
"GENERATE_NAME",
|
||||
"PROMPT_GENERATION",
|
||||
"APP_CREATED",
|
||||
"APP_UPDATED",
|
||||
"APP_DELETED",
|
||||
"FEEDBACK_CREATED",
|
||||
}
|
||||
actual_cases = {case.name for case in TelemetryCase}
|
||||
assert actual_cases == expected_cases
|
||||
|
||||
def test_case_values(self) -> None:
|
||||
"""Verify case enum values are correct."""
|
||||
assert TelemetryCase.WORKFLOW_RUN.value == "workflow_run"
|
||||
assert TelemetryCase.NODE_EXECUTION.value == "node_execution"
|
||||
assert TelemetryCase.DRAFT_NODE_EXECUTION.value == "draft_node_execution"
|
||||
assert TelemetryCase.MESSAGE_RUN.value == "message_run"
|
||||
assert TelemetryCase.TOOL_EXECUTION.value == "tool_execution"
|
||||
assert TelemetryCase.MODERATION_CHECK.value == "moderation_check"
|
||||
assert TelemetryCase.SUGGESTED_QUESTION.value == "suggested_question"
|
||||
assert TelemetryCase.DATASET_RETRIEVAL.value == "dataset_retrieval"
|
||||
assert TelemetryCase.GENERATE_NAME.value == "generate_name"
|
||||
assert TelemetryCase.PROMPT_GENERATION.value == "prompt_generation"
|
||||
assert TelemetryCase.APP_CREATED.value == "app_created"
|
||||
assert TelemetryCase.APP_UPDATED.value == "app_updated"
|
||||
assert TelemetryCase.APP_DELETED.value == "app_deleted"
|
||||
assert TelemetryCase.FEEDBACK_CREATED.value == "feedback_created"
|
||||
|
||||
|
||||
class TestCaseRoute:
|
||||
"""Tests for CaseRoute model."""
|
||||
|
||||
def test_valid_trace_route(self) -> None:
|
||||
"""Verify valid trace route creation."""
|
||||
route = CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True)
|
||||
assert route.signal_type == SignalType.TRACE
|
||||
assert route.ce_eligible is True
|
||||
|
||||
def test_valid_metric_log_route(self) -> None:
|
||||
"""Verify valid metric_log route creation."""
|
||||
route = CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False)
|
||||
assert route.signal_type == SignalType.METRIC_LOG
|
||||
assert route.ce_eligible is False
|
||||
|
||||
def test_invalid_signal_type(self) -> None:
|
||||
"""Verify invalid signal_type is rejected."""
|
||||
with pytest.raises(ValidationError):
|
||||
CaseRoute(signal_type="invalid", ce_eligible=True)
|
||||
|
||||
|
||||
class TestTelemetryEnvelope:
|
||||
"""Tests for TelemetryEnvelope model."""
|
||||
|
||||
def test_valid_envelope_minimal(self) -> None:
|
||||
"""Verify valid minimal envelope creation."""
|
||||
envelope = TelemetryEnvelope(
|
||||
case=TelemetryCase.WORKFLOW_RUN,
|
||||
tenant_id="tenant-123",
|
||||
event_id="event-456",
|
||||
payload={"key": "value"},
|
||||
)
|
||||
assert envelope.case == TelemetryCase.WORKFLOW_RUN
|
||||
assert envelope.tenant_id == "tenant-123"
|
||||
assert envelope.event_id == "event-456"
|
||||
assert envelope.payload == {"key": "value"}
|
||||
assert envelope.payload_fallback is None
|
||||
assert envelope.metadata is None
|
||||
|
||||
def test_valid_envelope_full(self) -> None:
|
||||
"""Verify valid envelope with all fields."""
|
||||
metadata = {"source": "api"}
|
||||
fallback = b"fallback data"
|
||||
envelope = TelemetryEnvelope(
|
||||
case=TelemetryCase.MESSAGE_RUN,
|
||||
tenant_id="tenant-789",
|
||||
event_id="event-012",
|
||||
payload={"message": "hello"},
|
||||
payload_fallback=fallback,
|
||||
metadata=metadata,
|
||||
)
|
||||
assert envelope.case == TelemetryCase.MESSAGE_RUN
|
||||
assert envelope.tenant_id == "tenant-789"
|
||||
assert envelope.event_id == "event-012"
|
||||
assert envelope.payload == {"message": "hello"}
|
||||
assert envelope.payload_fallback == fallback
|
||||
assert envelope.metadata == metadata
|
||||
|
||||
def test_missing_required_case(self) -> None:
|
||||
"""Verify missing case field is rejected."""
|
||||
with pytest.raises(ValidationError):
|
||||
TelemetryEnvelope(
|
||||
tenant_id="tenant-123",
|
||||
event_id="event-456",
|
||||
payload={"key": "value"},
|
||||
)
|
||||
|
||||
def test_missing_required_tenant_id(self) -> None:
|
||||
"""Verify missing tenant_id field is rejected."""
|
||||
with pytest.raises(ValidationError):
|
||||
TelemetryEnvelope(
|
||||
case=TelemetryCase.WORKFLOW_RUN,
|
||||
event_id="event-456",
|
||||
payload={"key": "value"},
|
||||
)
|
||||
|
||||
def test_missing_required_event_id(self) -> None:
|
||||
"""Verify missing event_id field is rejected."""
|
||||
with pytest.raises(ValidationError):
|
||||
TelemetryEnvelope(
|
||||
case=TelemetryCase.WORKFLOW_RUN,
|
||||
tenant_id="tenant-123",
|
||||
payload={"key": "value"},
|
||||
)
|
||||
|
||||
def test_missing_required_payload(self) -> None:
|
||||
"""Verify missing payload field is rejected."""
|
||||
with pytest.raises(ValidationError):
|
||||
TelemetryEnvelope(
|
||||
case=TelemetryCase.WORKFLOW_RUN,
|
||||
tenant_id="tenant-123",
|
||||
event_id="event-456",
|
||||
)
|
||||
|
||||
def test_payload_fallback_within_limit(self) -> None:
|
||||
"""Verify payload_fallback within 64KB limit is accepted."""
|
||||
fallback = b"x" * 65536
|
||||
envelope = TelemetryEnvelope(
|
||||
case=TelemetryCase.WORKFLOW_RUN,
|
||||
tenant_id="tenant-123",
|
||||
event_id="event-456",
|
||||
payload={"key": "value"},
|
||||
payload_fallback=fallback,
|
||||
)
|
||||
assert envelope.payload_fallback == fallback
|
||||
|
||||
def test_payload_fallback_exceeds_limit(self) -> None:
|
||||
"""Verify payload_fallback exceeding 64KB is rejected."""
|
||||
fallback = b"x" * 65537
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
TelemetryEnvelope(
|
||||
case=TelemetryCase.WORKFLOW_RUN,
|
||||
tenant_id="tenant-123",
|
||||
event_id="event-456",
|
||||
payload={"key": "value"},
|
||||
payload_fallback=fallback,
|
||||
)
|
||||
assert "64KB" in str(exc_info.value)
|
||||
|
||||
def test_payload_fallback_none(self) -> None:
|
||||
"""Verify payload_fallback can be None."""
|
||||
envelope = TelemetryEnvelope(
|
||||
case=TelemetryCase.WORKFLOW_RUN,
|
||||
tenant_id="tenant-123",
|
||||
event_id="event-456",
|
||||
payload={"key": "value"},
|
||||
payload_fallback=None,
|
||||
)
|
||||
assert envelope.payload_fallback is None
|
||||
|
||||
|
||||
class TestCaseRouting:
|
||||
"""Tests for CASE_ROUTING table."""
|
||||
|
||||
def test_all_cases_routed(self) -> None:
|
||||
"""Verify all 14 cases have routing entries."""
|
||||
assert len(CASE_ROUTING) == 14
|
||||
for case in TelemetryCase:
|
||||
assert case in CASE_ROUTING
|
||||
|
||||
def test_trace_ce_eligible_cases(self) -> None:
|
||||
"""Verify trace cases with CE eligibility."""
|
||||
ce_eligible_trace_cases = {
|
||||
TelemetryCase.WORKFLOW_RUN,
|
||||
TelemetryCase.MESSAGE_RUN,
|
||||
}
|
||||
for case in ce_eligible_trace_cases:
|
||||
route = CASE_ROUTING[case]
|
||||
assert route.signal_type == SignalType.TRACE
|
||||
assert route.ce_eligible is True
|
||||
|
||||
def test_trace_enterprise_only_cases(self) -> None:
|
||||
"""Verify trace cases that are enterprise-only."""
|
||||
enterprise_only_trace_cases = {
|
||||
TelemetryCase.NODE_EXECUTION,
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION,
|
||||
TelemetryCase.PROMPT_GENERATION,
|
||||
}
|
||||
for case in enterprise_only_trace_cases:
|
||||
route = CASE_ROUTING[case]
|
||||
assert route.signal_type == SignalType.TRACE
|
||||
assert route.ce_eligible is False
|
||||
|
||||
def test_metric_log_cases(self) -> None:
|
||||
"""Verify metric/log-only cases."""
|
||||
metric_log_cases = {
|
||||
TelemetryCase.APP_CREATED,
|
||||
TelemetryCase.APP_UPDATED,
|
||||
TelemetryCase.APP_DELETED,
|
||||
TelemetryCase.FEEDBACK_CREATED,
|
||||
}
|
||||
for case in metric_log_cases:
|
||||
route = CASE_ROUTING[case]
|
||||
assert route.signal_type == SignalType.METRIC_LOG
|
||||
assert route.ce_eligible is False
|
||||
|
||||
def test_routing_table_completeness(self) -> None:
|
||||
"""Verify routing table covers all cases with correct types."""
|
||||
trace_cases = {
|
||||
TelemetryCase.WORKFLOW_RUN,
|
||||
TelemetryCase.MESSAGE_RUN,
|
||||
TelemetryCase.NODE_EXECUTION,
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION,
|
||||
TelemetryCase.PROMPT_GENERATION,
|
||||
TelemetryCase.TOOL_EXECUTION,
|
||||
TelemetryCase.MODERATION_CHECK,
|
||||
TelemetryCase.SUGGESTED_QUESTION,
|
||||
TelemetryCase.DATASET_RETRIEVAL,
|
||||
TelemetryCase.GENERATE_NAME,
|
||||
}
|
||||
metric_log_cases = {
|
||||
TelemetryCase.APP_CREATED,
|
||||
TelemetryCase.APP_UPDATED,
|
||||
TelemetryCase.APP_DELETED,
|
||||
TelemetryCase.FEEDBACK_CREATED,
|
||||
}
|
||||
|
||||
all_cases = trace_cases | metric_log_cases
|
||||
assert len(all_cases) == 14
|
||||
assert all_cases == set(TelemetryCase)
|
||||
|
||||
for case in trace_cases:
|
||||
assert CASE_ROUTING[case].signal_type == SignalType.TRACE
|
||||
|
||||
for case in metric_log_cases:
|
||||
assert CASE_ROUTING[case].signal_type == SignalType.METRIC_LOG
|
||||
121
api/tests/unit_tests/enterprise/telemetry/test_event_handlers.py
Normal file
121
api/tests/unit_tests/enterprise/telemetry/test_event_handlers.py
Normal file
@@ -0,0 +1,121 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from enterprise.telemetry import event_handlers
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_gateway_emit():
|
||||
with patch("core.telemetry.gateway.emit") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
def test_handle_app_created_calls_task(mock_gateway_emit):
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
sender.mode = "chat"
|
||||
|
||||
event_handlers._handle_app_created(sender)
|
||||
|
||||
mock_gateway_emit.assert_called_once_with(
|
||||
case=TelemetryCase.APP_CREATED,
|
||||
context={"tenant_id": "tenant-456"},
|
||||
payload={"app_id": "app-123", "mode": "chat"},
|
||||
)
|
||||
|
||||
|
||||
def test_handle_app_created_no_exporter(mock_gateway_emit):
|
||||
"""Gateway handles exporter availability internally; handler always calls gateway."""
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
|
||||
event_handlers._handle_app_created(sender)
|
||||
|
||||
mock_gateway_emit.assert_called_once()
|
||||
|
||||
|
||||
def test_handle_app_updated_calls_task(mock_gateway_emit):
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
|
||||
event_handlers._handle_app_updated(sender)
|
||||
|
||||
mock_gateway_emit.assert_called_once_with(
|
||||
case=TelemetryCase.APP_UPDATED,
|
||||
context={"tenant_id": "tenant-456"},
|
||||
payload={"app_id": "app-123"},
|
||||
)
|
||||
|
||||
|
||||
def test_handle_app_deleted_calls_task(mock_gateway_emit):
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
|
||||
event_handlers._handle_app_deleted(sender)
|
||||
|
||||
mock_gateway_emit.assert_called_once_with(
|
||||
case=TelemetryCase.APP_DELETED,
|
||||
context={"tenant_id": "tenant-456"},
|
||||
payload={"app_id": "app-123"},
|
||||
)
|
||||
|
||||
|
||||
def test_handle_feedback_created_calls_task(mock_gateway_emit):
|
||||
sender = MagicMock()
|
||||
sender.message_id = "msg-123"
|
||||
sender.app_id = "app-456"
|
||||
sender.conversation_id = "conv-789"
|
||||
sender.from_end_user_id = "user-001"
|
||||
sender.from_account_id = None
|
||||
sender.rating = "like"
|
||||
sender.from_source = "api"
|
||||
sender.content = "Great response!"
|
||||
|
||||
event_handlers._handle_feedback_created(sender, tenant_id="tenant-456")
|
||||
|
||||
mock_gateway_emit.assert_called_once_with(
|
||||
case=TelemetryCase.FEEDBACK_CREATED,
|
||||
context={"tenant_id": "tenant-456"},
|
||||
payload={
|
||||
"message_id": "msg-123",
|
||||
"app_id": "app-456",
|
||||
"conversation_id": "conv-789",
|
||||
"from_end_user_id": "user-001",
|
||||
"from_account_id": None,
|
||||
"rating": "like",
|
||||
"from_source": "api",
|
||||
"content": "Great response!",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def test_handle_feedback_created_no_exporter(mock_gateway_emit):
|
||||
"""Gateway handles exporter availability internally; handler always calls gateway."""
|
||||
sender = MagicMock()
|
||||
sender.message_id = "msg-123"
|
||||
|
||||
event_handlers._handle_feedback_created(sender, tenant_id="tenant-456")
|
||||
|
||||
mock_gateway_emit.assert_called_once()
|
||||
|
||||
|
||||
def test_handlers_create_valid_envelopes(mock_gateway_emit):
|
||||
"""Verify handlers pass correct TelemetryCase and payload structure."""
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
sender.mode = "chat"
|
||||
|
||||
event_handlers._handle_app_created(sender)
|
||||
|
||||
call_kwargs = mock_gateway_emit.call_args[1]
|
||||
assert call_kwargs["case"] == TelemetryCase.APP_CREATED
|
||||
assert call_kwargs["context"]["tenant_id"] == "tenant-456"
|
||||
assert call_kwargs["payload"]["app_id"] == "app-123"
|
||||
assert call_kwargs["payload"]["mode"] == "chat"
|
||||
272
api/tests/unit_tests/enterprise/telemetry/test_gateway.py
Normal file
272
api/tests/unit_tests/enterprise/telemetry/test_gateway.py
Normal file
@@ -0,0 +1,272 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
from core.telemetry.gateway import (
|
||||
CASE_ROUTING,
|
||||
CASE_TO_TRACE_TASK,
|
||||
PAYLOAD_SIZE_THRESHOLD_BYTES,
|
||||
emit,
|
||||
)
|
||||
from enterprise.telemetry.contracts import SignalType, TelemetryCase, TelemetryEnvelope
|
||||
|
||||
|
||||
class TestCaseRoutingTable:
|
||||
def test_all_cases_have_routing(self) -> None:
|
||||
for case in TelemetryCase:
|
||||
assert case in CASE_ROUTING, f"Missing routing for {case}"
|
||||
|
||||
def test_trace_cases(self) -> None:
|
||||
trace_cases = [
|
||||
TelemetryCase.WORKFLOW_RUN,
|
||||
TelemetryCase.MESSAGE_RUN,
|
||||
TelemetryCase.NODE_EXECUTION,
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION,
|
||||
TelemetryCase.PROMPT_GENERATION,
|
||||
]
|
||||
for case in trace_cases:
|
||||
assert CASE_ROUTING[case].signal_type is SignalType.TRACE, f"{case} should be trace"
|
||||
|
||||
def test_metric_log_cases(self) -> None:
|
||||
metric_log_cases = [
|
||||
TelemetryCase.APP_CREATED,
|
||||
TelemetryCase.APP_UPDATED,
|
||||
TelemetryCase.APP_DELETED,
|
||||
TelemetryCase.FEEDBACK_CREATED,
|
||||
]
|
||||
for case in metric_log_cases:
|
||||
assert CASE_ROUTING[case].signal_type is SignalType.METRIC_LOG, f"{case} should be metric_log"
|
||||
|
||||
def test_ce_eligible_cases(self) -> None:
|
||||
ce_eligible_cases = [
|
||||
TelemetryCase.WORKFLOW_RUN,
|
||||
TelemetryCase.MESSAGE_RUN,
|
||||
TelemetryCase.TOOL_EXECUTION,
|
||||
TelemetryCase.MODERATION_CHECK,
|
||||
TelemetryCase.SUGGESTED_QUESTION,
|
||||
TelemetryCase.DATASET_RETRIEVAL,
|
||||
TelemetryCase.GENERATE_NAME,
|
||||
]
|
||||
for case in ce_eligible_cases:
|
||||
assert CASE_ROUTING[case].ce_eligible is True, f"{case} should be CE eligible"
|
||||
|
||||
def test_enterprise_only_cases(self) -> None:
|
||||
enterprise_only_cases = [
|
||||
TelemetryCase.NODE_EXECUTION,
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION,
|
||||
TelemetryCase.PROMPT_GENERATION,
|
||||
]
|
||||
for case in enterprise_only_cases:
|
||||
assert CASE_ROUTING[case].ce_eligible is False, f"{case} should be enterprise-only"
|
||||
|
||||
def test_trace_cases_have_task_name_mapping(self) -> None:
|
||||
trace_cases = [c for c in TelemetryCase if CASE_ROUTING[c].signal_type is SignalType.TRACE]
|
||||
for case in trace_cases:
|
||||
assert case in CASE_TO_TRACE_TASK, f"Missing TraceTaskName mapping for {case}"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_ops_trace_manager():
|
||||
mock_module = MagicMock()
|
||||
mock_trace_task_class = MagicMock()
|
||||
mock_trace_task_class.return_value = MagicMock()
|
||||
mock_module.TraceTask = mock_trace_task_class
|
||||
mock_module.TraceQueueManager = MagicMock()
|
||||
|
||||
mock_trace_entity = MagicMock()
|
||||
mock_trace_task_name = MagicMock()
|
||||
mock_trace_task_name.return_value = "workflow"
|
||||
mock_trace_entity.TraceTaskName = mock_trace_task_name
|
||||
|
||||
with (
|
||||
patch.dict(sys.modules, {"core.ops.ops_trace_manager": mock_module}),
|
||||
patch.dict(sys.modules, {"core.ops.entities.trace_entity": mock_trace_entity}),
|
||||
):
|
||||
yield mock_module, mock_trace_entity
|
||||
|
||||
|
||||
class TestGatewayTraceRouting:
|
||||
@pytest.fixture
|
||||
def mock_trace_manager(self) -> MagicMock:
|
||||
return MagicMock()
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_trace_case_routes_to_trace_manager(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
mock_trace_manager: MagicMock,
|
||||
mock_ops_trace_manager: tuple[MagicMock, MagicMock],
|
||||
) -> None:
|
||||
context = {"app_id": "app-123", "user_id": "user-456", "tenant_id": "tenant-789"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False)
|
||||
def test_ce_eligible_trace_enqueued_when_ee_disabled(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
mock_trace_manager: MagicMock,
|
||||
mock_ops_trace_manager: tuple[MagicMock, MagicMock],
|
||||
) -> None:
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False)
|
||||
def test_enterprise_only_trace_dropped_when_ee_disabled(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
mock_trace_manager: MagicMock,
|
||||
mock_ops_trace_manager: tuple[MagicMock, MagicMock],
|
||||
) -> None:
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_enterprise_only_trace_enqueued_when_ee_enabled(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
mock_trace_manager: MagicMock,
|
||||
mock_ops_trace_manager: tuple[MagicMock, MagicMock],
|
||||
) -> None:
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
|
||||
class TestGatewayMetricLogRouting:
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_metric_case_routes_to_celery_task(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
payload = {"app_id": "app-abc", "name": "My App"}
|
||||
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
mock_delay.assert_called_once()
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
envelope = TelemetryEnvelope.model_validate_json(envelope_json)
|
||||
assert envelope.case == TelemetryCase.APP_CREATED
|
||||
assert envelope.tenant_id == "tenant-123"
|
||||
assert envelope.payload["app_id"] == "app-abc"
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_envelope_has_unique_event_id(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
payload = {"app_id": "app-abc"}
|
||||
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
assert mock_delay.call_count == 2
|
||||
envelope1 = TelemetryEnvelope.model_validate_json(mock_delay.call_args_list[0][0][0])
|
||||
envelope2 = TelemetryEnvelope.model_validate_json(mock_delay.call_args_list[1][0][0])
|
||||
assert envelope1.event_id != envelope2.event_id
|
||||
|
||||
|
||||
class TestGatewayPayloadSizing:
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_small_payload_inlined(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
payload = {"key": "small_value"}
|
||||
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
envelope = TelemetryEnvelope.model_validate_json(envelope_json)
|
||||
assert envelope.payload == payload
|
||||
assert envelope.metadata is None
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.storage")
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_large_payload_stored(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
mock_storage: MagicMock,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
large_value = "x" * (PAYLOAD_SIZE_THRESHOLD_BYTES + 1000)
|
||||
payload = {"key": large_value}
|
||||
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
mock_storage.save.assert_called_once()
|
||||
storage_key = mock_storage.save.call_args[0][0]
|
||||
assert storage_key.startswith("telemetry/tenant-123/")
|
||||
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
envelope = TelemetryEnvelope.model_validate_json(envelope_json)
|
||||
assert envelope.payload == {}
|
||||
assert envelope.metadata is not None
|
||||
assert envelope.metadata["payload_ref"] == storage_key
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.storage")
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_large_payload_fallback_on_storage_error(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
mock_storage: MagicMock,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
mock_storage.save.side_effect = Exception("Storage failure")
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
large_value = "x" * (PAYLOAD_SIZE_THRESHOLD_BYTES + 1000)
|
||||
payload = {"key": large_value}
|
||||
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
envelope = TelemetryEnvelope.model_validate_json(envelope_json)
|
||||
assert envelope.payload == payload
|
||||
assert envelope.metadata is None
|
||||
|
||||
|
||||
class TestTraceTaskNameMapping:
|
||||
def test_workflow_run_mapping(self) -> None:
|
||||
assert CASE_TO_TRACE_TASK[TelemetryCase.WORKFLOW_RUN] is TraceTaskName.WORKFLOW_TRACE
|
||||
|
||||
def test_message_run_mapping(self) -> None:
|
||||
assert CASE_TO_TRACE_TASK[TelemetryCase.MESSAGE_RUN] is TraceTaskName.MESSAGE_TRACE
|
||||
|
||||
def test_node_execution_mapping(self) -> None:
|
||||
assert CASE_TO_TRACE_TASK[TelemetryCase.NODE_EXECUTION] is TraceTaskName.NODE_EXECUTION_TRACE
|
||||
|
||||
def test_draft_node_execution_mapping(self) -> None:
|
||||
assert CASE_TO_TRACE_TASK[TelemetryCase.DRAFT_NODE_EXECUTION] is TraceTaskName.DRAFT_NODE_EXECUTION_TRACE
|
||||
|
||||
def test_prompt_generation_mapping(self) -> None:
|
||||
assert CASE_TO_TRACE_TASK[TelemetryCase.PROMPT_GENERATION] is TraceTaskName.PROMPT_GENERATION_TRACE
|
||||
@@ -304,6 +304,7 @@ def test_on_app_created_emits_correct_event(mock_redis):
|
||||
attributes={
|
||||
"dify.app.id": "app-789",
|
||||
"dify.tenant_id": "tenant-123",
|
||||
"dify.event.id": "event-456",
|
||||
"dify.app.mode": "chat",
|
||||
},
|
||||
tenant_id="tenant-123",
|
||||
@@ -345,6 +346,7 @@ def test_on_app_updated_emits_correct_event(mock_redis):
|
||||
attributes={
|
||||
"dify.app.id": "app-789",
|
||||
"dify.tenant_id": "tenant-123",
|
||||
"dify.event.id": "event-456",
|
||||
},
|
||||
tenant_id="tenant-123",
|
||||
)
|
||||
@@ -384,6 +386,7 @@ def test_on_app_deleted_emits_correct_event(mock_redis):
|
||||
attributes={
|
||||
"dify.app.id": "app-789",
|
||||
"dify.tenant_id": "tenant-123",
|
||||
"dify.event.id": "event-456",
|
||||
},
|
||||
tenant_id="tenant-123",
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user