Compare commits

...

3 Commits

Author SHA1 Message Date
Yeuoly
6ef8c7e125 Merge branch 'main' into victoria 2026-01-25 13:50:37 +08:00
-LAN-
afacf6ae2a test(graph_engien): Add tests for single run iteration and loop
Signed-off-by: -LAN- <laipz8200@outlook.com>
2026-01-24 00:33:10 +08:00
-LAN-
bd64062e8b fix(graph_engine): Cannot run single iteration or loop node
Signed-off-by: -LAN- <laipz8200@outlook.com>
2026-01-24 00:16:18 +08:00
14 changed files with 283 additions and 51 deletions

View File

@@ -157,7 +157,7 @@ class WorkflowBasedAppRunner:
# Create initial runtime state with variable pool containing environment variables
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
environment_variables=workflow.environment_variables,
),
@@ -272,7 +272,9 @@ class WorkflowBasedAppRunner:
)
# init graph
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=node_id)
graph = Graph.init(
graph_config=graph_config, node_factory=node_factory, root_node_id=node_id, skip_validation=True
)
if not graph:
raise ValueError("graph not found in workflow")

View File

@@ -288,6 +288,7 @@ class Graph:
graph_config: Mapping[str, object],
node_factory: NodeFactory,
root_node_id: str | None = None,
skip_validation: bool = False,
) -> Graph:
"""
Initialize graph
@@ -339,8 +340,9 @@ class Graph:
root_node=root_node,
)
# Validate the graph structure using built-in validators
get_graph_validator().validate(graph)
if not skip_validation:
# Validate the graph structure using built-in validators
get_graph_validator().validate(graph)
return graph

View File

@@ -44,7 +44,7 @@ class VariablePool(BaseModel):
)
system_variables: SystemVariable = Field(
description="System variables",
default_factory=SystemVariable.empty,
default_factory=SystemVariable.default,
)
environment_variables: Sequence[Variable] = Field(
description="Environment variables.",
@@ -271,4 +271,4 @@ class VariablePool(BaseModel):
@classmethod
def empty(cls) -> VariablePool:
"""Create an empty variable pool."""
return cls(system_variables=SystemVariable.empty())
return cls(system_variables=SystemVariable.default())

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from collections.abc import Mapping, Sequence
from types import MappingProxyType
from typing import Any
from uuid import uuid4
from pydantic import AliasChoices, BaseModel, ConfigDict, Field, model_validator
@@ -72,8 +73,8 @@ class SystemVariable(BaseModel):
return data
@classmethod
def empty(cls) -> SystemVariable:
return cls()
def default(cls) -> SystemVariable:
return cls(workflow_execution_id=str(uuid4()))
def to_dict(self) -> dict[SystemVariableKey, Any]:
# NOTE: This method is provided for compatibility with legacy code.

View File

@@ -276,7 +276,7 @@ class WorkflowEntry:
# init variable pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
environment_variables=[],
)

View File

@@ -436,7 +436,7 @@ class RagPipelineService:
user_inputs=user_inputs,
user_id=account.id,
variable_pool=VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs=user_inputs,
environment_variables=[],
conversation_variables=[],

View File

@@ -675,7 +675,7 @@ class WorkflowService:
else:
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs=user_inputs,
environment_variables=draft_workflow.environment_variables,
conversation_variables=[],
@@ -1063,7 +1063,7 @@ def _setup_variable_pool(
system_variable.conversation_id = conversation_id
system_variable.dialogue_count = 1
else:
system_variable = SystemVariable.empty()
system_variable = SystemVariable.default()
# init variable pool
variable_pool = VariablePool(

View File

@@ -0,0 +1,107 @@
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.workflow.app_runner import WorkflowAppRunner
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
from models.workflow import Workflow
def _make_graph_state():
variable_pool = VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
environment_variables=[],
conversation_variables=[],
)
return MagicMock(), variable_pool, GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
@pytest.mark.parametrize(
("single_iteration_run", "single_loop_run"),
[
(WorkflowAppGenerateEntity.SingleIterationRunEntity(node_id="iter", inputs={}), None),
(None, WorkflowAppGenerateEntity.SingleLoopRunEntity(node_id="loop", inputs={})),
],
)
def test_run_uses_single_node_execution_branch(
single_iteration_run: Any,
single_loop_run: Any,
) -> None:
app_config = MagicMock()
app_config.app_id = "app"
app_config.tenant_id = "tenant"
app_config.workflow_id = "workflow"
app_generate_entity = MagicMock(spec=WorkflowAppGenerateEntity)
app_generate_entity.app_config = app_config
app_generate_entity.inputs = {}
app_generate_entity.files = []
app_generate_entity.user_id = "user"
app_generate_entity.invoke_from = InvokeFrom.SERVICE_API
app_generate_entity.workflow_execution_id = "execution-id"
app_generate_entity.task_id = "task-id"
app_generate_entity.call_depth = 0
app_generate_entity.trace_manager = None
app_generate_entity.single_iteration_run = single_iteration_run
app_generate_entity.single_loop_run = single_loop_run
workflow = MagicMock(spec=Workflow)
workflow.tenant_id = "tenant"
workflow.app_id = "app"
workflow.id = "workflow"
workflow.type = "workflow"
workflow.version = "v1"
workflow.graph_dict = {"nodes": [], "edges": []}
workflow.environment_variables = []
runner = WorkflowAppRunner(
application_generate_entity=app_generate_entity,
queue_manager=MagicMock(spec=AppQueueManager),
variable_loader=MagicMock(),
workflow=workflow,
system_user_id="system-user",
workflow_execution_repository=MagicMock(),
workflow_node_execution_repository=MagicMock(),
)
graph, variable_pool, graph_runtime_state = _make_graph_state()
mock_workflow_entry = MagicMock()
mock_workflow_entry.graph_engine = MagicMock()
mock_workflow_entry.graph_engine.layer = MagicMock()
mock_workflow_entry.run.return_value = iter([])
with (
patch("core.app.apps.workflow.app_runner.RedisChannel"),
patch("core.app.apps.workflow.app_runner.redis_client"),
patch("core.app.apps.workflow.app_runner.WorkflowEntry", return_value=mock_workflow_entry) as entry_class,
patch.object(
runner,
"_prepare_single_node_execution",
return_value=(
graph,
variable_pool,
graph_runtime_state,
),
) as prepare_single,
patch.object(runner, "_init_graph") as init_graph,
):
runner.run()
prepare_single.assert_called_once_with(
workflow=workflow,
single_iteration_run=single_iteration_run,
single_loop_run=single_loop_run,
)
init_graph.assert_not_called()
entry_kwargs = entry_class.call_args.kwargs
assert entry_kwargs["invoke_from"] == InvokeFrom.DEBUGGER
assert entry_kwargs["variable_pool"] is variable_pool
assert entry_kwargs["graph_runtime_state"] is graph_runtime_state

View File

@@ -0,0 +1,120 @@
from __future__ import annotations
from typing import Any
import pytest
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.workflow.node_factory import DifyNodeFactory
from core.workflow.entities import GraphInitParams
from core.workflow.graph import Graph
from core.workflow.graph.validation import GraphValidationError
from core.workflow.nodes import NodeType
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
from models.enums import UserFrom
def _build_iteration_graph(node_id: str) -> dict[str, Any]:
return {
"nodes": [
{
"id": node_id,
"data": {
"type": "iteration",
"title": "Iteration",
"iterator_selector": ["start", "items"],
"output_selector": [node_id, "output"],
},
}
],
"edges": [],
}
def _build_loop_graph(node_id: str) -> dict[str, Any]:
return {
"nodes": [
{
"id": node_id,
"data": {
"type": "loop",
"title": "Loop",
"loop_count": 1,
"break_conditions": [],
"logical_operator": "and",
"loop_variables": [],
"outputs": {},
},
}
],
"edges": [],
}
def _make_factory(graph_config: dict[str, Any]) -> DifyNodeFactory:
graph_init_params = GraphInitParams(
tenant_id="tenant",
app_id="app",
workflow_id="workflow",
graph_config=graph_config,
user_id="user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
environment_variables=[],
),
start_at=0.0,
)
return DifyNodeFactory(graph_init_params=graph_init_params, graph_runtime_state=graph_runtime_state)
def test_iteration_root_requires_skip_validation():
node_id = "iteration-node"
graph_config = _build_iteration_graph(node_id)
node_factory = _make_factory(graph_config)
with pytest.raises(GraphValidationError):
Graph.init(
graph_config=graph_config,
node_factory=node_factory,
root_node_id=node_id,
)
graph = Graph.init(
graph_config=graph_config,
node_factory=node_factory,
root_node_id=node_id,
skip_validation=True,
)
assert graph.root_node.id == node_id
assert graph.root_node.node_type == NodeType.ITERATION
def test_loop_root_requires_skip_validation():
node_id = "loop-node"
graph_config = _build_loop_graph(node_id)
node_factory = _make_factory(graph_config)
with pytest.raises(GraphValidationError):
Graph.init(
graph_config=graph_config,
node_factory=node_factory,
root_node_id=node_id,
)
graph = Graph.init(
graph_config=graph_config,
node_factory=node_factory,
root_node_id=node_id,
skip_validation=True,
)
assert graph.root_node.id == node_id
assert graph.root_node.node_type == NodeType.LOOP

View File

@@ -16,7 +16,7 @@ from core.workflow.system_variable import SystemVariable
def test_executor_with_json_body_and_number_variable():
# Prepare the variable pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
variable_pool.add(["pre_node_id", "number"], 42)
@@ -69,7 +69,7 @@ def test_executor_with_json_body_and_number_variable():
def test_executor_with_json_body_and_object_variable():
# Prepare the variable pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"})
@@ -124,7 +124,7 @@ def test_executor_with_json_body_and_object_variable():
def test_executor_with_json_body_and_nested_object_variable():
# Prepare the variable pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"})
@@ -178,7 +178,7 @@ def test_executor_with_json_body_and_nested_object_variable():
def test_extract_selectors_from_template_with_newline():
variable_pool = VariablePool(system_variables=SystemVariable.empty())
variable_pool = VariablePool(system_variables=SystemVariable.default())
variable_pool.add(("node_id", "custom_query"), "line1\nline2")
node_data = HttpRequestNodeData(
title="Test JSON Body with Nested Object Variable",
@@ -205,7 +205,7 @@ def test_extract_selectors_from_template_with_newline():
def test_executor_with_form_data():
# Prepare the variable pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
variable_pool.add(["pre_node_id", "text_field"], "Hello, World!")
@@ -290,7 +290,7 @@ def test_init_headers():
return Executor(
node_data=node_data,
timeout=timeout,
variable_pool=VariablePool(system_variables=SystemVariable.empty()),
variable_pool=VariablePool(system_variables=SystemVariable.default()),
)
executor = create_executor("aa\n cc:")
@@ -324,7 +324,7 @@ def test_init_params():
return Executor(
node_data=node_data,
timeout=timeout,
variable_pool=VariablePool(system_variables=SystemVariable.empty()),
variable_pool=VariablePool(system_variables=SystemVariable.default()),
)
# Test basic key-value pairs
@@ -355,7 +355,7 @@ def test_init_params():
def test_empty_api_key_raises_error_bearer():
"""Test that empty API key raises AuthorizationConfigError for bearer auth."""
variable_pool = VariablePool(system_variables=SystemVariable.empty())
variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData(
title="test",
method="get",
@@ -379,7 +379,7 @@ def test_empty_api_key_raises_error_bearer():
def test_empty_api_key_raises_error_basic():
"""Test that empty API key raises AuthorizationConfigError for basic auth."""
variable_pool = VariablePool(system_variables=SystemVariable.empty())
variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData(
title="test",
method="get",
@@ -403,7 +403,7 @@ def test_empty_api_key_raises_error_basic():
def test_empty_api_key_raises_error_custom():
"""Test that empty API key raises AuthorizationConfigError for custom auth."""
variable_pool = VariablePool(system_variables=SystemVariable.empty())
variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData(
title="test",
method="get",
@@ -427,7 +427,7 @@ def test_empty_api_key_raises_error_custom():
def test_whitespace_only_api_key_raises_error():
"""Test that whitespace-only API key raises AuthorizationConfigError."""
variable_pool = VariablePool(system_variables=SystemVariable.empty())
variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData(
title="test",
method="get",
@@ -451,7 +451,7 @@ def test_whitespace_only_api_key_raises_error():
def test_valid_api_key_works():
"""Test that valid API key works correctly for bearer auth."""
variable_pool = VariablePool(system_variables=SystemVariable.empty())
variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData(
title="test",
method="get",

View File

@@ -86,7 +86,7 @@ def graph_init_params() -> GraphInitParams:
@pytest.fixture
def graph_runtime_state() -> GraphRuntimeState:
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
return GraphRuntimeState(

View File

@@ -111,7 +111,7 @@ def test_webhook_node_file_conversion_to_file_variable():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},
@@ -184,7 +184,7 @@ def test_webhook_node_file_conversion_with_missing_files():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},
@@ -219,7 +219,7 @@ def test_webhook_node_file_conversion_with_none_file():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},
@@ -256,7 +256,7 @@ def test_webhook_node_file_conversion_with_non_dict_file():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},
@@ -300,7 +300,7 @@ def test_webhook_node_file_conversion_mixed_parameters():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},
@@ -370,7 +370,7 @@ def test_webhook_node_different_file_types():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},
@@ -430,7 +430,7 @@ def test_webhook_node_file_conversion_with_non_dict_wrapper():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},

View File

@@ -75,7 +75,7 @@ def test_webhook_node_basic_initialization():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
@@ -118,7 +118,7 @@ def test_webhook_node_run_with_headers():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {
@@ -154,7 +154,7 @@ def test_webhook_node_run_with_query_params():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},
@@ -190,7 +190,7 @@ def test_webhook_node_run_with_body_params():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},
@@ -249,7 +249,7 @@ def test_webhook_node_run_with_file_params():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},
@@ -302,7 +302,7 @@ def test_webhook_node_run_mixed_parameters():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {"Authorization": "Bearer token"},
@@ -342,7 +342,7 @@ def test_webhook_node_run_empty_webhook_data():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={}, # No webhook_data
)
@@ -368,7 +368,7 @@ def test_webhook_node_run_case_insensitive_headers():
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {
@@ -398,7 +398,7 @@ def test_webhook_node_variable_pool_user_inputs():
# Add some additional variables to the pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {"headers": {}, "query_params": {}, "body": {}, "files": {}},
"other_var": "should_be_included",
@@ -429,7 +429,7 @@ def test_webhook_node_different_methods(method):
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={
"webhook_data": {
"headers": {},

View File

@@ -127,7 +127,7 @@ class TestWorkflowEntry:
return node_config
workflow = StubWorkflow()
variable_pool = VariablePool(system_variables=SystemVariable.empty(), user_inputs={})
variable_pool = VariablePool(system_variables=SystemVariable.default(), user_inputs={})
expected_limits = CodeNodeLimits(
max_string_length=dify_config.CODE_MAX_STRING_LENGTH,
max_number=dify_config.CODE_MAX_NUMBER,
@@ -157,7 +157,7 @@ class TestWorkflowEntry:
# Initialize variable pool with environment variables
env_var = StringVariable(name="API_KEY", value="existing_key")
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
environment_variables=[env_var],
user_inputs={},
)
@@ -198,7 +198,7 @@ class TestWorkflowEntry:
# Initialize variable pool with conversation variables
conv_var = StringVariable(name="last_message", value="Hello")
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
conversation_variables=[conv_var],
user_inputs={},
)
@@ -239,7 +239,7 @@ class TestWorkflowEntry:
"""Test mapping regular node variables from user inputs to variable pool."""
# Initialize empty variable pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
@@ -281,7 +281,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_with_file_handling(self):
"""Test mapping file inputs from user inputs to variable pool."""
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
@@ -340,7 +340,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_missing_variable_error(self):
"""Test that mapping raises error when required variable is missing."""
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
@@ -366,7 +366,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_with_alternative_key_format(self):
"""Test mapping with alternative key format (without node prefix)."""
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
@@ -396,7 +396,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_with_complex_selectors(self):
"""Test mapping with complex node variable keys."""
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)
@@ -432,7 +432,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_invalid_node_variable(self):
"""Test that mapping handles invalid node variable format."""
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
)