feat: add process data

Novice
2025-12-17 17:34:02 +08:00
parent cb99b8f04d
commit f54b9b12b0

@@ -164,81 +164,6 @@ class LLMNode(Node[LLMNodeData]):
     def version(cls) -> str:
         return "1"
 
-    def _stream_llm_events(
-        self,
-        generator: Generator[NodeEventBase | LLMStructuredOutput, None, LLMGenerationData | None],
-        *,
-        model_instance: ModelInstance,
-    ) -> Generator[
-        NodeEventBase,
-        None,
-        tuple[
-            str,
-            str,
-            LLMUsage,
-            str | None,
-            LLMStructuredOutput | None,
-            LLMGenerationData | None,
-        ],
-    ]:
-        """
-        Stream events and capture generator return value in one place.
-        Uses generator delegation so _run stays concise while still emitting events.
-        """
-        clean_text = ""
-        reasoning_content = ""
-        usage = LLMUsage.empty_usage()
-        finish_reason: str | None = None
-        structured_output: LLMStructuredOutput | None = None
-        generation_data: LLMGenerationData | None = None
-        completed = False
-
-        while True:
-            try:
-                event = next(generator)
-            except StopIteration as exc:
-                if isinstance(exc.value, LLMGenerationData):
-                    generation_data = exc.value
-                break
-
-            if completed:
-                # After completion we still drain to reach StopIteration.value
-                continue
-
-            match event:
-                case StreamChunkEvent() | ThoughtChunkEvent():
-                    yield event
-                case ModelInvokeCompletedEvent(
-                    text=text,
-                    usage=usage_event,
-                    finish_reason=finish_reason_event,
-                    reasoning_content=reasoning_event,
-                    structured_output=structured_raw,
-                ):
-                    clean_text = text
-                    usage = usage_event
-                    finish_reason = finish_reason_event
-                    reasoning_content = reasoning_event or ""
-
-                    if self.node_data.reasoning_format != "tagged":
-                        clean_text, _ = LLMNode._split_reasoning(clean_text, self.node_data.reasoning_format)
-
-                    structured_output = (
-                        LLMStructuredOutput(structured_output=structured_raw) if structured_raw else None
-                    )
-                    llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage)
-                    completed = True
-                case LLMStructuredOutput():
-                    structured_output = event
-                case _:
-                    continue
-
-        return clean_text, reasoning_content, usage, finish_reason, structured_output, generation_data
-
     def _run(self) -> Generator:
         node_inputs: dict[str, Any] = {}
         process_data: dict[str, Any] = {}
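
Note: the method removed here (and re-added near line 1396 below) hinges on one generator subtlety: a `return` inside a generator surfaces as `StopIteration.value` on the consumer side, so a manual `next()` loop can both re-yield events and capture the final result. A minimal, self-contained sketch of that pattern (names are illustrative, not from the codebase):

from collections.abc import Generator

def producer() -> Generator[str, None, int]:
    yield "chunk-1"
    yield "chunk-2"
    return 42  # becomes StopIteration.value for the consumer

def consume() -> tuple[list[str], int | None]:
    gen = producer()
    chunks: list[str] = []
    result: int | None = None
    while True:
        try:
            chunks.append(next(gen))
        except StopIteration as exc:
            result = exc.value  # the producer's return value
            break
    return chunks, result

assert consume() == (["chunk-1", "chunk-2"], 42)

A plain `for` loop would discard the return value; `yield from` would capture it too, but the manual loop lets the consumer keep draining after the completion event, as the node code does.
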
@@ -262,15 +187,6 @@ class LLMNode(Node[LLMNodeData]):
         # merge inputs
         inputs.update(jinja_inputs)
 
-        # Add all inputs to node_inputs for logging
-        node_inputs.update(inputs)
-
-        # Add tools to inputs if configured
-        if self.tool_call_enabled:
-            node_inputs["tools"] = [
-                {"provider_id": tool.provider_name, "tool_name": tool.tool_name} for tool in self._node_data.tools
-            ]
-
         # fetch files
         files = (
             llm_utils.fetch_files(
@@ -372,6 +288,8 @@ class LLMNode(Node[LLMNodeData]):
         (
             clean_text,
             reasoning_content,
+            generation_reasoning_content,
+            generation_clean_content,
             usage,
             finish_reason,
             structured_output,
@@ -396,6 +314,16 @@ class LLMNode(Node[LLMNodeData]):
             "model_provider": model_config.provider,
             "model_name": model_config.model,
         }
+        if self.tool_call_enabled and self._node_data.tools:
+            process_data["tools"] = [
+                {
+                    "type": tool.type.value if hasattr(tool.type, "value") else tool.type,
+                    "provider_name": tool.provider_name,
+                    "tool_name": tool.tool_name,
+                }
+                for tool in self._node_data.tools
+                if tool.enabled
+            ]
 
         # Unified outputs building
         outputs = {
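
Note: the `tool.type.value if hasattr(tool.type, "value") else tool.type` guard keeps the entry JSON-serializable whether `tool.type` is an Enum member or already a plain string; `json.dumps` rejects raw Enum values (unless the enum also subclasses `str`). A standalone sketch, with `ToolType` as a hypothetical stand-in for the real tool type:

import json
from enum import Enum
from typing import Any

class ToolType(Enum):  # hypothetical stand-in for the real tool type
    BUILTIN = "builtin"

def normalize(value: Any) -> Any:
    # Enum members expose .value; plain strings pass through unchanged
    return value.value if hasattr(value, "value") else value

assert json.dumps({"type": normalize(ToolType.BUILTIN)}) == '{"type": "builtin"}'
assert json.dumps({"type": normalize("api")}) == '{"type": "api"}'
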
@@ -411,17 +339,25 @@ class LLMNode(Node[LLMNodeData]):
             generation = {
                 "content": generation_data.text,
                 "reasoning_content": generation_data.reasoning_contents,  # [thought1, thought2, ...]
-                "tool_calls": generation_data.tool_calls,
+                "tool_calls": [self._serialize_tool_call(item) for item in generation_data.tool_calls],
                 "sequence": generation_data.sequence,
             }
             files_to_output = generation_data.files
         else:
             # Traditional LLM invocation
+            generation_reasoning = generation_reasoning_content or reasoning_content
+            generation_content = generation_clean_content or clean_text
+            sequence: list[dict[str, Any]] = []
+            if generation_reasoning:
+                sequence = [
+                    {"type": "reasoning", "index": 0},
+                    {"type": "content", "start": 0, "end": len(generation_content)},
+                ]
             generation = {
-                "content": clean_text,
-                "reasoning_content": [reasoning_content] if reasoning_content else [],
+                "content": generation_content,
+                "reasoning_content": [generation_reasoning] if generation_reasoning else [],
                 "tool_calls": [],
-                "sequence": [],
+                "sequence": sequence,
             }
             files_to_output = self._file_outputs
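
Note: if the intent reads correctly, the fallback `sequence` encodes segment order for the non-tool path: one reasoning segment (index 0 into the `reasoning_content` list) followed by the full content span. A worked example with invented values:

from typing import Any

generation_reasoning = "2 + 2 is 4."    # invented example values
generation_content = "The answer is 4."

sequence: list[dict[str, Any]] = []
if generation_reasoning:
    sequence = [
        {"type": "reasoning", "index": 0},                                 # reasoning_content[0]
        {"type": "content", "start": 0, "end": len(generation_content)},  # chars 0..16 of content
    ]

generation = {
    "content": generation_content,
    "reasoning_content": [generation_reasoning],
    "tool_calls": [],
    "sequence": sequence,
}
assert generation["sequence"][1]["end"] == 16
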
@@ -1460,6 +1396,104 @@ class LLMNode(Node[LLMNodeData]):
             and all(tool.enabled for tool in self.node_data.tools)
         )
 
+    def _stream_llm_events(
+        self,
+        generator: Generator[NodeEventBase | LLMStructuredOutput, None, LLMGenerationData | None],
+        *,
+        model_instance: ModelInstance,
+    ) -> Generator[
+        NodeEventBase,
+        None,
+        tuple[
+            str,
+            str,
+            str,
+            str,
+            LLMUsage,
+            str | None,
+            LLMStructuredOutput | None,
+            LLMGenerationData | None,
+        ],
+    ]:
+        """
+        Stream events and capture generator return value in one place.
+        Uses generator delegation so _run stays concise while still emitting events.
+        """
+        clean_text = ""
+        reasoning_content = ""
+        generation_reasoning_content = ""
+        generation_clean_content = ""
+        usage = LLMUsage.empty_usage()
+        finish_reason: str | None = None
+        structured_output: LLMStructuredOutput | None = None
+        generation_data: LLMGenerationData | None = None
+        completed = False
+
+        while True:
+            try:
+                event = next(generator)
+            except StopIteration as exc:
+                if isinstance(exc.value, LLMGenerationData):
+                    generation_data = exc.value
+                break
+
+            if completed:
+                # After completion we still drain to reach StopIteration.value
+                continue
+
+            match event:
+                case StreamChunkEvent() | ThoughtChunkEvent():
+                    yield event
+                case ModelInvokeCompletedEvent(
+                    text=text,
+                    usage=usage_event,
+                    finish_reason=finish_reason_event,
+                    reasoning_content=reasoning_event,
+                    structured_output=structured_raw,
+                ):
+                    clean_text = text
+                    usage = usage_event
+                    finish_reason = finish_reason_event
+                    reasoning_content = reasoning_event or ""
+                    generation_reasoning_content = reasoning_content
+                    generation_clean_content = clean_text
+
+                    if self.node_data.reasoning_format == "tagged":
+                        # Keep tagged text for output; also extract reasoning for generation field
+                        generation_clean_content, generation_reasoning_content = LLMNode._split_reasoning(
+                            clean_text, reasoning_format="separated"
+                        )
+                    else:
+                        clean_text, generation_reasoning_content = LLMNode._split_reasoning(
+                            clean_text, self.node_data.reasoning_format
+                        )
+                        generation_clean_content = clean_text
+
+                    structured_output = (
+                        LLMStructuredOutput(structured_output=structured_raw) if structured_raw else None
+                    )
+                    llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage)
+                    completed = True
+                case LLMStructuredOutput():
+                    structured_output = event
+                case _:
+                    continue
+
+        return (
+            clean_text,
+            reasoning_content,
+            generation_reasoning_content,
+            generation_clean_content,
+            usage,
+            finish_reason,
+            structured_output,
+            generation_data,
+        )
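
Note: `_split_reasoning` itself is outside this diff, so the exact tag handling is not visible here. Assuming `<think>...</think>`-style tags (an assumption, not the project's confirmed format), a "separated" split could look roughly like:

import re

_THINK_RE = re.compile(r"<think>(.*?)</think>\s*", re.DOTALL)  # assumed tag format

def split_reasoning(text: str) -> tuple[str, str]:
    # Returns (clean_text, reasoning): tags stripped from the text,
    # tag contents joined into the reasoning string
    thoughts = [m.strip() for m in _THINK_RE.findall(text)]
    clean = _THINK_RE.sub("", text)
    return clean, "\n".join(thoughts)

clean, reasoning = split_reasoning("<think>Compute 2+2.</think>The answer is 4.")
assert (clean, reasoning) == ("The answer is 4.", "Compute 2+2.")

Either way, the tagged branch above keeps the raw tagged text as the node's `text` output while routing the extracted thought into the `generation` payload.
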
 
     def _invoke_llm_with_tools(
         self,
         model_instance: ModelInstance,
@@ -1594,6 +1628,29 @@ class LLMNode(Node[LLMNodeData]):
 
         return files
 
+    @staticmethod
+    def _serialize_tool_call(tool_call: ToolCallResult) -> dict[str, Any]:
+        """Convert ToolCallResult into JSON-friendly dict."""
+
+        def _file_to_ref(file: File) -> str | None:
+            # Align with streamed tool result events which carry file IDs
+            return file.id or file.related_id
+
+        files = []
+        for file in tool_call.files or []:
+            ref = _file_to_ref(file)
+            if ref:
+                files.append(ref)
+
+        return {
+            "id": tool_call.id,
+            "name": tool_call.name,
+            "arguments": tool_call.arguments,
+            "output": tool_call.output,
+            "files": files,
+            "status": tool_call.status.value if hasattr(tool_call.status, "value") else tool_call.status,
+        }
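
Note: for reference, the shape this serializer produces; every field is a JSON-native type, so `process_data` stays `json.dumps`-safe. The values below are invented for illustration:

import json

serialized = {
    "id": "call_abc123",           # invented example values
    "name": "web_search",
    "arguments": '{"query": "weather"}',
    "output": "Sunny, 22°C",
    "files": ["file-id-1"],        # file IDs, matching streamed tool result events
    "status": "success",           # enum already collapsed to its .value
}
json.dumps(serialized)  # no TypeError: everything is JSON-native
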
 
     def _flush_thought_segment(self, buffers: StreamBuffers, trace_state: TraceState) -> None:
         if not buffers.pending_thought:
             return