refactor(telemetry): add resolved_parent_context property and fix edge cases

- Add resolved_parent_context property to BaseTraceInfo for reusable parent context extraction
- Refactor enterprise_trace.py to use property instead of duplicated dict plucking (~19 lines eliminated)
- Fix UUID validation in exporter.py with specific error logging for invalid trace correlation IDs
- Add error isolation in event_handlers.py to prevent telemetry failures from breaking user operations
- Replace pickle-based payload_fallback with JSON storage rehydration for security
- Update TelemetryEnvelope to use Pydantic v2 ConfigDict with extra='forbid'
- Update tests to reflect contract changes and new error handling behavior
This commit is contained in:
GareArc
2026-03-01 19:33:59 -08:00
parent 808ab4bd75
commit bb8d830fa5
10 changed files with 190 additions and 158 deletions

View File

@@ -47,6 +47,28 @@ class BaseTraceInfo(BaseModel):
# Final fallback to message_id
return str(self.message_id) if self.message_id else None
@property
def resolved_parent_context(self) -> tuple[str | None, str | None]:
"""Resolve cross-workflow parent linking from metadata.
Extracts typed parent IDs from the untyped ``parent_trace_context``
metadata dict (set by tool_node when invoking nested workflows).
Returns:
(trace_correlation_override, parent_span_id_source) where
trace_correlation_override is the outer workflow_run_id and
parent_span_id_source is the outer node_execution_id.
"""
parent_ctx = self.metadata.get("parent_trace_context")
if not isinstance(parent_ctx, dict):
return None, None
trace_override = parent_ctx.get("parent_workflow_run_id")
parent_span = parent_ctx.get("parent_node_execution_id")
return (
trace_override if isinstance(trace_override, str) else None,
parent_span if isinstance(parent_span, str) else None,
)
@field_serializer("start_time", "end_time")
def serialize_datetime(self, dt: datetime | None) -> str | None:
if dt is None:

View File

