mirror of
https://github.com/langgenius/dify.git
synced 2025-12-20 22:52:26 +00:00
Compare commits
2 Commits
refactor-c
...
fix/loop-i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aff9853156 | ||
|
|
bd1f9bb735 |
@@ -109,6 +109,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
workflow=self._workflow,
|
||||
single_iteration_run=self.application_generate_entity.single_iteration_run,
|
||||
single_loop_run=self.application_generate_entity.single_loop_run,
|
||||
system_variables=system_inputs,
|
||||
)
|
||||
else:
|
||||
inputs = self.application_generate_entity.inputs
|
||||
|
||||
@@ -92,6 +92,22 @@ class PipelineRunner(WorkflowBasedAppRunner):
|
||||
|
||||
db.session.close()
|
||||
|
||||
files = self.application_generate_entity.files
|
||||
system_inputs = SystemVariable(
|
||||
files=files,
|
||||
user_id=user_id,
|
||||
app_id=app_config.app_id,
|
||||
workflow_id=app_config.workflow_id,
|
||||
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
|
||||
document_id=self.application_generate_entity.document_id,
|
||||
original_document_id=self.application_generate_entity.original_document_id,
|
||||
batch=self.application_generate_entity.batch,
|
||||
dataset_id=self.application_generate_entity.dataset_id,
|
||||
datasource_type=self.application_generate_entity.datasource_type,
|
||||
datasource_info=self.application_generate_entity.datasource_info,
|
||||
invoke_from=self.application_generate_entity.invoke_from.value,
|
||||
)
|
||||
|
||||
# if only single iteration run is requested
|
||||
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
|
||||
# Handle single iteration or single loop run
|
||||
@@ -99,27 +115,12 @@ class PipelineRunner(WorkflowBasedAppRunner):
|
||||
workflow=workflow,
|
||||
single_iteration_run=self.application_generate_entity.single_iteration_run,
|
||||
single_loop_run=self.application_generate_entity.single_loop_run,
|
||||
system_variables=system_inputs,
|
||||
)
|
||||
else:
|
||||
inputs = self.application_generate_entity.inputs
|
||||
files = self.application_generate_entity.files
|
||||
|
||||
# Create a variable pool.
|
||||
system_inputs = SystemVariable(
|
||||
files=files,
|
||||
user_id=user_id,
|
||||
app_id=app_config.app_id,
|
||||
workflow_id=app_config.workflow_id,
|
||||
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
|
||||
document_id=self.application_generate_entity.document_id,
|
||||
original_document_id=self.application_generate_entity.original_document_id,
|
||||
batch=self.application_generate_entity.batch,
|
||||
dataset_id=self.application_generate_entity.dataset_id,
|
||||
datasource_type=self.application_generate_entity.datasource_type,
|
||||
datasource_info=self.application_generate_entity.datasource_info,
|
||||
invoke_from=self.application_generate_entity.invoke_from.value,
|
||||
)
|
||||
|
||||
rag_pipeline_variables = []
|
||||
if workflow.rag_pipeline_variables:
|
||||
for v in workflow.rag_pipeline_variables:
|
||||
|
||||
@@ -80,6 +80,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
|
||||
workflow=self._workflow,
|
||||
single_iteration_run=self.application_generate_entity.single_iteration_run,
|
||||
single_loop_run=self.application_generate_entity.single_loop_run,
|
||||
system_variables=system_inputs,
|
||||
)
|
||||
else:
|
||||
inputs = self.application_generate_entity.inputs
|
||||
|
||||
@@ -130,6 +130,8 @@ class WorkflowBasedAppRunner:
|
||||
workflow: Workflow,
|
||||
single_iteration_run: Any | None = None,
|
||||
single_loop_run: Any | None = None,
|
||||
*,
|
||||
system_variables: SystemVariable | None = None,
|
||||
) -> tuple[Graph, VariablePool, GraphRuntimeState]:
|
||||
"""
|
||||
Prepare graph, variable pool, and runtime state for single node execution
|
||||
@@ -147,9 +149,10 @@ class WorkflowBasedAppRunner:
|
||||
ValueError: If neither single_iteration_run nor single_loop_run is specified
|
||||
"""
|
||||
# Create initial runtime state with variable pool containing environment variables
|
||||
system_variables = system_variables or SystemVariable.empty()
|
||||
graph_runtime_state = GraphRuntimeState(
|
||||
variable_pool=VariablePool(
|
||||
system_variables=SystemVariable.empty(),
|
||||
system_variables=system_variables,
|
||||
user_inputs={},
|
||||
environment_variables=workflow.environment_variables,
|
||||
),
|
||||
@@ -220,7 +223,7 @@ class WorkflowBasedAppRunner:
|
||||
# filter nodes only in the specified node type (iteration or loop)
|
||||
main_node_config = next((n for n in graph_config.get("nodes", []) if n.get("id") == node_id), None)
|
||||
start_node_id = main_node_config.get("data", {}).get("start_node_id") if main_node_config else None
|
||||
node_configs = [
|
||||
base_node_configs = [
|
||||
node
|
||||
for node in graph_config.get("nodes", [])
|
||||
if node.get("id") == node_id
|
||||
@@ -228,26 +231,74 @@ class WorkflowBasedAppRunner:
|
||||
or (start_node_id and node.get("id") == start_node_id)
|
||||
]
|
||||
|
||||
graph_config["nodes"] = node_configs
|
||||
# Build a base graph config (without synthetic entry) to keep node-level context minimal.
|
||||
base_graph_config = graph_config.copy()
|
||||
base_graph_config["nodes"] = base_node_configs
|
||||
|
||||
node_ids = [node.get("id") for node in node_configs]
|
||||
node_ids = [node.get("id") for node in base_node_configs if isinstance(node.get("id"), str)]
|
||||
|
||||
# filter edges only in the specified node type
|
||||
edge_configs = [
|
||||
base_edge_configs = [
|
||||
edge
|
||||
for edge in graph_config.get("edges", [])
|
||||
if (edge.get("source") is None or edge.get("source") in node_ids)
|
||||
and (edge.get("target") is None or edge.get("target") in node_ids)
|
||||
]
|
||||
|
||||
graph_config["edges"] = edge_configs
|
||||
base_graph_config["edges"] = base_edge_configs
|
||||
|
||||
# Inject a synthetic start node so Graph validation accepts the single-node graph
|
||||
# (loop/iteration nodes are containers and cannot serve as graph roots).
|
||||
synthetic_start_node_id = f"{node_id}_single_step_start"
|
||||
synthetic_start_node = {
|
||||
"id": synthetic_start_node_id,
|
||||
"type": "custom",
|
||||
"data": {
|
||||
"type": NodeType.START,
|
||||
"title": "Start",
|
||||
"desc": "Synthetic start for single-step run",
|
||||
"version": "1",
|
||||
"variables": [],
|
||||
},
|
||||
}
|
||||
|
||||
synthetic_end_node_id = f"{node_id}_single_step_end"
|
||||
synthetic_end_node = {
|
||||
"id": synthetic_end_node_id,
|
||||
"type": "custom",
|
||||
"data": {
|
||||
"type": NodeType.END,
|
||||
"title": "End",
|
||||
"desc": "Synthetic end for single-step run",
|
||||
"version": "1",
|
||||
"outputs": [],
|
||||
},
|
||||
}
|
||||
|
||||
graph_config_with_entry = base_graph_config.copy()
|
||||
graph_config_with_entry["nodes"] = [*base_node_configs, synthetic_start_node, synthetic_end_node]
|
||||
graph_config_with_entry["edges"] = [
|
||||
*base_edge_configs,
|
||||
{
|
||||
"source": synthetic_start_node_id,
|
||||
"target": node_id,
|
||||
"sourceHandle": "source",
|
||||
"targetHandle": "target",
|
||||
},
|
||||
{
|
||||
"source": node_id,
|
||||
"target": synthetic_end_node_id,
|
||||
"sourceHandle": "source",
|
||||
"targetHandle": "target",
|
||||
},
|
||||
]
|
||||
|
||||
# Create required parameters for Graph.init
|
||||
graph_init_params = GraphInitParams(
|
||||
tenant_id=workflow.tenant_id,
|
||||
app_id=self._app_id,
|
||||
workflow_id=workflow.id,
|
||||
graph_config=graph_config,
|
||||
graph_config=base_graph_config,
|
||||
user_id="",
|
||||
user_from=UserFrom.ACCOUNT,
|
||||
invoke_from=InvokeFrom.SERVICE_API,
|
||||
@@ -260,14 +311,16 @@ class WorkflowBasedAppRunner:
|
||||
)
|
||||
|
||||
# init graph
|
||||
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=node_id)
|
||||
graph = Graph.init(
|
||||
graph_config=graph_config_with_entry, node_factory=node_factory, root_node_id=synthetic_start_node_id
|
||||
)
|
||||
|
||||
if not graph:
|
||||
raise ValueError("graph not found in workflow")
|
||||
|
||||
# fetch node config from node id
|
||||
target_node_config = None
|
||||
for node in node_configs:
|
||||
for node in base_node_configs:
|
||||
if node.get("id") == node_id:
|
||||
target_node_config = node
|
||||
break
|
||||
|
||||
@@ -149,18 +149,49 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
|
||||
return isinstance(variable, NoneSegment) or len(variable.value) == 0
|
||||
|
||||
def _handle_empty_iteration(self, variable: ArraySegment | NoneSegment) -> Generator[NodeEventBase, None, None]:
|
||||
started_at = naive_utc_now()
|
||||
inputs: dict[str, object] = {"iterator_selector": []}
|
||||
usage = LLMUsage.empty_usage()
|
||||
|
||||
yield IterationStartedEvent(
|
||||
start_at=started_at,
|
||||
inputs=inputs,
|
||||
metadata={"iteration_length": 0},
|
||||
)
|
||||
|
||||
# Try our best to preserve the type information.
|
||||
if isinstance(variable, ArraySegment):
|
||||
output = variable.model_copy(update={"value": []})
|
||||
else:
|
||||
output = ArrayAnySegment(value=[])
|
||||
|
||||
yield IterationSucceededEvent(
|
||||
start_at=started_at,
|
||||
inputs=inputs,
|
||||
outputs={"output": []},
|
||||
steps=0,
|
||||
metadata={
|
||||
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
|
||||
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
|
||||
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
|
||||
WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {},
|
||||
},
|
||||
)
|
||||
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
# TODO(QuantumGhost): is it possible to compute the type of `output`
|
||||
# from graph definition?
|
||||
outputs={"output": output},
|
||||
inputs=inputs,
|
||||
metadata={
|
||||
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
|
||||
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
|
||||
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
|
||||
WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {},
|
||||
},
|
||||
llm_usage=usage,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner
|
||||
from core.workflow.enums import NodeType, SystemVariableKey
|
||||
from core.workflow.system_variable import SystemVariable
|
||||
|
||||
|
||||
def test_prepare_single_iteration_injects_system_variables_and_fake_workflow():
|
||||
node_id = "iteration_node"
|
||||
execution_id = "workflow-exec-123"
|
||||
|
||||
workflow = SimpleNamespace(
|
||||
id="workflow-id",
|
||||
tenant_id="tenant-id",
|
||||
app_id="app-id",
|
||||
environment_variables=[],
|
||||
graph_dict={
|
||||
"nodes": [
|
||||
{
|
||||
"id": node_id,
|
||||
"type": "custom",
|
||||
"data": {
|
||||
"type": NodeType.ITERATION,
|
||||
"title": "Iteration",
|
||||
"version": "1",
|
||||
"iterator_selector": ["start", "items"],
|
||||
"output_selector": [node_id, "output"],
|
||||
},
|
||||
}
|
||||
],
|
||||
"edges": [],
|
||||
},
|
||||
)
|
||||
|
||||
runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
|
||||
|
||||
system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id)
|
||||
|
||||
graph, _, runtime_state = runner._prepare_single_node_execution(
|
||||
workflow=workflow,
|
||||
single_iteration_run=SimpleNamespace(node_id=node_id, inputs={"input_selector": [1, 2, 3]}),
|
||||
system_variables=system_inputs,
|
||||
)
|
||||
|
||||
assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id
|
||||
assert runtime_state.variable_pool.get_by_prefix("sys")[SystemVariableKey.WORKFLOW_EXECUTION_ID] == execution_id
|
||||
assert graph.root_node.id == f"{node_id}_single_step_start"
|
||||
assert f"{node_id}_single_step_end" in graph.nodes
|
||||
|
||||
|
||||
def test_prepare_single_loop_injects_system_variables_and_fake_workflow():
|
||||
node_id = "loop_node"
|
||||
execution_id = "workflow-exec-456"
|
||||
|
||||
workflow = SimpleNamespace(
|
||||
id="workflow-id",
|
||||
tenant_id="tenant-id",
|
||||
app_id="app-id",
|
||||
environment_variables=[],
|
||||
graph_dict={
|
||||
"nodes": [
|
||||
{
|
||||
"id": node_id,
|
||||
"type": "custom",
|
||||
"data": {
|
||||
"type": NodeType.LOOP,
|
||||
"title": "Loop",
|
||||
"version": "1",
|
||||
"loop_count": 1,
|
||||
"break_conditions": [],
|
||||
"logical_operator": "and",
|
||||
"loop_variables": [],
|
||||
"outputs": {},
|
||||
},
|
||||
}
|
||||
],
|
||||
"edges": [],
|
||||
},
|
||||
)
|
||||
|
||||
runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
|
||||
|
||||
system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id)
|
||||
|
||||
graph, _, runtime_state = runner._prepare_single_node_execution(
|
||||
workflow=workflow,
|
||||
single_loop_run=SimpleNamespace(node_id=node_id, inputs={}),
|
||||
system_variables=system_inputs,
|
||||
)
|
||||
|
||||
assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id
|
||||
assert graph.root_node.id == f"{node_id}_single_step_start"
|
||||
assert f"{node_id}_single_step_end" in graph.nodes
|
||||
@@ -0,0 +1,51 @@
|
||||
from core.workflow.entities import GraphInitParams
|
||||
from core.workflow.graph_events import (
|
||||
NodeRunIterationStartedEvent,
|
||||
NodeRunIterationSucceededEvent,
|
||||
NodeRunSucceededEvent,
|
||||
)
|
||||
from core.workflow.nodes.iteration.iteration_node import IterationNode
|
||||
from core.workflow.runtime import GraphRuntimeState, VariablePool
|
||||
from core.workflow.system_variable import SystemVariable
|
||||
|
||||
|
||||
def test_iteration_node_emits_iteration_events_when_iterator_empty():
|
||||
init_params = GraphInitParams(
|
||||
tenant_id="tenant",
|
||||
app_id="app",
|
||||
workflow_id="workflow",
|
||||
graph_config={},
|
||||
user_id="user",
|
||||
user_from="account",
|
||||
invoke_from="debugger",
|
||||
call_depth=0,
|
||||
)
|
||||
runtime_state = GraphRuntimeState(
|
||||
variable_pool=VariablePool(system_variables=SystemVariable.empty(), user_inputs={}),
|
||||
start_at=0.0,
|
||||
)
|
||||
runtime_state.variable_pool.add(("start", "items"), [])
|
||||
|
||||
node = IterationNode(
|
||||
id="iteration-node",
|
||||
config={
|
||||
"id": "iteration-node",
|
||||
"data": {
|
||||
"title": "Iteration",
|
||||
"iterator_selector": ["start", "items"],
|
||||
"output_selector": ["iteration-node", "output"],
|
||||
},
|
||||
},
|
||||
graph_init_params=init_params,
|
||||
graph_runtime_state=runtime_state,
|
||||
)
|
||||
|
||||
events = list(node.run())
|
||||
|
||||
assert any(isinstance(event, NodeRunIterationStartedEvent) for event in events)
|
||||
|
||||
iteration_succeeded_event = next(event for event in events if isinstance(event, NodeRunIterationSucceededEvent))
|
||||
assert iteration_succeeded_event.steps == 0
|
||||
assert iteration_succeeded_event.outputs == {"output": []}
|
||||
|
||||
assert any(isinstance(event, NodeRunSucceededEvent) for event in events)
|
||||
@@ -792,8 +792,12 @@ const useOneStepRun = <T>({
|
||||
},
|
||||
})
|
||||
const { data: iterationData } = params
|
||||
_runResult.created_by = iterationData.created_by.name
|
||||
setRunResult(_runResult)
|
||||
const nextRunResult = {
|
||||
..._runResult,
|
||||
created_by: (iterationData.created_by as any)?.name || '',
|
||||
}
|
||||
_runResult = nextRunResult
|
||||
setRunResult(nextRunResult)
|
||||
},
|
||||
onIterationStart: (params) => {
|
||||
const newIterationRunResult = produce(_iterationResult, (draft) => {
|
||||
@@ -895,8 +899,12 @@ const useOneStepRun = <T>({
|
||||
},
|
||||
})
|
||||
const { data: loopData } = params
|
||||
_runResult.created_by = loopData.created_by.name
|
||||
setRunResult(_runResult)
|
||||
const nextRunResult = {
|
||||
..._runResult,
|
||||
created_by: (loopData.created_by as any)?.name || '',
|
||||
}
|
||||
_runResult = nextRunResult
|
||||
setRunResult(nextRunResult)
|
||||
},
|
||||
onLoopStart: (params) => {
|
||||
const newLoopRunResult = produce(_loopResult, (draft) => {
|
||||
|
||||
Reference in New Issue
Block a user