mirror of
https://github.com/langgenius/dify.git
synced 2026-03-06 15:45:14 +00:00
fix(telemetry): gate ObservabilityLayer content attrs behind ENTERPRISE_INCLUDE_CONTENT
Add should_include_content() helper to extensions/otel/parser/base.py that returns True in CE (no behaviour change) and respects ENTERPRISE_INCLUDE_CONTENT in EE. Gate all content-bearing span attributes in LLM, retrieval, tool, and default node parsers so that gen_ai.completion, gen_ai.prompt, retrieval.document, tool call arguments/results, and node input/output values are suppressed when ENTERPRISE_ENABLED=True and ENTERPRISE_INCLUDE_CONTENT=False.
This commit is contained in:
@@ -5,7 +5,7 @@ This module provides parsers that extract node-specific metadata and set
|
||||
OpenTelemetry span attributes according to semantic conventions.
|
||||
"""
|
||||
|
||||
from extensions.otel.parser.base import DefaultNodeOTelParser, NodeOTelParser, safe_json_dumps
|
||||
from extensions.otel.parser.base import DefaultNodeOTelParser, NodeOTelParser, safe_json_dumps, should_include_content
|
||||
from extensions.otel.parser.llm import LLMNodeOTelParser
|
||||
from extensions.otel.parser.retrieval import RetrievalNodeOTelParser
|
||||
from extensions.otel.parser.tool import ToolNodeOTelParser
|
||||
@@ -17,4 +17,5 @@ __all__ = [
|
||||
"RetrievalNodeOTelParser",
|
||||
"ToolNodeOTelParser",
|
||||
"safe_json_dumps",
|
||||
"should_include_content",
|
||||
]
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
"""
|
||||
Base parser interface and utilities for OpenTelemetry node parsers.
|
||||
|
||||
Content gating: ``should_include_content()`` controls whether content-bearing
|
||||
span attributes (inputs, outputs, prompts, completions, documents) are written.
|
||||
Gate is only active in EE (``ENTERPRISE_ENABLED=True``) when
|
||||
``ENTERPRISE_INCLUDE_CONTENT=False``; CE behaviour is unchanged.
|
||||
"""
|
||||
|
||||
import json
|
||||
@@ -15,8 +20,20 @@ from core.workflow.enums import NodeType
|
||||
from core.workflow.graph_events import GraphNodeEventBase
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from extensions.otel.semconv.gen_ai import ChainAttributes, GenAIAttributes
|
||||
from configs import dify_config
|
||||
|
||||
|
||||
|
||||
def should_include_content() -> bool:
|
||||
"""Return True if content should be written to spans.
|
||||
|
||||
CE (ENTERPRISE_ENABLED=False): always True — no behaviour change.
|
||||
EE: follows ENTERPRISE_INCLUDE_CONTENT (default True).
|
||||
"""
|
||||
if not dify_config.ENTERPRISE_ENABLED:
|
||||
return True
|
||||
return dify_config.ENTERPRISE_INCLUDE_CONTENT
|
||||
|
||||
def safe_json_dumps(obj: Any, ensure_ascii: bool = False) -> str:
|
||||
"""
|
||||
Safely serialize objects to JSON, handling non-serializable types.
|
||||
@@ -105,10 +122,11 @@ class DefaultNodeOTelParser:
|
||||
# Extract inputs and outputs from result_event
|
||||
if result_event and result_event.node_run_result:
|
||||
node_run_result = result_event.node_run_result
|
||||
if node_run_result.inputs:
|
||||
span.set_attribute(ChainAttributes.INPUT_VALUE, safe_json_dumps(node_run_result.inputs))
|
||||
if node_run_result.outputs:
|
||||
span.set_attribute(ChainAttributes.OUTPUT_VALUE, safe_json_dumps(node_run_result.outputs))
|
||||
if should_include_content():
|
||||
if node_run_result.inputs:
|
||||
span.set_attribute(ChainAttributes.INPUT_VALUE, safe_json_dumps(node_run_result.inputs))
|
||||
if node_run_result.outputs:
|
||||
span.set_attribute(ChainAttributes.OUTPUT_VALUE, safe_json_dumps(node_run_result.outputs))
|
||||
|
||||
if error:
|
||||
span.record_exception(error)
|
||||
|
||||
@@ -10,7 +10,7 @@ from opentelemetry.trace import Span
|
||||
|
||||
from core.workflow.graph_events import GraphNodeEventBase
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from extensions.otel.parser.base import DefaultNodeOTelParser, safe_json_dumps
|
||||
from extensions.otel.parser.base import DefaultNodeOTelParser, safe_json_dumps, should_include_content
|
||||
from extensions.otel.semconv.gen_ai import LLMAttributes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -132,24 +132,19 @@ class LLMNodeOTelParser:
|
||||
span.set_attribute(LLMAttributes.USAGE_OUTPUT_TOKENS, completion_tokens)
|
||||
span.set_attribute(LLMAttributes.USAGE_TOTAL_TOKENS, total_tokens)
|
||||
|
||||
# Prompts and completion
|
||||
prompts = process_data.get("prompts", [])
|
||||
if prompts:
|
||||
prompts_json = safe_json_dumps(prompts)
|
||||
span.set_attribute(LLMAttributes.PROMPT, prompts_json)
|
||||
# Prompts and completion — gated by content policy
|
||||
if should_include_content():
|
||||
prompts = process_data.get("prompts", [])
|
||||
if prompts:
|
||||
prompts_json = safe_json_dumps(prompts)
|
||||
span.set_attribute(LLMAttributes.PROMPT, prompts_json)
|
||||
|
||||
text_output = str(outputs.get("text", ""))
|
||||
if text_output:
|
||||
span.set_attribute(LLMAttributes.COMPLETION, text_output)
|
||||
text_output = str(outputs.get("text", ""))
|
||||
if text_output:
|
||||
span.set_attribute(LLMAttributes.COMPLETION, text_output)
|
||||
|
||||
# Finish reason
|
||||
finish_reason = outputs.get("finish_reason") or ""
|
||||
if finish_reason:
|
||||
span.set_attribute(LLMAttributes.RESPONSE_FINISH_REASON, finish_reason)
|
||||
|
||||
# Structured input/output messages
|
||||
gen_ai_input_message = _format_input_messages(process_data)
|
||||
gen_ai_output_message = _format_output_messages(outputs)
|
||||
|
||||
span.set_attribute(LLMAttributes.INPUT_MESSAGE, gen_ai_input_message)
|
||||
span.set_attribute(LLMAttributes.OUTPUT_MESSAGE, gen_ai_output_message)
|
||||
# Structured input/output messages
|
||||
gen_ai_input_message = _format_input_messages(process_data)
|
||||
gen_ai_output_message = _format_output_messages(outputs)
|
||||
span.set_attribute(LLMAttributes.INPUT_MESSAGE, gen_ai_input_message)
|
||||
span.set_attribute(LLMAttributes.OUTPUT_MESSAGE, gen_ai_output_message)
|
||||
|
||||
@@ -11,7 +11,7 @@ from opentelemetry.trace import Span
|
||||
from core.variables import Segment
|
||||
from core.workflow.graph_events import GraphNodeEventBase
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from extensions.otel.parser.base import DefaultNodeOTelParser, safe_json_dumps
|
||||
from extensions.otel.parser.base import DefaultNodeOTelParser, safe_json_dumps, should_include_content
|
||||
from extensions.otel.semconv.gen_ai import RetrieverAttributes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -83,23 +83,21 @@ class RetrievalNodeOTelParser:
|
||||
inputs = node_run_result.inputs or {}
|
||||
outputs = node_run_result.outputs or {}
|
||||
|
||||
# Extract query from inputs
|
||||
query = str(inputs.get("query", "")) if inputs else ""
|
||||
if query:
|
||||
span.set_attribute(RetrieverAttributes.QUERY, query)
|
||||
# Query and documents — gated by content policy
|
||||
if should_include_content():
|
||||
query = str(inputs.get("query", "")) if inputs else ""
|
||||
if query:
|
||||
span.set_attribute(RetrieverAttributes.QUERY, query)
|
||||
|
||||
# Extract and format retrieval documents from outputs
|
||||
result_value = outputs.get("result") if outputs else None
|
||||
retrieval_documents: list[Any] = []
|
||||
if result_value:
|
||||
value_to_check = result_value
|
||||
if isinstance(result_value, Segment):
|
||||
value_to_check = result_value.value
|
||||
|
||||
if isinstance(value_to_check, (list, Sequence)):
|
||||
retrieval_documents = list(value_to_check)
|
||||
|
||||
if retrieval_documents:
|
||||
semantic_retrieval_documents = _format_retrieval_documents(retrieval_documents)
|
||||
semantic_retrieval_documents_json = safe_json_dumps(semantic_retrieval_documents)
|
||||
span.set_attribute(RetrieverAttributes.DOCUMENT, semantic_retrieval_documents_json)
|
||||
result_value = outputs.get("result") if outputs else None
|
||||
retrieval_documents: list[Any] = []
|
||||
if result_value:
|
||||
value_to_check = result_value
|
||||
if isinstance(result_value, Segment):
|
||||
value_to_check = result_value.value
|
||||
if isinstance(value_to_check, (list, Sequence)):
|
||||
retrieval_documents = list(value_to_check)
|
||||
if retrieval_documents:
|
||||
semantic_retrieval_documents = _format_retrieval_documents(retrieval_documents)
|
||||
semantic_retrieval_documents_json = safe_json_dumps(semantic_retrieval_documents)
|
||||
span.set_attribute(RetrieverAttributes.DOCUMENT, semantic_retrieval_documents_json)
|
||||
|
||||
@@ -8,7 +8,7 @@ from core.workflow.enums import WorkflowNodeExecutionMetadataKey
|
||||
from core.workflow.graph_events import GraphNodeEventBase
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.tool.entities import ToolNodeData
|
||||
from extensions.otel.parser.base import DefaultNodeOTelParser, safe_json_dumps
|
||||
from extensions.otel.parser.base import DefaultNodeOTelParser, safe_json_dumps, should_include_content
|
||||
from extensions.otel.semconv.gen_ai import ToolAttributes
|
||||
|
||||
|
||||
@@ -40,8 +40,10 @@ class ToolNodeOTelParser:
|
||||
if tool_info:
|
||||
span.set_attribute(ToolAttributes.TOOL_DESCRIPTION, safe_json_dumps(tool_info))
|
||||
|
||||
if result_event and result_event.node_run_result and result_event.node_run_result.inputs:
|
||||
span.set_attribute(ToolAttributes.TOOL_CALL_ARGUMENTS, safe_json_dumps(result_event.node_run_result.inputs))
|
||||
# Tool call arguments and result — gated by content policy
|
||||
if should_include_content():
|
||||
if result_event and result_event.node_run_result and result_event.node_run_result.inputs:
|
||||
span.set_attribute(ToolAttributes.TOOL_CALL_ARGUMENTS, safe_json_dumps(result_event.node_run_result.inputs))
|
||||
|
||||
if result_event and result_event.node_run_result and result_event.node_run_result.outputs:
|
||||
span.set_attribute(ToolAttributes.TOOL_CALL_RESULT, safe_json_dumps(result_event.node_run_result.outputs))
|
||||
if result_event and result_event.node_run_result and result_event.node_run_result.outputs:
|
||||
span.set_attribute(ToolAttributes.TOOL_CALL_RESULT, safe_json_dumps(result_event.node_run_result.outputs))
|
||||
|
||||
Reference in New Issue
Block a user