@@ -9,7 +9,7 @@ from __future__ import annotations
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, field_validator
from pydantic import BaseModel, ConfigDict
class TelemetryCase(StrEnum):
@@ -57,27 +57,17 @@ class TelemetryEnvelope(BaseModel):
case: The telemetry case type.
tenant_id: The tenant identifier.
event_id: Unique event identifier for deduplication.
payload: The main event payload.
payload_fallback: Fallback payload (max 64KB).
metadata: Optional metadata dictionary.
payload: The main event payload (inline for small payloads,
empty when offloaded to storage via ``payload_ref``).
metadata: Optional metadata dictionary. When the gateway
offloads a large payload to object storage, this contains
``{"payload_ref": "<storage_key>"}``.
"""
model_config = ConfigDict(extra="forbid", use_enum_values=False)
case: TelemetryCase
tenant_id: str
event_id: str
payload: dict[str, Any]
payload_fallback: bytes | None = None
metadata: dict[str, Any] | None = None
@field_validator("payload_fallback")
@classmethod
def validate_payload_fallback_size(cls, v: bytes | None) -> bytes | None:
"""Validate that payload_fallback does not exceed 64KB."""
if v is not None and len(v) > 65536: # 64 * 1024
raise ValueError("payload_fallback must not exceed 64KB")
return v
class Config:
"""Pydantic configuration."""
use_enum_values = False

View File

@@ -177,8 +177,7 @@ class EnterpriseOtelTrace:
"dify.invoked_by": info.invoked_by,
}
trace_correlation_override: str | None = None
parent_span_id_source: str | None = None
trace_correlation_override, parent_span_id_source = info.resolved_parent_context
parent_ctx = metadata.get("parent_trace_context")
if isinstance(parent_ctx, dict):
@@ -188,13 +187,6 @@ class EnterpriseOtelTrace:
span_attrs["dify.parent.workflow.run_id"] = parent_ctx_dict.get("parent_workflow_run_id")
span_attrs["dify.parent.app.id"] = parent_ctx_dict.get("parent_app_id")
trace_override_value = parent_ctx_dict.get("parent_workflow_run_id")
if isinstance(trace_override_value, str):
trace_correlation_override = trace_override_value
parent_span_value = parent_ctx_dict.get("parent_node_execution_id")
if isinstance(parent_span_value, str):
parent_span_id_source = parent_span_value
self._exporter.export_span(
EnterpriseTelemetrySpan.WORKFLOW_RUN,
span_attrs,
@@ -329,13 +321,8 @@ class EnterpriseOtelTrace:
"dify.node.invoked_by": info.invoked_by,
}
trace_correlation_override = trace_correlation_override_param
parent_ctx = metadata.get("parent_trace_context")
if isinstance(parent_ctx, dict):
parent_ctx_dict = cast(dict[str, Any], parent_ctx)
override_value = parent_ctx_dict.get("parent_workflow_run_id")
if isinstance(override_value, str):
trace_correlation_override = override_value
resolved_override, _ = info.resolved_parent_context
trace_correlation_override = trace_correlation_override_param or resolved_override
effective_correlation_id = correlation_id_override or info.workflow_run_id
self._exporter.export_span(

View File

@@ -4,6 +4,9 @@ Registered at import time via ``@signal.connect`` decorators.
Import must happen during ``ext_enterprise_telemetry.init_app()`` to
ensure handlers fire. Each handler delegates to ``core.telemetry.gateway``
which handles routing, EE-gating, and dispatch.
All handlers are best-effort: exceptions are caught and logged so that
telemetry failures never break user-facing operations.
"""
from __future__ import annotations
@@ -25,60 +28,72 @@ __all__ = [
@app_was_created.connect
def _handle_app_created(sender: object, **kwargs: object) -> None:
    """Emit APP_CREATED telemetry when an app is created.

    Best-effort: any failure is logged and swallowed so telemetry can
    never break the user-facing app-creation flow. Imports are deferred
    (and inside the try) so even import errors are isolated.

    Args:
        sender: The created app model; attributes are read defensively.
        **kwargs: Unused signal keyword arguments.
    """
    try:
        # Deferred imports keep signal registration cheap and ensure an
        # import failure is handled like any other telemetry failure.
        from core.telemetry.gateway import emit as gateway_emit
        from enterprise.telemetry.contracts import TelemetryCase

        gateway_emit(
            case=TelemetryCase.APP_CREATED,
            context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")},
            payload={
                "app_id": getattr(sender, "id", None),
                "mode": getattr(sender, "mode", None),
            },
        )
    except Exception:
        logger.warning("Failed to emit app_created telemetry", exc_info=True)
@app_was_deleted.connect
def _handle_app_deleted(sender: object, **kwargs: object) -> None:
    """Emit APP_DELETED telemetry when an app is deleted.

    Best-effort: any failure is logged and swallowed so telemetry can
    never break the user-facing app-deletion flow. Imports are deferred
    (and inside the try) so even import errors are isolated.

    Args:
        sender: The deleted app model; attributes are read defensively.
        **kwargs: Unused signal keyword arguments.
    """
    try:
        from core.telemetry.gateway import emit as gateway_emit
        from enterprise.telemetry.contracts import TelemetryCase

        gateway_emit(
            case=TelemetryCase.APP_DELETED,
            context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")},
            payload={"app_id": getattr(sender, "id", None)},
        )
    except Exception:
        logger.warning("Failed to emit app_deleted telemetry", exc_info=True)
@app_was_updated.connect
def _handle_app_updated(sender: object, **kwargs: object) -> None:
    """Emit APP_UPDATED telemetry when an app is updated.

    Best-effort: any failure is logged and swallowed so telemetry can
    never break the user-facing app-update flow. Imports are deferred
    (and inside the try) so even import errors are isolated.

    Args:
        sender: The updated app model; attributes are read defensively.
        **kwargs: Unused signal keyword arguments.
    """
    try:
        from core.telemetry.gateway import emit as gateway_emit
        from enterprise.telemetry.contracts import TelemetryCase

        gateway_emit(
            case=TelemetryCase.APP_UPDATED,
            context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")},
            payload={"app_id": getattr(sender, "id", None)},
        )
    except Exception:
        logger.warning("Failed to emit app_updated telemetry", exc_info=True)
@feedback_was_created.connect
def _handle_feedback_created(sender: object, **kwargs: object) -> None:
    """Emit FEEDBACK_CREATED telemetry when message feedback is created.

    Best-effort: any failure is logged and swallowed so telemetry can
    never break the user-facing feedback flow. Imports are deferred
    (and inside the try) so even import errors are isolated.

    Args:
        sender: The feedback model; attributes are read defensively.
        **kwargs: Signal keyword arguments; ``tenant_id`` is read here
            because the feedback model does not carry it directly.
    """
    try:
        from core.telemetry.gateway import emit as gateway_emit
        from enterprise.telemetry.contracts import TelemetryCase

        # tenant_id travels in the signal kwargs, not on the model.
        tenant_id = str(kwargs.get("tenant_id", "") or "")
        gateway_emit(
            case=TelemetryCase.FEEDBACK_CREATED,
            context={"tenant_id": tenant_id},
            payload={
                "message_id": getattr(sender, "message_id", None),
                "app_id": getattr(sender, "app_id", None),
                "conversation_id": getattr(sender, "conversation_id", None),
                "from_end_user_id": getattr(sender, "from_end_user_id", None),
                "from_account_id": getattr(sender, "from_account_id", None),
                "rating": getattr(sender, "rating", None),
                "from_source": getattr(sender, "from_source", None),
                "content": getattr(sender, "content", None),
            },
        )
    except Exception:
        logger.warning("Failed to emit feedback_created telemetry", exc_info=True)

View File

@@ -109,10 +109,10 @@ class EnterpriseExporter:
sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0)
self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True)
api_key: str = getattr(config, "ENTERPRISE_OTLP_API_KEY", "")
# Auto-detect TLS: https:// uses secure, everything else is insecure
insecure = not endpoint.startswith("https://")
resource = Resource(
attributes={
ResourceAttributes.SERVICE_NAME: service_name,
@@ -207,7 +207,15 @@ class EnterpriseExporter:
if parent_span_id_source:
# Cross-workflow linking: parent is an explicit span (e.g. tool node in outer workflow)
parent_span_id = compute_deterministic_span_id(parent_span_id_source)
parent_trace_id = int(uuid.UUID(effective_trace_correlation)) if effective_trace_correlation else 0
try:
parent_trace_id = int(uuid.UUID(effective_trace_correlation)) if effective_trace_correlation else 0
except (ValueError, AttributeError):
logger.warning(
"Invalid trace correlation UUID for cross-workflow link: %s, span=%s",
effective_trace_correlation,
name,
)
parent_trace_id = 0
if parent_trace_id:
parent_span_context = SpanContext(
trace_id=parent_trace_id,
@@ -219,14 +227,23 @@ class EnterpriseExporter:
elif correlation_id and correlation_id != span_id_source:
# Child span: parent is the correlation-group root (workflow root span)
parent_span_id = compute_deterministic_span_id(correlation_id)
parent_trace_id = int(uuid.UUID(effective_trace_correlation or correlation_id))
parent_span_context = SpanContext(
trace_id=parent_trace_id,
span_id=parent_span_id,
is_remote=True,
trace_flags=TraceFlags(TraceFlags.SAMPLED),
)
parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context))
try:
parent_trace_id = int(uuid.UUID(effective_trace_correlation or correlation_id))
except (ValueError, AttributeError):
logger.warning(
"Invalid trace correlation UUID for child span link: %s, span=%s",
effective_trace_correlation or correlation_id,
name,
)
parent_trace_id = 0
if parent_trace_id:
parent_span_context = SpanContext(
trace_id=parent_trace_id,
span_id=parent_span_id,
is_remote=True,
trace_flags=TraceFlags(TraceFlags.SAMPLED),
)
parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context))
span_start_time = _datetime_to_ns(start_time) if start_time is not None else None
span_end_on_exit = end_time is None

