Compare commits

...

2 Commits

Author  SHA1        Message                                         Date
-LAN-   aff9853156  Add type hint for empty iteration inputs        2025-12-17 13:10:47 +08:00
-LAN-   bd1f9bb735  fix: cannot run single step on iteration node   2025-12-17 13:10:46 +08:00
8 changed files with 269 additions and 29 deletions
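Taken together, the two commits appear to do four things: thread the real SystemVariable values into single-step (single iteration / single loop) debug runs instead of an empty placeholder, wrap the target container node between synthetic start/end nodes so the single-node graph passes validation, emit proper started/succeeded events when an iteration node receives an empty iterator, and harden the frontend hook against a missing created_by field in single-step responses. Two new test modules cover the runner preparation and the empty-iteration path.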

View File

@@ -109,6 +109,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
workflow=self._workflow,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
system_variables=system_inputs,
)
else:
inputs = self.application_generate_entity.inputs
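For context, this hunk (and the matching one in WorkflowAppRunner further down) forwards the already-built system_inputs into the single-step preparation path. A minimal sketch of why that matters, using toy classes rather than the real Dify VariablePool API:

# Toy illustration (not the Dify API): why single-step runs need the real
# system variables rather than an empty placeholder.
from dataclasses import dataclass, field

@dataclass
class ToyVariablePool:
    system_variables: dict[str, object] = field(default_factory=dict)

    def get(self, selector: list[str]) -> object | None:
        # "sys.*" selectors resolve against the seeded system variables.
        if len(selector) == 2 and selector[0] == "sys":
            return self.system_variables.get(selector[1])
        return None

# Before this change the single-step path seeded an empty pool, so a node that
# reads sys.workflow_execution_id (or sys.files, sys.user_id, ...) got None.
assert ToyVariablePool().get(["sys", "workflow_execution_id"]) is None

# With system_inputs forwarded, the same lookup resolves during debugging.
seeded = ToyVariablePool(system_variables={"workflow_execution_id": "run-123"})
assert seeded.get(["sys", "workflow_execution_id"]) == "run-123"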

View File

@@ -92,6 +92,22 @@ class PipelineRunner(WorkflowBasedAppRunner):
db.session.close()
files = self.application_generate_entity.files
system_inputs = SystemVariable(
files=files,
user_id=user_id,
app_id=app_config.app_id,
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
document_id=self.application_generate_entity.document_id,
original_document_id=self.application_generate_entity.original_document_id,
batch=self.application_generate_entity.batch,
dataset_id=self.application_generate_entity.dataset_id,
datasource_type=self.application_generate_entity.datasource_type,
datasource_info=self.application_generate_entity.datasource_info,
invoke_from=self.application_generate_entity.invoke_from.value,
)
# if only single iteration run is requested
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
# Handle single iteration or single loop run
@@ -99,27 +115,12 @@ class PipelineRunner(WorkflowBasedAppRunner):
workflow=workflow,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
system_variables=system_inputs,
)
else:
inputs = self.application_generate_entity.inputs
files = self.application_generate_entity.files
# Create a variable pool.
system_inputs = SystemVariable(
files=files,
user_id=user_id,
app_id=app_config.app_id,
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
document_id=self.application_generate_entity.document_id,
original_document_id=self.application_generate_entity.original_document_id,
batch=self.application_generate_entity.batch,
dataset_id=self.application_generate_entity.dataset_id,
datasource_type=self.application_generate_entity.datasource_type,
datasource_info=self.application_generate_entity.datasource_info,
invoke_from=self.application_generate_entity.invoke_from.value,
)
rag_pipeline_variables = []
if workflow.rag_pipeline_variables:
for v in workflow.rag_pipeline_variables:
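The PipelineRunner hunk is mostly a hoist: the SystemVariable block that used to live inside the else branch is now built once before the branch, so the single-iteration/single-loop path receives the same system_inputs as a full run. Roughly, the control flow changes like this (illustrative names, not the actual method signature):

def run(entity, build_system_inputs, prepare_single_node, prepare_full_run):
    # Shared construction hoisted above the branch.
    system_inputs = build_system_inputs(entity)
    if entity.single_iteration_run or entity.single_loop_run:
        # Single-step debugging now sees the same system variables...
        return prepare_single_node(entity, system_variables=system_inputs)
    # ...as the normal full-workflow path already did.
    return prepare_full_run(entity, system_inputs)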

View File

@@ -80,6 +80,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
workflow=self._workflow,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
system_variables=system_inputs,
)
else:
inputs = self.application_generate_entity.inputs

View File

@@ -130,6 +130,8 @@ class WorkflowBasedAppRunner:
workflow: Workflow,
single_iteration_run: Any | None = None,
single_loop_run: Any | None = None,
*,
system_variables: SystemVariable | None = None,
) -> tuple[Graph, VariablePool, GraphRuntimeState]:
"""
Prepare graph, variable pool, and runtime state for single node execution
@@ -147,9 +149,10 @@ class WorkflowBasedAppRunner:
ValueError: If neither single_iteration_run nor single_loop_run is specified
"""
# Create initial runtime state with variable pool containing environment variables
system_variables = system_variables or SystemVariable.empty()
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(
system_variables=SystemVariable.empty(),
system_variables=system_variables,
user_inputs={},
environment_variables=workflow.environment_variables,
),
@@ -220,7 +223,7 @@ class WorkflowBasedAppRunner:
# filter nodes only in the specified node type (iteration or loop)
main_node_config = next((n for n in graph_config.get("nodes", []) if n.get("id") == node_id), None)
start_node_id = main_node_config.get("data", {}).get("start_node_id") if main_node_config else None
node_configs = [
base_node_configs = [
node
for node in graph_config.get("nodes", [])
if node.get("id") == node_id
@@ -228,26 +231,74 @@ class WorkflowBasedAppRunner:
or (start_node_id and node.get("id") == start_node_id)
]
graph_config["nodes"] = node_configs
# Build a base graph config (without synthetic entry) to keep node-level context minimal.
base_graph_config = graph_config.copy()
base_graph_config["nodes"] = base_node_configs
node_ids = [node.get("id") for node in node_configs]
node_ids = [node.get("id") for node in base_node_configs if isinstance(node.get("id"), str)]
# filter edges only in the specified node type
edge_configs = [
base_edge_configs = [
edge
for edge in graph_config.get("edges", [])
if (edge.get("source") is None or edge.get("source") in node_ids)
and (edge.get("target") is None or edge.get("target") in node_ids)
]
graph_config["edges"] = edge_configs
base_graph_config["edges"] = base_edge_configs
# Inject a synthetic start node so Graph validation accepts the single-node graph
# (loop/iteration nodes are containers and cannot serve as graph roots).
synthetic_start_node_id = f"{node_id}_single_step_start"
synthetic_start_node = {
"id": synthetic_start_node_id,
"type": "custom",
"data": {
"type": NodeType.START,
"title": "Start",
"desc": "Synthetic start for single-step run",
"version": "1",
"variables": [],
},
}
synthetic_end_node_id = f"{node_id}_single_step_end"
synthetic_end_node = {
"id": synthetic_end_node_id,
"type": "custom",
"data": {
"type": NodeType.END,
"title": "End",
"desc": "Synthetic end for single-step run",
"version": "1",
"outputs": [],
},
}
graph_config_with_entry = base_graph_config.copy()
graph_config_with_entry["nodes"] = [*base_node_configs, synthetic_start_node, synthetic_end_node]
graph_config_with_entry["edges"] = [
*base_edge_configs,
{
"source": synthetic_start_node_id,
"target": node_id,
"sourceHandle": "source",
"targetHandle": "target",
},
{
"source": node_id,
"target": synthetic_end_node_id,
"sourceHandle": "source",
"targetHandle": "target",
},
]
# Create required parameters for Graph.init
graph_init_params = GraphInitParams(
tenant_id=workflow.tenant_id,
app_id=self._app_id,
workflow_id=workflow.id,
graph_config=graph_config,
graph_config=base_graph_config,
user_id="",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
@@ -260,14 +311,16 @@ class WorkflowBasedAppRunner:
)
# init graph
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=node_id)
graph = Graph.init(
graph_config=graph_config_with_entry, node_factory=node_factory, root_node_id=synthetic_start_node_id
)
if not graph:
raise ValueError("graph not found in workflow")
# fetch node config from node id
target_node_config = None
for node in node_configs:
for node in base_node_configs:
if node.get("id") == node_id:
target_node_config = node
break
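The core of the fix sits in _prepare_single_node_execution: iteration and loop nodes are containers and cannot serve as a graph root, so the runner now builds a second graph config that sandwiches the target node between a synthetic start and end node and hands that to Graph.init, while node-level context still comes from the unwrapped base config. A rough, self-contained sketch of the wrapping step (plain dicts and a hypothetical helper, not the Graph API):

def wrap_single_node(base_config: dict, node_id: str) -> tuple[dict, str]:
    start_id = f"{node_id}_single_step_start"
    end_id = f"{node_id}_single_step_end"
    wrapped = {
        "nodes": [
            *base_config["nodes"],
            {"id": start_id, "data": {"type": "start", "title": "Start"}},
            {"id": end_id, "data": {"type": "end", "title": "End"}},
        ],
        "edges": [
            *base_config["edges"],
            {"source": start_id, "target": node_id},
            {"source": node_id, "target": end_id},
        ],
    }
    # The synthetic start node becomes the root that graph validation expects.
    return wrapped, start_id

base = {"nodes": [{"id": "iter_1", "data": {"type": "iteration"}}], "edges": []}
wrapped, root = wrap_single_node(base, "iter_1")
assert root == "iter_1_single_step_start"
assert [e["target"] for e in wrapped["edges"]] == ["iter_1", "iter_1_single_step_end"]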

View File

@@ -149,18 +149,49 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
return isinstance(variable, NoneSegment) or len(variable.value) == 0
def _handle_empty_iteration(self, variable: ArraySegment | NoneSegment) -> Generator[NodeEventBase, None, None]:
started_at = naive_utc_now()
inputs: dict[str, object] = {"iterator_selector": []}
usage = LLMUsage.empty_usage()
yield IterationStartedEvent(
start_at=started_at,
inputs=inputs,
metadata={"iteration_length": 0},
)
# Try our best to preserve the type information.
if isinstance(variable, ArraySegment):
output = variable.model_copy(update={"value": []})
else:
output = ArrayAnySegment(value=[])
yield IterationSucceededEvent(
start_at=started_at,
inputs=inputs,
outputs={"output": []},
steps=0,
metadata={
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {},
},
)
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
# TODO(QuantumGhost): is it possible to compute the type of `output`
# from graph definition?
outputs={"output": output},
inputs=inputs,
metadata={
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {},
},
llm_usage=usage,
)
)
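The empty-iteration path now emits a normal started/succeeded event pair and tries to keep the element type of the output. The type-preserving part relies on pydantic's model_copy; a stand-alone illustration with stand-in segment classes (not Dify's real ones):

from pydantic import BaseModel

class ArrayStringSegment(BaseModel):
    value: list[str]

def empty_output(segment):
    # Keep the concrete segment type when the input carried one; otherwise
    # fall back to a plain untyped empty list.
    if isinstance(segment, BaseModel) and hasattr(segment, "value"):
        return segment.model_copy(update={"value": []})
    return []

seg = ArrayStringSegment(value=["a", "b"])
emptied = empty_output(seg)
assert type(emptied) is ArrayStringSegment and emptied.value == []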

View File

@@ -0,0 +1,94 @@
from types import SimpleNamespace
from unittest.mock import MagicMock
from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner
from core.workflow.enums import NodeType, SystemVariableKey
from core.workflow.system_variable import SystemVariable
def test_prepare_single_iteration_injects_system_variables_and_fake_workflow():
node_id = "iteration_node"
execution_id = "workflow-exec-123"
workflow = SimpleNamespace(
id="workflow-id",
tenant_id="tenant-id",
app_id="app-id",
environment_variables=[],
graph_dict={
"nodes": [
{
"id": node_id,
"type": "custom",
"data": {
"type": NodeType.ITERATION,
"title": "Iteration",
"version": "1",
"iterator_selector": ["start", "items"],
"output_selector": [node_id, "output"],
},
}
],
"edges": [],
},
)
runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id)
graph, _, runtime_state = runner._prepare_single_node_execution(
workflow=workflow,
single_iteration_run=SimpleNamespace(node_id=node_id, inputs={"input_selector": [1, 2, 3]}),
system_variables=system_inputs,
)
assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id
assert runtime_state.variable_pool.get_by_prefix("sys")[SystemVariableKey.WORKFLOW_EXECUTION_ID] == execution_id
assert graph.root_node.id == f"{node_id}_single_step_start"
assert f"{node_id}_single_step_end" in graph.nodes
def test_prepare_single_loop_injects_system_variables_and_fake_workflow():
node_id = "loop_node"
execution_id = "workflow-exec-456"
workflow = SimpleNamespace(
id="workflow-id",
tenant_id="tenant-id",
app_id="app-id",
environment_variables=[],
graph_dict={
"nodes": [
{
"id": node_id,
"type": "custom",
"data": {
"type": NodeType.LOOP,
"title": "Loop",
"version": "1",
"loop_count": 1,
"break_conditions": [],
"logical_operator": "and",
"loop_variables": [],
"outputs": {},
},
}
],
"edges": [],
},
)
runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id)
graph, _, runtime_state = runner._prepare_single_node_execution(
workflow=workflow,
single_loop_run=SimpleNamespace(node_id=node_id, inputs={}),
system_variables=system_inputs,
)
assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id
assert graph.root_node.id == f"{node_id}_single_step_start"
assert f"{node_id}_single_step_end" in graph.nodes

View File

@@ -0,0 +1,51 @@
from core.workflow.entities import GraphInitParams
from core.workflow.graph_events import (
NodeRunIterationStartedEvent,
NodeRunIterationSucceededEvent,
NodeRunSucceededEvent,
)
from core.workflow.nodes.iteration.iteration_node import IterationNode
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
def test_iteration_node_emits_iteration_events_when_iterator_empty():
init_params = GraphInitParams(
tenant_id="tenant",
app_id="app",
workflow_id="workflow",
graph_config={},
user_id="user",
user_from="account",
invoke_from="debugger",
call_depth=0,
)
runtime_state = GraphRuntimeState(
variable_pool=VariablePool(system_variables=SystemVariable.empty(), user_inputs={}),
start_at=0.0,
)
runtime_state.variable_pool.add(("start", "items"), [])
node = IterationNode(
id="iteration-node",
config={
"id": "iteration-node",
"data": {
"title": "Iteration",
"iterator_selector": ["start", "items"],
"output_selector": ["iteration-node", "output"],
},
},
graph_init_params=init_params,
graph_runtime_state=runtime_state,
)
events = list(node.run())
assert any(isinstance(event, NodeRunIterationStartedEvent) for event in events)
iteration_succeeded_event = next(event for event in events if isinstance(event, NodeRunIterationSucceededEvent))
assert iteration_succeeded_event.steps == 0
assert iteration_succeeded_event.outputs == {"output": []}
assert any(isinstance(event, NodeRunSucceededEvent) for event in events)

View File

@@ -792,8 +792,12 @@ const useOneStepRun = <T>({
},
})
const { data: iterationData } = params
_runResult.created_by = iterationData.created_by.name
setRunResult(_runResult)
const nextRunResult = {
..._runResult,
created_by: (iterationData.created_by as any)?.name || '',
}
_runResult = nextRunResult
setRunResult(nextRunResult)
},
onIterationStart: (params) => {
const newIterationRunResult = produce(_iterationResult, (draft) => {
@@ -895,8 +899,12 @@ const useOneStepRun = <T>({
},
})
const { data: loopData } = params
_runResult.created_by = loopData.created_by.name
setRunResult(_runResult)
const nextRunResult = {
..._runResult,
created_by: (loopData.created_by as any)?.name || '',
}
_runResult = nextRunResult
setRunResult(nextRunResult)
},
onLoopStart: (params) => {
const newLoopRunResult = produce(_loopResult, (draft) => {
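On the frontend side, the hook previously dereferenced created_by.name directly (which throws when single-step responses omit created_by) and mutated _runResult in place before passing the same object to setRunResult. The new handlers read the name through an optional chain with an empty-string fallback and build a fresh object for the state update, which also sidesteps React bailing out on a referentially identical state value.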