View File

@@ -7,11 +7,13 @@ idempotency checking, and payload rehydration.
from __future__ import annotations
import json
import logging
from typing import Any
from enterprise.telemetry.contracts import TelemetryCase, TelemetryEnvelope
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
logger = logging.getLogger(__name__)
@@ -136,44 +138,46 @@ class EnterpriseMetricHandler:
return False
def _rehydrate(self, envelope: TelemetryEnvelope) -> dict[str, Any]:
"""Rehydrate payload from reference or fallback.
"""Rehydrate payload from storage reference or inline data.
Attempts to resolve payload_ref to full data. If that fails,
falls back to payload_fallback. If both fail, emits a degraded
event marker.
If the envelope payload is empty and metadata contains a
``payload_ref``, the full payload is loaded from object storage
(where the gateway wrote it as JSON). When both the inline
payload and storage resolution fail, a degraded-event marker
is emitted so the gap is observable.
Args:
envelope: The telemetry envelope containing payload data.
Returns:
The rehydrated payload dictionary.
The rehydrated payload dictionary, or ``{}`` on total failure.
"""
# For now, payload is directly in the envelope
# Future: implement payload_ref resolution from storage
payload = envelope.payload
if not payload and envelope.payload_fallback:
import pickle
try:
payload = pickle.loads(envelope.payload_fallback) # noqa: S301
logger.debug("Used payload_fallback for event_id=%s", envelope.event_id)
except Exception:
logger.warning(
"Failed to deserialize payload_fallback for event_id=%s",
envelope.event_id,
exc_info=True,
)
# Resolve from object storage when the gateway offloaded a large payload.
if not payload and envelope.metadata:
payload_ref = envelope.metadata.get("payload_ref")
if payload_ref:
try:
payload_bytes = storage.load(payload_ref)
payload = json.loads(payload_bytes.decode("utf-8"))
logger.debug("Loaded payload from storage: key=%s", payload_ref)
except Exception:
logger.warning(
"Failed to load payload from storage: key=%s, event_id=%s",
payload_ref,
envelope.event_id,
exc_info=True,
)
if not payload:
# Both ref and fallback failed - emit degraded event
# Storage resolution failed or no data available — emit degraded event.
logger.error(
"Payload rehydration failed for event_id=%s, tenant_id=%s, case=%s",
envelope.event_id,
envelope.tenant_id,
envelope.case,
)
# Emit degraded event marker
from enterprise.telemetry.entities import EnterpriseTelemetryEvent
from enterprise.telemetry.telemetry_log import emit_metric_only_event

View File

@@ -81,23 +81,19 @@ def init_app(app: DifyApp):
# Auto-detect TLS: https:// uses secure, everything else is insecure
endpoint = dify_config.OTLP_BASE_ENDPOINT
insecure = not endpoint.startswith("https://")
exporter = GRPCSpanExporter(
endpoint=endpoint,
# Header field names must consist of lowercase letters, check RFC7540
headers=(
(
("authorization", f"Bearer {dify_config.OTLP_API_KEY}"),
) if dify_config.OTLP_API_KEY else None
(("authorization", f"Bearer {dify_config.OTLP_API_KEY}"),) if dify_config.OTLP_API_KEY else None
),
insecure=insecure,
)
metric_exporter = GRPCMetricExporter(
endpoint=endpoint,
headers=(
(
("authorization", f"Bearer {dify_config.OTLP_API_KEY}"),
) if dify_config.OTLP_API_KEY else None
(("authorization", f"Bearer {dify_config.OTLP_API_KEY}"),) if dify_config.OTLP_API_KEY else None
),
insecure=insecure,
)

View File

@@ -87,26 +87,22 @@ class TestTelemetryEnvelope:
assert envelope.tenant_id == "tenant-123"
assert envelope.event_id == "event-456"
assert envelope.payload == {"key": "value"}
assert envelope.payload_fallback is None
assert envelope.metadata is None
def test_valid_envelope_full(self) -> None:
"""Verify valid envelope with all fields."""
metadata = {"source": "api"}
fallback = b"fallback data"
metadata = {"payload_ref": "telemetry/tenant-789/event-012.json"}
envelope = TelemetryEnvelope(
case=TelemetryCase.MESSAGE_RUN,
tenant_id="tenant-789",
event_id="event-012",
payload={"message": "hello"},
payload_fallback=fallback,
metadata=metadata,
)
assert envelope.case == TelemetryCase.MESSAGE_RUN
assert envelope.tenant_id == "tenant-789"
assert envelope.event_id == "event-012"
assert envelope.payload == {"message": "hello"}
assert envelope.payload_fallback == fallback
assert envelope.metadata == metadata
def test_missing_required_case(self) -> None:
@@ -145,41 +141,16 @@ class TestTelemetryEnvelope:
event_id="event-456",
)
def test_payload_fallback_within_limit(self) -> None:
"""Verify payload_fallback within 64KB limit is accepted."""
fallback = b"x" * 65536
def test_metadata_none(self) -> None:
"""Verify metadata can be None."""
envelope = TelemetryEnvelope(
case=TelemetryCase.WORKFLOW_RUN,
tenant_id="tenant-123",
event_id="event-456",
payload={"key": "value"},
payload_fallback=fallback,
metadata=None,
)
assert envelope.payload_fallback == fallback
def test_payload_fallback_exceeds_limit(self) -> None:
"""Verify payload_fallback exceeding 64KB is rejected."""
fallback = b"x" * 65537
with pytest.raises(ValidationError) as exc_info:
TelemetryEnvelope(
case=TelemetryCase.WORKFLOW_RUN,
tenant_id="tenant-123",
event_id="event-456",
payload={"key": "value"},
payload_fallback=fallback,
)
assert "64KB" in str(exc_info.value)
def test_payload_fallback_none(self) -> None:
"""Verify payload_fallback can be None."""
envelope = TelemetryEnvelope(
case=TelemetryCase.WORKFLOW_RUN,
tenant_id="tenant-123",
event_id="event-456",
payload={"key": "value"},
payload_fallback=None,
)
assert envelope.payload_fallback is None
assert envelope.metadata is None
class TestCaseRouting:

View File

@@ -260,4 +260,4 @@ def test_no_scheme_production_uses_insecure(mock_metric_exporter: MagicMock, moc
assert mock_span_exporter.call_args.kwargs["insecure"] is True
assert mock_metric_exporter.call_args is not None
assert mock_metric_exporter.call_args.kwargs["insecure"] is True
assert mock_metric_exporter.call_args.kwargs["insecure"] is True

View File

@@ -1,5 +1,6 @@
"""Unit tests for EnterpriseMetricHandler."""
import json
from unittest.mock import MagicMock, patch
import pytest
@@ -238,31 +239,60 @@ def test_rehydration_uses_payload(sample_envelope):
assert payload == {"app_id": "app-123", "name": "Test App"}
def test_rehydration_fallback():
import pickle
fallback_data = {"fallback": "data"}
def test_rehydration_from_storage():
"""Verify _rehydrate loads payload from object storage via payload_ref."""
stored_data = {"app_id": "app-stored", "mode": "workflow"}
envelope = TelemetryEnvelope(
case=TelemetryCase.APP_CREATED,
tenant_id="test-tenant",
event_id="test-event-fb",
payload={},
payload_fallback=pickle.dumps(fallback_data),
metadata={"payload_ref": "telemetry/test-tenant/test-event-fb.json"},
)
handler = EnterpriseMetricHandler()
payload = handler._rehydrate(envelope)
with patch("enterprise.telemetry.metric_handler.storage") as mock_storage:
mock_storage.load.return_value = json.dumps(stored_data).encode("utf-8")
payload = handler._rehydrate(envelope)
assert payload == fallback_data
assert payload == stored_data
mock_storage.load.assert_called_once_with("telemetry/test-tenant/test-event-fb.json")
def test_rehydration_emits_degraded_event_on_failure():
def test_rehydration_storage_failure_emits_degraded_event():
"""Verify _rehydrate emits degraded event when storage load fails."""
envelope = TelemetryEnvelope(
case=TelemetryCase.APP_CREATED,
tenant_id="test-tenant",
event_id="test-event-fail",
payload={},
payload_fallback=None,
metadata={"payload_ref": "telemetry/test-tenant/test-event-fail.json"},
)
handler = EnterpriseMetricHandler()
with (
patch("enterprise.telemetry.metric_handler.storage") as mock_storage,
patch("enterprise.telemetry.telemetry_log.emit_metric_only_event") as mock_emit,
):
mock_storage.load.side_effect = Exception("Storage unavailable")
payload = handler._rehydrate(envelope)
from enterprise.telemetry.entities import EnterpriseTelemetryEvent
assert payload == {}
mock_emit.assert_called_once()
call_args = mock_emit.call_args
assert call_args[1]["event_name"] == EnterpriseTelemetryEvent.REHYDRATION_FAILED
assert call_args[1]["attributes"]["rehydration_failed"] is True
def test_rehydration_emits_degraded_event_on_empty_payload():
"""Verify _rehydrate emits degraded event when payload is empty and no ref exists."""
envelope = TelemetryEnvelope(
case=TelemetryCase.APP_CREATED,
tenant_id="test-tenant",
event_id="test-event-empty",
payload={},
)
handler = EnterpriseMetricHandler()