mirror of
https://github.com/langgenius/dify.git
synced 2026-04-10 18:42:48 +00:00
Compare commits
4 Commits
main
...
locate-que
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af7dad4412 | ||
|
|
f33840a105 | ||
|
|
91f77543ea | ||
|
|
aaa6629619 |
@@ -33,6 +33,7 @@ from core.moderation.base import ModerationError
|
||||
from core.moderation.input_moderation import InputModeration
|
||||
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
|
||||
from core.workflow.node_factory import get_default_root_node_id
|
||||
from core.workflow.runtime_state import create_graph_runtime_state
|
||||
from core.workflow.system_variables import (
|
||||
build_bootstrap_variables,
|
||||
build_system_variables,
|
||||
@@ -188,7 +189,11 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=new_inputs)
|
||||
|
||||
# init graph
|
||||
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time())
|
||||
graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.time(),
|
||||
workflow_id=self._workflow.id,
|
||||
)
|
||||
graph = self._init_graph(
|
||||
graph_config=self._workflow.graph_dict,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
|
||||
@@ -22,6 +22,7 @@ from core.app.entities.app_invoke_entities import (
|
||||
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
|
||||
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
|
||||
from core.workflow.node_factory import DifyGraphInitContext, DifyNodeFactory, get_default_root_node_id
|
||||
from core.workflow.runtime_state import create_graph_runtime_state
|
||||
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
|
||||
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
@@ -157,7 +158,11 @@ class PipelineRunner(WorkflowBasedAppRunner):
|
||||
workflow.graph_dict
|
||||
)
|
||||
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs)
|
||||
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
|
||||
graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id=workflow.id,
|
||||
)
|
||||
|
||||
# init graph
|
||||
graph = self._init_rag_pipeline_graph(
|
||||
|
||||
@@ -16,6 +16,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
|
||||
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
|
||||
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
|
||||
from core.workflow.node_factory import get_default_root_node_id
|
||||
from core.workflow.runtime_state import create_graph_runtime_state
|
||||
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
|
||||
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
@@ -118,7 +119,11 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
|
||||
root_node_id = self._root_node_id or get_default_root_node_id(self._workflow.graph_dict)
|
||||
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs)
|
||||
|
||||
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
|
||||
graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id=self._workflow.id,
|
||||
)
|
||||
graph = self._init_graph(
|
||||
graph_config=self._workflow.graph_dict,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
|
||||
@@ -72,6 +72,7 @@ from core.workflow.node_factory import (
|
||||
get_default_root_node_id,
|
||||
resolve_workflow_node_class,
|
||||
)
|
||||
from core.workflow.runtime_state import create_graph_runtime_state
|
||||
from core.workflow.system_variables import (
|
||||
build_bootstrap_variables,
|
||||
default_system_variables,
|
||||
@@ -187,6 +188,9 @@ class WorkflowBasedAppRunner:
|
||||
Raises:
|
||||
ValueError: If neither single_iteration_run nor single_loop_run is specified
|
||||
"""
|
||||
if single_iteration_run is None and single_loop_run is None:
|
||||
raise ValueError("Neither single_iteration_run nor single_loop_run is specified")
|
||||
|
||||
# Create initial runtime state with variable pool containing environment variables
|
||||
variable_pool = VariablePool()
|
||||
add_variables_to_pool(
|
||||
@@ -196,10 +200,14 @@ class WorkflowBasedAppRunner:
|
||||
environment_variables=workflow.environment_variables,
|
||||
),
|
||||
)
|
||||
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time())
|
||||
graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.time(),
|
||||
workflow_id=workflow.id,
|
||||
)
|
||||
|
||||
# Determine which type of single node execution and get graph/variable_pool
|
||||
if single_iteration_run:
|
||||
if single_iteration_run is not None:
|
||||
graph, variable_pool = self._get_graph_and_variable_pool_for_single_node_run(
|
||||
workflow=workflow,
|
||||
node_id=single_iteration_run.node_id,
|
||||
@@ -209,7 +217,8 @@ class WorkflowBasedAppRunner:
|
||||
node_type_label="iteration",
|
||||
user_id=user_id,
|
||||
)
|
||||
elif single_loop_run:
|
||||
else:
|
||||
assert single_loop_run is not None
|
||||
graph, variable_pool = self._get_graph_and_variable_pool_for_single_node_run(
|
||||
workflow=workflow,
|
||||
node_id=single_loop_run.node_id,
|
||||
@@ -219,9 +228,6 @@ class WorkflowBasedAppRunner:
|
||||
node_type_label="loop",
|
||||
user_id=user_id,
|
||||
)
|
||||
else:
|
||||
raise ValueError("Neither single_iteration_run nor single_loop_run is specified")
|
||||
|
||||
# Return the graph, variable_pool, and the same graph_runtime_state used during graph creation
|
||||
# This ensures all nodes in the graph reference the same GraphRuntimeState instance
|
||||
return graph, variable_pool, graph_runtime_state
|
||||
|
||||
127
api/core/workflow/runtime_state.py
Normal file
127
api/core/workflow/runtime_state.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""Helpers for explicitly wiring GraphRuntimeState collaborators.
|
||||
|
||||
GraphOn currently supports lazy construction of several runtime-state
|
||||
collaborators such as the ready queue, graph execution aggregate, and response
|
||||
coordinator. Dify initializes those collaborators eagerly so repository code
|
||||
does not depend on that implicit behavior.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import AbstractContextManager
|
||||
from typing import Any, cast
|
||||
|
||||
from graphon.graph import Graph
|
||||
from graphon.graph_engine.domain.graph_execution import GraphExecution
|
||||
from graphon.graph_engine.ready_queue import InMemoryReadyQueue
|
||||
from graphon.graph_engine.response_coordinator import ResponseStreamCoordinator
|
||||
from graphon.model_runtime.entities.llm_entities import LLMUsage
|
||||
from graphon.runtime import GraphRuntimeState, VariablePool
|
||||
|
||||
|
||||
def _require_workflow_id(workflow_id: str) -> str:
|
||||
"""Validate that workflow-scoped runtime collaborators receive a real id."""
|
||||
|
||||
if not workflow_id:
|
||||
raise ValueError("workflow_id must be a non-empty string")
|
||||
return workflow_id
|
||||
|
||||
|
||||
def create_graph_runtime_state(
|
||||
*,
|
||||
variable_pool: VariablePool,
|
||||
start_at: float,
|
||||
workflow_id: str,
|
||||
total_tokens: int = 0,
|
||||
llm_usage: LLMUsage | None = None,
|
||||
outputs: dict[str, object] | None = None,
|
||||
node_run_steps: int = 0,
|
||||
execution_context: AbstractContextManager[object] | None = None,
|
||||
) -> GraphRuntimeState:
|
||||
"""Create a runtime state with explicit non-graph collaborators.
|
||||
|
||||
The graph itself is attached later, once node construction has completed and
|
||||
the final Graph instance exists.
|
||||
"""
|
||||
workflow_id = _require_workflow_id(workflow_id)
|
||||
|
||||
return GraphRuntimeState(
|
||||
variable_pool=variable_pool,
|
||||
start_at=start_at,
|
||||
total_tokens=total_tokens,
|
||||
llm_usage=llm_usage or LLMUsage.empty_usage(),
|
||||
outputs=outputs or {},
|
||||
node_run_steps=node_run_steps,
|
||||
ready_queue=InMemoryReadyQueue(),
|
||||
graph_execution=GraphExecution(workflow_id=workflow_id),
|
||||
execution_context=execution_context,
|
||||
)
|
||||
|
||||
|
||||
def ensure_graph_runtime_state_initialized(
|
||||
graph_runtime_state: GraphRuntimeState,
|
||||
*,
|
||||
workflow_id: str,
|
||||
) -> GraphRuntimeState:
|
||||
"""Materialize non-graph collaborators when loading legacy or sparse state."""
|
||||
workflow_id = _require_workflow_id(workflow_id)
|
||||
state = cast(Any, graph_runtime_state)
|
||||
|
||||
if state._ready_queue is None:
|
||||
state._ready_queue = InMemoryReadyQueue()
|
||||
|
||||
graph_execution = state._graph_execution
|
||||
if graph_execution is None:
|
||||
state._graph_execution = GraphExecution(
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
elif not graph_execution.workflow_id:
|
||||
graph_execution.workflow_id = workflow_id
|
||||
elif graph_execution.workflow_id != workflow_id:
|
||||
raise ValueError("GraphRuntimeState workflow_id does not match graph execution workflow_id")
|
||||
|
||||
return graph_runtime_state
|
||||
|
||||
|
||||
def bind_graph_runtime_state_to_graph(
|
||||
graph_runtime_state: GraphRuntimeState,
|
||||
graph: Graph,
|
||||
*,
|
||||
workflow_id: str,
|
||||
) -> GraphRuntimeState:
|
||||
"""Attach graph-scoped collaborators without relying on GraphOn lazy setup."""
|
||||
|
||||
ensure_graph_runtime_state_initialized(
|
||||
graph_runtime_state,
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
state = cast(Any, graph_runtime_state)
|
||||
graph_protocol = cast(Any, graph)
|
||||
|
||||
attached_graph = state._graph
|
||||
if attached_graph is not None and attached_graph is not graph:
|
||||
raise ValueError("GraphRuntimeState already attached to a different graph instance")
|
||||
|
||||
if state._response_coordinator is None:
|
||||
response_coordinator = ResponseStreamCoordinator(
|
||||
variable_pool=graph_runtime_state.variable_pool,
|
||||
graph=graph_protocol,
|
||||
)
|
||||
state._response_coordinator = response_coordinator
|
||||
|
||||
graph_runtime_state.attach_graph(graph_protocol)
|
||||
return graph_runtime_state
|
||||
|
||||
|
||||
def snapshot_graph_runtime_state(
|
||||
graph_runtime_state: GraphRuntimeState,
|
||||
*,
|
||||
workflow_id: str,
|
||||
) -> str:
|
||||
"""Serialize runtime state after explicit collaborator initialization."""
|
||||
|
||||
ensure_graph_runtime_state_initialized(
|
||||
graph_runtime_state,
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
return graph_runtime_state.dumps()
|
||||
@@ -30,6 +30,10 @@ from core.workflow.node_factory import (
|
||||
is_start_node_type,
|
||||
resolve_workflow_node_class,
|
||||
)
|
||||
from core.workflow.runtime_state import (
|
||||
bind_graph_runtime_state_to_graph,
|
||||
create_graph_runtime_state,
|
||||
)
|
||||
from core.workflow.system_variables import (
|
||||
default_system_variables,
|
||||
get_node_creation_preload_selectors,
|
||||
@@ -46,6 +50,10 @@ logger = logging.getLogger(__name__)
|
||||
_file_access_controller = DatabaseFileAccessController()
|
||||
|
||||
|
||||
def _build_free_node_workflow_id(node_id: str) -> str:
|
||||
return f"free-node:{node_id}"
|
||||
|
||||
|
||||
class _WorkflowChildEngineBuilder:
|
||||
@staticmethod
|
||||
def _has_node_id(graph_config: Mapping[str, Any], node_id: str) -> bool | None:
|
||||
@@ -77,9 +85,10 @@ class _WorkflowChildEngineBuilder:
|
||||
variable_pool: VariablePool | None = None,
|
||||
) -> GraphEngine:
|
||||
"""Build a child engine with a fresh runtime state and only child-safe layers."""
|
||||
child_graph_runtime_state = GraphRuntimeState(
|
||||
child_graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool if variable_pool is not None else parent_graph_runtime_state.variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id=workflow_id,
|
||||
execution_context=parent_graph_runtime_state.execution_context,
|
||||
)
|
||||
node_factory = DifyNodeFactory(
|
||||
@@ -97,6 +106,11 @@ class _WorkflowChildEngineBuilder:
|
||||
node_factory=node_factory,
|
||||
root_node_id=root_node_id,
|
||||
)
|
||||
bind_graph_runtime_state_to_graph(
|
||||
child_graph_runtime_state,
|
||||
child_graph,
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
|
||||
command_channel = InMemoryChannel()
|
||||
config = GraphEngineConfig()
|
||||
@@ -177,6 +191,11 @@ class WorkflowEntry:
|
||||
self.command_channel = command_channel
|
||||
execution_context = capture_current_context()
|
||||
graph_runtime_state.execution_context = execution_context
|
||||
bind_graph_runtime_state_to_graph(
|
||||
graph_runtime_state,
|
||||
graph,
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
self._child_engine_builder = _WorkflowChildEngineBuilder()
|
||||
self.graph_engine = GraphEngine(
|
||||
workflow_id=workflow_id,
|
||||
@@ -270,9 +289,10 @@ class WorkflowEntry:
|
||||
run_context=run_context,
|
||||
call_depth=0,
|
||||
)
|
||||
graph_runtime_state = GraphRuntimeState(
|
||||
graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id=workflow.id,
|
||||
execution_context=capture_current_context(),
|
||||
)
|
||||
|
||||
@@ -410,6 +430,7 @@ class WorkflowEntry:
|
||||
node_cls = resolve_workflow_node_class(node_type=node_type, node_version="1")
|
||||
if not node_cls:
|
||||
raise ValueError(f"Node class not found for node type {node_type}")
|
||||
workflow_id = _build_free_node_workflow_id(node_id)
|
||||
|
||||
# init variable pool
|
||||
variable_pool = VariablePool()
|
||||
@@ -424,14 +445,15 @@ class WorkflowEntry:
|
||||
invoke_from=InvokeFrom.DEBUGGER,
|
||||
)
|
||||
graph_init_context = DifyGraphInitContext(
|
||||
workflow_id="",
|
||||
workflow_id=workflow_id,
|
||||
graph_config=graph_dict,
|
||||
run_context=run_context,
|
||||
call_depth=0,
|
||||
)
|
||||
graph_runtime_state = GraphRuntimeState(
|
||||
graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id=workflow_id,
|
||||
execution_context=capture_current_context(),
|
||||
)
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ from graphon.nodes.human_input.entities import HumanInputNodeData, validate_huma
|
||||
from graphon.nodes.human_input.enums import HumanInputFormKind
|
||||
from graphon.nodes.human_input.human_input_node import HumanInputNode
|
||||
from graphon.nodes.start.entities import StartNodeData
|
||||
from graphon.runtime import GraphRuntimeState, VariablePool
|
||||
from graphon.runtime import VariablePool
|
||||
from graphon.variable_loader import load_into_variable_pool
|
||||
from graphon.variables import VariableBase
|
||||
from graphon.variables.input_entities import VariableEntityType
|
||||
@@ -55,6 +55,7 @@ from core.workflow.node_factory import (
|
||||
is_start_node_type,
|
||||
)
|
||||
from core.workflow.node_runtime import DifyHumanInputNodeRuntime, apply_dify_debug_email_recipient
|
||||
from core.workflow.runtime_state import create_graph_runtime_state
|
||||
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables, default_system_variables
|
||||
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
@@ -1151,9 +1152,10 @@ class WorkflowService:
|
||||
call_depth=0,
|
||||
)
|
||||
graph_init_params = graph_init_context.to_graph_init_params()
|
||||
graph_runtime_state = GraphRuntimeState(
|
||||
graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id=workflow.id,
|
||||
)
|
||||
node = HumanInputNode(
|
||||
id=node_config["id"],
|
||||
|
||||
@@ -28,7 +28,7 @@ from graphon.graph_engine.entities.commands import GraphEngineCommand
|
||||
from graphon.graph_engine.layers.base import GraphEngineLayerNotInitializedError
|
||||
from graphon.graph_events import GraphRunPausedEvent
|
||||
from graphon.model_runtime.entities.llm_entities import LLMUsage
|
||||
from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool
|
||||
from graphon.runtime import ReadOnlyGraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool
|
||||
from sqlalchemy import Engine, delete, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
@@ -38,6 +38,7 @@ from core.app.layers.pause_state_persist_layer import (
|
||||
PauseStatePersistenceLayer,
|
||||
WorkflowResumptionContext,
|
||||
)
|
||||
from core.workflow.runtime_state import create_graph_runtime_state
|
||||
from core.workflow.system_variables import build_system_variables
|
||||
from extensions.ext_storage import storage
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
@@ -203,11 +204,13 @@ class TestPauseStatePersistenceLayerTestContainers:
|
||||
node_run_steps: int = 0,
|
||||
variables: dict[tuple[str, str], object] | None = None,
|
||||
workflow_run_id: str | None = None,
|
||||
workflow_id: str | None = None,
|
||||
) -> ReadOnlyGraphRuntimeState:
|
||||
"""Create a real GraphRuntimeState for testing."""
|
||||
start_at = time()
|
||||
|
||||
execution_id = workflow_run_id or getattr(self, "test_workflow_run_id", None) or str(uuid.uuid4())
|
||||
resolved_workflow_id = workflow_id or getattr(self, "test_workflow_id", None) or str(uuid.uuid4())
|
||||
|
||||
# Create variable pool
|
||||
variable_pool = VariablePool(system_variables=build_system_variables(workflow_execution_id=execution_id))
|
||||
@@ -219,9 +222,10 @@ class TestPauseStatePersistenceLayerTestContainers:
|
||||
llm_usage = LLMUsage.empty_usage()
|
||||
|
||||
# Create graph runtime state
|
||||
graph_runtime_state = GraphRuntimeState(
|
||||
graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=start_at,
|
||||
workflow_id=resolved_workflow_id,
|
||||
total_tokens=total_tokens,
|
||||
llm_usage=llm_usage,
|
||||
outputs=outputs or {},
|
||||
|
||||
@@ -26,6 +26,11 @@ from core.repositories.human_input_repository import HumanInputFormEntity, Human
|
||||
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
|
||||
from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
|
||||
from core.workflow.node_runtime import DifyHumanInputNodeRuntime
|
||||
from core.workflow.runtime_state import (
|
||||
bind_graph_runtime_state_to_graph,
|
||||
create_graph_runtime_state,
|
||||
snapshot_graph_runtime_state,
|
||||
)
|
||||
from core.workflow.system_variables import build_system_variables
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models import Account
|
||||
@@ -76,7 +81,11 @@ def _build_runtime_state(workflow_execution_id: str, app_id: str, workflow_id: s
|
||||
user_inputs={},
|
||||
conversation_variables=[],
|
||||
)
|
||||
return GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
|
||||
return create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
|
||||
|
||||
def _build_graph(
|
||||
@@ -285,6 +294,11 @@ class TestHumanInputResumeNodeExecutionIntegration:
|
||||
)
|
||||
|
||||
def _run_graph(self, graph: Graph, runtime_state: GraphRuntimeState, execution_id: str) -> None:
|
||||
bind_graph_runtime_state_to_graph(
|
||||
runtime_state,
|
||||
graph,
|
||||
workflow_id=self.workflow.id,
|
||||
)
|
||||
engine = GraphEngine(
|
||||
workflow_id=self.workflow.id,
|
||||
graph=graph,
|
||||
@@ -314,7 +328,10 @@ class TestHumanInputResumeNodeExecutionIntegration:
|
||||
)
|
||||
self._run_graph(paused_graph, runtime_state, execution_id)
|
||||
|
||||
snapshot = runtime_state.dumps()
|
||||
snapshot = snapshot_graph_runtime_state(
|
||||
runtime_state,
|
||||
workflow_id=self.workflow.id,
|
||||
)
|
||||
resumed_state = GraphRuntimeState.from_snapshot(snapshot)
|
||||
resume_repo = _mock_form_repository_with_submission(action_id="continue")
|
||||
resumed_graph = _build_graph(
|
||||
|
||||
@@ -5,7 +5,7 @@ from unittest.mock import patch
|
||||
import pytest
|
||||
from graphon.enums import WorkflowExecutionStatus
|
||||
from graphon.nodes.human_input.entities import HumanInputNodeData
|
||||
from graphon.runtime import GraphRuntimeState, VariablePool
|
||||
from graphon.runtime import VariablePool
|
||||
|
||||
from configs import dify_config
|
||||
from core.app.app_config.entities import WorkflowUIBasedAppConfig
|
||||
@@ -19,6 +19,7 @@ from core.workflow.human_input_compat import (
|
||||
ExternalRecipient,
|
||||
MemberRecipient,
|
||||
)
|
||||
from core.workflow.runtime_state import create_graph_runtime_state, snapshot_graph_runtime_state
|
||||
from extensions.ext_storage import storage
|
||||
from models.account import Account, AccountStatus, Tenant, TenantAccountJoin, TenantAccountRole
|
||||
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
|
||||
@@ -136,7 +137,11 @@ def _create_workflow_pause_state(
|
||||
)
|
||||
db_session_with_containers.add(workflow_run)
|
||||
|
||||
runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
|
||||
runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=0.0,
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
resumption_context = WorkflowResumptionContext(
|
||||
generate_entity={
|
||||
"type": AppMode.WORKFLOW,
|
||||
@@ -156,7 +161,10 @@ def _create_workflow_pause_state(
|
||||
workflow_execution_id=workflow_run_id,
|
||||
),
|
||||
},
|
||||
serialized_graph_runtime_state=runtime_state.dumps(),
|
||||
serialized_graph_runtime_state=snapshot_graph_runtime_state(
|
||||
runtime_state,
|
||||
workflow_id=workflow_id,
|
||||
),
|
||||
)
|
||||
|
||||
state_object_key = f"workflow_pause_states/{workflow_run_id}.json"
|
||||
|
||||
@@ -30,6 +30,11 @@ from core.app.apps.advanced_chat import app_generator as adv_app_gen_module
|
||||
from core.app.apps.workflow import app_generator as wf_app_gen_module
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.workflow.node_factory import DifyNodeFactory
|
||||
from core.workflow.runtime_state import (
|
||||
bind_graph_runtime_state_to_graph,
|
||||
create_graph_runtime_state,
|
||||
snapshot_graph_runtime_state,
|
||||
)
|
||||
from core.workflow.system_variables import build_system_variables
|
||||
from tests.workflow_test_utils import build_test_graph_init_params
|
||||
|
||||
@@ -168,12 +173,21 @@ def _build_runtime_state(run_id: str) -> GraphRuntimeState:
|
||||
conversation_variables=[],
|
||||
)
|
||||
variable_pool.add(("sys", "workflow_run_id"), run_id)
|
||||
return GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
|
||||
return create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id="workflow",
|
||||
)
|
||||
|
||||
|
||||
def _run_with_optional_pause(runtime_state: GraphRuntimeState, *, pause_on: str | None) -> list[GraphEngineEvent]:
|
||||
command_channel = InMemoryChannel()
|
||||
graph = _build_graph(runtime_state, pause_on=pause_on)
|
||||
bind_graph_runtime_state_to_graph(
|
||||
runtime_state,
|
||||
graph,
|
||||
workflow_id="workflow",
|
||||
)
|
||||
engine = GraphEngine(
|
||||
workflow_id="workflow",
|
||||
graph=graph,
|
||||
@@ -204,7 +218,10 @@ def test_workflow_app_pause_resume_matches_baseline(mocker):
|
||||
paused_events = _run_with_optional_pause(paused_state, pause_on="tool_a")
|
||||
assert isinstance(paused_events[-1], GraphRunPausedEvent)
|
||||
paused_nodes = _node_successes(paused_events)
|
||||
snapshot = paused_state.dumps()
|
||||
snapshot = snapshot_graph_runtime_state(
|
||||
paused_state,
|
||||
workflow_id="workflow",
|
||||
)
|
||||
|
||||
resumed_state = GraphRuntimeState.from_snapshot(snapshot)
|
||||
|
||||
@@ -244,7 +261,10 @@ def test_advanced_chat_pause_resume_matches_baseline(mocker):
|
||||
paused_events = _run_with_optional_pause(paused_state, pause_on="tool_a")
|
||||
assert isinstance(paused_events[-1], GraphRunPausedEvent)
|
||||
paused_nodes = _node_successes(paused_events)
|
||||
snapshot = paused_state.dumps()
|
||||
snapshot = snapshot_graph_runtime_state(
|
||||
paused_state,
|
||||
workflow_id="workflow",
|
||||
)
|
||||
|
||||
resumed_state = GraphRuntimeState.from_snapshot(snapshot)
|
||||
|
||||
@@ -281,7 +301,12 @@ def test_resume_emits_resumption_start_reason(mocker) -> None:
|
||||
initial_start = next(event for event in paused_events if isinstance(event, GraphRunStartedEvent))
|
||||
assert initial_start.reason == WorkflowStartReason.INITIAL
|
||||
|
||||
resumed_state = GraphRuntimeState.from_snapshot(paused_state.dumps())
|
||||
resumed_state = GraphRuntimeState.from_snapshot(
|
||||
snapshot_graph_runtime_state(
|
||||
paused_state,
|
||||
workflow_id="workflow",
|
||||
)
|
||||
)
|
||||
resumed_events = _run_with_optional_pause(resumed_state, pause_on=None)
|
||||
resume_start = next(event for event in resumed_events if isinstance(event, GraphRunStartedEvent))
|
||||
assert resume_start.reason == WorkflowStartReason.RESUMPTION
|
||||
|
||||
@@ -27,10 +27,11 @@ from graphon.graph_events import (
|
||||
NodeRunSucceededEvent,
|
||||
)
|
||||
from graphon.node_events import NodeRunResult
|
||||
from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool
|
||||
from graphon.runtime import ReadOnlyGraphRuntimeStateWrapper, VariablePool
|
||||
|
||||
from core.app.entities.app_invoke_entities import WorkflowAppGenerateEntity
|
||||
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
|
||||
from core.workflow.runtime_state import create_graph_runtime_state
|
||||
from core.workflow.system_variables import SystemVariableKey, build_system_variables
|
||||
|
||||
|
||||
@@ -60,7 +61,11 @@ def _make_layer(
|
||||
workflow_execution_id="run-id",
|
||||
conversation_id="conv-id",
|
||||
)
|
||||
runtime_state = GraphRuntimeState(variable_pool=VariablePool(system_variables=system_variables), start_at=0.0)
|
||||
runtime_state = create_graph_runtime_state(
|
||||
variable_pool=VariablePool(system_variables=system_variables),
|
||||
start_at=0.0,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
read_only_state = ReadOnlyGraphRuntimeStateWrapper(runtime_state)
|
||||
|
||||
application_generate_entity = WorkflowAppGenerateEntity.model_construct(
|
||||
|
||||
@@ -622,7 +622,8 @@ class MockIterationNode(MockNodeMixin, IterationNode):
|
||||
from graphon.graph import Graph
|
||||
from graphon.graph_engine import GraphEngine, GraphEngineConfig
|
||||
from graphon.graph_engine.command_channels import InMemoryChannel
|
||||
from graphon.runtime import GraphRuntimeState
|
||||
|
||||
from core.workflow.runtime_state import bind_graph_runtime_state_to_graph, create_graph_runtime_state
|
||||
|
||||
# Import our MockNodeFactory instead of DifyNodeFactory
|
||||
from .test_mock_factory import MockNodeFactory
|
||||
@@ -643,9 +644,10 @@ class MockIterationNode(MockNodeMixin, IterationNode):
|
||||
variable_pool_copy.add([self._node_id, "item"], item)
|
||||
|
||||
# Create a new GraphRuntimeState for this iteration
|
||||
graph_runtime_state_copy = GraphRuntimeState(
|
||||
graph_runtime_state_copy = create_graph_runtime_state(
|
||||
variable_pool=variable_pool_copy,
|
||||
start_at=self.graph_runtime_state.start_at,
|
||||
workflow_id=self.workflow_id,
|
||||
total_tokens=0,
|
||||
node_run_steps=0,
|
||||
)
|
||||
@@ -666,6 +668,11 @@ class MockIterationNode(MockNodeMixin, IterationNode):
|
||||
from graphon.nodes.iteration.exc import IterationGraphNotFoundError
|
||||
|
||||
raise IterationGraphNotFoundError("iteration graph not found")
|
||||
bind_graph_runtime_state_to_graph(
|
||||
graph_runtime_state_copy,
|
||||
iteration_graph,
|
||||
workflow_id=self.workflow_id,
|
||||
)
|
||||
|
||||
# Create a new GraphEngine for this iteration
|
||||
graph_engine = GraphEngine(
|
||||
@@ -694,7 +701,8 @@ class MockLoopNode(MockNodeMixin, LoopNode):
|
||||
from graphon.graph import Graph
|
||||
from graphon.graph_engine import GraphEngine, GraphEngineConfig
|
||||
from graphon.graph_engine.command_channels import InMemoryChannel
|
||||
from graphon.runtime import GraphRuntimeState
|
||||
|
||||
from core.workflow.runtime_state import bind_graph_runtime_state_to_graph, create_graph_runtime_state
|
||||
|
||||
# Import our MockNodeFactory instead of DifyNodeFactory
|
||||
from .test_mock_factory import MockNodeFactory
|
||||
@@ -708,9 +716,10 @@ class MockLoopNode(MockNodeMixin, LoopNode):
|
||||
)
|
||||
|
||||
# Create a new GraphRuntimeState for this iteration
|
||||
graph_runtime_state_copy = GraphRuntimeState(
|
||||
graph_runtime_state_copy = create_graph_runtime_state(
|
||||
variable_pool=self.graph_runtime_state.variable_pool,
|
||||
start_at=start_at.timestamp(),
|
||||
workflow_id=self.workflow_id,
|
||||
)
|
||||
|
||||
# Create a MockNodeFactory with the same mock_config
|
||||
@@ -725,6 +734,11 @@ class MockLoopNode(MockNodeMixin, LoopNode):
|
||||
|
||||
if not loop_graph:
|
||||
raise ValueError("loop graph not found")
|
||||
bind_graph_runtime_state_to_graph(
|
||||
graph_runtime_state_copy,
|
||||
loop_graph,
|
||||
workflow_id=self.workflow_id,
|
||||
)
|
||||
|
||||
# Create a new GraphEngine for this iteration
|
||||
graph_engine = GraphEngine(
|
||||
|
||||
@@ -30,6 +30,11 @@ from core.repositories.human_input_repository import (
|
||||
HumanInputFormRepository,
|
||||
)
|
||||
from core.workflow.node_runtime import DifyHumanInputNodeRuntime
|
||||
from core.workflow.runtime_state import (
|
||||
bind_graph_runtime_state_to_graph,
|
||||
create_graph_runtime_state,
|
||||
snapshot_graph_runtime_state,
|
||||
)
|
||||
from core.workflow.system_variables import build_system_variables
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from tests.workflow_test_utils import build_test_graph_init_params
|
||||
@@ -46,7 +51,10 @@ class InMemoryPauseStore:
|
||||
self._snapshot: str | None = None
|
||||
|
||||
def save(self, runtime_state: GraphRuntimeState) -> None:
|
||||
self._snapshot = runtime_state.dumps()
|
||||
self._snapshot = snapshot_graph_runtime_state(
|
||||
runtime_state,
|
||||
workflow_id="workflow",
|
||||
)
|
||||
|
||||
def load(self) -> GraphRuntimeState:
|
||||
assert self._snapshot is not None
|
||||
@@ -122,7 +130,11 @@ def _build_runtime_state() -> GraphRuntimeState:
|
||||
user_inputs={},
|
||||
conversation_variables=[],
|
||||
)
|
||||
return GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
|
||||
return create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id="workflow",
|
||||
)
|
||||
|
||||
|
||||
def _build_graph(runtime_state: GraphRuntimeState, repo: HumanInputFormRepository) -> Graph:
|
||||
@@ -200,6 +212,11 @@ def _build_graph(runtime_state: GraphRuntimeState, repo: HumanInputFormRepositor
|
||||
|
||||
|
||||
def _run_graph(graph: Graph, runtime_state: GraphRuntimeState) -> list[object]:
|
||||
bind_graph_runtime_state_to_graph(
|
||||
runtime_state,
|
||||
graph,
|
||||
workflow_id="workflow",
|
||||
)
|
||||
engine = GraphEngine(
|
||||
workflow_id="workflow",
|
||||
graph=graph,
|
||||
|
||||
@@ -42,6 +42,7 @@ from graphon.variables import (
|
||||
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, InvokeFrom, UserFrom
|
||||
from core.tools.utils.yaml_utils import _load_yaml_file
|
||||
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
|
||||
from core.workflow.runtime_state import bind_graph_runtime_state_to_graph, create_graph_runtime_state
|
||||
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
|
||||
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
|
||||
|
||||
@@ -65,9 +66,10 @@ class _TableTestChildEngineBuilder:
|
||||
root_node_id: str,
|
||||
variable_pool: VariablePool | None = None,
|
||||
) -> GraphEngine:
|
||||
child_graph_runtime_state = GraphRuntimeState(
|
||||
child_graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool if variable_pool is not None else parent_graph_runtime_state.variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id=workflow_id,
|
||||
execution_context=parent_graph_runtime_state.execution_context,
|
||||
)
|
||||
if self._use_mock_factory:
|
||||
@@ -86,6 +88,11 @@ class _TableTestChildEngineBuilder:
|
||||
child_graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=root_node_id)
|
||||
if not child_graph:
|
||||
raise ValueError("child graph not found")
|
||||
bind_graph_runtime_state_to_graph(
|
||||
child_graph_runtime_state,
|
||||
child_graph,
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
|
||||
child_engine = GraphEngine(
|
||||
workflow_id=workflow_id,
|
||||
@@ -261,7 +268,11 @@ class WorkflowRunner:
|
||||
)
|
||||
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=root_node_inputs)
|
||||
|
||||
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
|
||||
graph_runtime_state = create_graph_runtime_state(
|
||||
variable_pool=variable_pool,
|
||||
start_at=time.perf_counter(),
|
||||
workflow_id=graph_init_params.workflow_id,
|
||||
)
|
||||
|
||||
if use_mock_factory:
|
||||
node_factory = MockNodeFactory(
|
||||
@@ -275,6 +286,11 @@ class WorkflowRunner:
|
||||
node_factory=node_factory,
|
||||
root_node_id=root_node_id,
|
||||
)
|
||||
bind_graph_runtime_state_to_graph(
|
||||
graph_runtime_state,
|
||||
graph,
|
||||
workflow_id=graph_init_params.workflow_id,
|
||||
)
|
||||
|
||||
return graph, graph_runtime_state
|
||||
|
||||
@@ -366,6 +382,11 @@ class TableTestRunner:
|
||||
|
||||
try:
|
||||
graph, graph_runtime_state = self._create_graph_runtime_state(test_case)
|
||||
bind_graph_runtime_state_to_graph(
|
||||
graph_runtime_state,
|
||||
graph,
|
||||
workflow_id="test_workflow",
|
||||
)
|
||||
|
||||
# Create and run the engine with configured worker settings
|
||||
engine = GraphEngine(
|
||||
|
||||
@@ -5,6 +5,8 @@ from graphon.graph_events import (
|
||||
NodeRunStreamChunkEvent,
|
||||
)
|
||||
|
||||
from core.workflow.runtime_state import bind_graph_runtime_state_to_graph
|
||||
|
||||
from .test_table_runner import TableTestRunner
|
||||
|
||||
|
||||
@@ -20,6 +22,11 @@ def test_tool_in_chatflow():
|
||||
query="1",
|
||||
use_mock_factory=True,
|
||||
)
|
||||
bind_graph_runtime_state_to_graph(
|
||||
graph_runtime_state,
|
||||
graph,
|
||||
workflow_id="test_workflow",
|
||||
)
|
||||
|
||||
# Create and run the engine
|
||||
engine = GraphEngine(
|
||||
|
||||
234
api/tests/unit_tests/core/workflow/test_runtime_state.py
Normal file
234
api/tests/unit_tests/core/workflow/test_runtime_state.py
Normal file
@@ -0,0 +1,234 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import sentinel
|
||||
|
||||
import pytest
|
||||
from graphon.graph_engine.domain.graph_execution import GraphExecution
|
||||
from graphon.graph_engine.ready_queue import InMemoryReadyQueue
|
||||
from graphon.model_runtime.entities.llm_entities import LLMUsage
|
||||
from graphon.runtime import GraphRuntimeState, VariablePool
|
||||
|
||||
from core.workflow.runtime_state import (
|
||||
bind_graph_runtime_state_to_graph,
|
||||
create_graph_runtime_state,
|
||||
ensure_graph_runtime_state_initialized,
|
||||
snapshot_graph_runtime_state,
|
||||
)
|
||||
|
||||
|
||||
class _FakeGraph:
|
||||
def __init__(self) -> None:
|
||||
self.nodes: dict[str, object] = {}
|
||||
self.edges: dict[str, object] = {}
|
||||
self.root_node = SimpleNamespace()
|
||||
|
||||
def get_outgoing_edges(self, node_id: str) -> list[object]:
|
||||
_ = node_id
|
||||
return []
|
||||
|
||||
|
||||
def _build_variable_pool() -> VariablePool:
|
||||
return VariablePool()
|
||||
|
||||
|
||||
class TestCreateGraphRuntimeState:
|
||||
def test_initializes_explicit_collaborators_with_defaults(self) -> None:
|
||||
execution_context = sentinel.execution_context
|
||||
|
||||
runtime_state = create_graph_runtime_state(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=12.5,
|
||||
workflow_id="workflow-id",
|
||||
execution_context=execution_context,
|
||||
)
|
||||
|
||||
assert runtime_state.start_at == 12.5
|
||||
assert runtime_state.total_tokens == 0
|
||||
assert runtime_state.node_run_steps == 0
|
||||
assert runtime_state.llm_usage == LLMUsage.empty_usage()
|
||||
assert runtime_state.outputs == {}
|
||||
assert isinstance(runtime_state.ready_queue, InMemoryReadyQueue)
|
||||
assert runtime_state.graph_execution.workflow_id == "workflow-id"
|
||||
assert runtime_state.execution_context is execution_context
|
||||
|
||||
def test_preserves_explicit_scalar_and_usage_values(self) -> None:
|
||||
llm_usage = LLMUsage.empty_usage()
|
||||
llm_usage.total_tokens = 9
|
||||
|
||||
runtime_state = create_graph_runtime_state(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=3.0,
|
||||
workflow_id="workflow-id",
|
||||
total_tokens=7,
|
||||
llm_usage=llm_usage,
|
||||
outputs={"answer": "ok"},
|
||||
node_run_steps=4,
|
||||
)
|
||||
|
||||
assert runtime_state.total_tokens == 7
|
||||
assert runtime_state.llm_usage == llm_usage
|
||||
assert runtime_state.outputs == {"answer": "ok"}
|
||||
assert runtime_state.node_run_steps == 4
|
||||
|
||||
def test_rejects_blank_workflow_id(self) -> None:
|
||||
with pytest.raises(ValueError, match="workflow_id must be a non-empty string"):
|
||||
create_graph_runtime_state(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=0.0,
|
||||
workflow_id="",
|
||||
)
|
||||
|
||||
|
||||
class TestEnsureGraphRuntimeStateInitialized:
|
||||
def test_materializes_missing_ready_queue_and_graph_execution(self) -> None:
|
||||
runtime_state = GraphRuntimeState(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=0.0,
|
||||
)
|
||||
|
||||
ensure_graph_runtime_state_initialized(
|
||||
runtime_state,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
|
||||
assert isinstance(runtime_state.ready_queue, InMemoryReadyQueue)
|
||||
assert runtime_state.graph_execution.workflow_id == "workflow-id"
|
||||
|
||||
def test_backfills_empty_graph_execution_workflow_id(self) -> None:
|
||||
graph_execution = GraphExecution(workflow_id="")
|
||||
runtime_state = GraphRuntimeState(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=0.0,
|
||||
ready_queue=InMemoryReadyQueue(),
|
||||
graph_execution=graph_execution,
|
||||
)
|
||||
|
||||
ensure_graph_runtime_state_initialized(
|
||||
runtime_state,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
|
||||
assert runtime_state.graph_execution is graph_execution
|
||||
assert graph_execution.workflow_id == "workflow-id"
|
||||
|
||||
def test_preserves_existing_ready_queue_and_graph_execution(self) -> None:
|
||||
ready_queue = InMemoryReadyQueue()
|
||||
graph_execution = GraphExecution(workflow_id="workflow-id")
|
||||
runtime_state = GraphRuntimeState(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=0.0,
|
||||
ready_queue=ready_queue,
|
||||
graph_execution=graph_execution,
|
||||
)
|
||||
|
||||
ensure_graph_runtime_state_initialized(
|
||||
runtime_state,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
|
||||
assert runtime_state.ready_queue is ready_queue
|
||||
assert runtime_state.graph_execution is graph_execution
|
||||
|
||||
def test_rejects_mismatched_workflow_id(self) -> None:
|
||||
runtime_state = GraphRuntimeState(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=0.0,
|
||||
graph_execution=GraphExecution(workflow_id="other-workflow"),
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="workflow_id does not match graph execution workflow_id"):
|
||||
ensure_graph_runtime_state_initialized(
|
||||
runtime_state,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
|
||||
|
||||
class TestBindGraphRuntimeStateToGraph:
|
||||
def test_creates_response_coordinator_and_attaches_graph(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
runtime_state = GraphRuntimeState(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=0.0,
|
||||
)
|
||||
graph = _FakeGraph()
|
||||
coordinator = sentinel.response_coordinator
|
||||
|
||||
monkeypatch.setattr(
|
||||
"core.workflow.runtime_state.ResponseStreamCoordinator",
|
||||
lambda *, variable_pool, graph: (
|
||||
coordinator if variable_pool is runtime_state.variable_pool and graph is graph else None
|
||||
),
|
||||
)
|
||||
|
||||
bind_graph_runtime_state_to_graph(
|
||||
runtime_state,
|
||||
graph,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
|
||||
assert runtime_state.response_coordinator is coordinator
|
||||
|
||||
def test_preserves_existing_response_coordinator(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
existing_coordinator = sentinel.existing_coordinator
|
||||
runtime_state = GraphRuntimeState(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=0.0,
|
||||
response_coordinator=existing_coordinator,
|
||||
)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"core.workflow.runtime_state.ResponseStreamCoordinator",
|
||||
lambda **kwargs: pytest.fail(f"unexpected response coordinator construction: {kwargs}"),
|
||||
)
|
||||
|
||||
bind_graph_runtime_state_to_graph(
|
||||
runtime_state,
|
||||
_FakeGraph(),
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
|
||||
assert runtime_state.response_coordinator is existing_coordinator
|
||||
|
||||
def test_rejects_attaching_a_different_graph_instance(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
monkeypatch.setattr(
|
||||
"core.workflow.runtime_state.ResponseStreamCoordinator",
|
||||
lambda **kwargs: sentinel.response_coordinator,
|
||||
)
|
||||
runtime_state = GraphRuntimeState(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=0.0,
|
||||
)
|
||||
first_graph = _FakeGraph()
|
||||
second_graph = _FakeGraph()
|
||||
|
||||
bind_graph_runtime_state_to_graph(
|
||||
runtime_state,
|
||||
first_graph,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="already attached to a different graph instance"):
|
||||
bind_graph_runtime_state_to_graph(
|
||||
runtime_state,
|
||||
second_graph,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
|
||||
|
||||
class TestSnapshotGraphRuntimeState:
|
||||
def test_serializes_sparse_state_after_explicit_initialization(self) -> None:
|
||||
runtime_state = GraphRuntimeState(
|
||||
variable_pool=_build_variable_pool(),
|
||||
start_at=0.0,
|
||||
)
|
||||
|
||||
snapshot = snapshot_graph_runtime_state(
|
||||
runtime_state,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
payload = json.loads(snapshot)
|
||||
|
||||
assert payload["ready_queue"] is not None
|
||||
assert json.loads(payload["graph_execution"])["workflow_id"] == "workflow-id"
|
||||
assert payload["outputs"] == {}
|
||||
@@ -107,11 +107,12 @@ class TestWorkflowChildEngineBuilder:
|
||||
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
|
||||
patch.object(
|
||||
workflow_entry,
|
||||
"GraphRuntimeState",
|
||||
"create_graph_runtime_state",
|
||||
return_value=child_graph_runtime_state,
|
||||
) as graph_runtime_state_cls,
|
||||
) as create_graph_runtime_state,
|
||||
patch.object(workflow_entry, "DifyNodeFactory", return_value=sentinel.factory) as dify_node_factory,
|
||||
patch.object(workflow_entry.Graph, "init", return_value=child_graph) as graph_init,
|
||||
patch.object(workflow_entry, "bind_graph_runtime_state_to_graph") as bind_runtime_state,
|
||||
patch.object(workflow_entry, "GraphEngine", return_value=child_engine) as graph_engine_cls,
|
||||
patch.object(workflow_entry, "GraphEngineConfig", return_value=sentinel.graph_engine_config),
|
||||
patch.object(workflow_entry, "InMemoryChannel", return_value=sentinel.command_channel),
|
||||
@@ -126,9 +127,10 @@ class TestWorkflowChildEngineBuilder:
|
||||
)
|
||||
|
||||
assert result is child_engine
|
||||
graph_runtime_state_cls.assert_called_once_with(
|
||||
create_graph_runtime_state.assert_called_once_with(
|
||||
variable_pool=sentinel.child_variable_pool,
|
||||
start_at=123.0,
|
||||
workflow_id="workflow-id",
|
||||
execution_context=sentinel.execution_context,
|
||||
)
|
||||
dify_node_factory.assert_called_once_with(
|
||||
@@ -140,6 +142,11 @@ class TestWorkflowChildEngineBuilder:
|
||||
node_factory=sentinel.factory,
|
||||
root_node_id="root",
|
||||
)
|
||||
bind_runtime_state.assert_called_once_with(
|
||||
child_graph_runtime_state,
|
||||
child_graph,
|
||||
workflow_id="workflow-id",
|
||||
)
|
||||
graph_engine_cls.assert_called_once_with(
|
||||
workflow_id="workflow-id",
|
||||
graph=child_graph,
|
||||
@@ -247,6 +254,7 @@ class TestWorkflowEntryInit:
|
||||
patch.object(workflow_entry.dify_config, "ENABLE_OTEL", False),
|
||||
patch.object(workflow_entry, "is_instrument_flag_enabled", return_value=True),
|
||||
patch.object(workflow_entry, "capture_current_context", return_value=sentinel.execution_context),
|
||||
patch.object(workflow_entry, "bind_graph_runtime_state_to_graph"),
|
||||
patch.object(workflow_entry, "GraphEngine", return_value=graph_engine) as graph_engine_cls,
|
||||
patch.object(workflow_entry, "GraphEngineConfig", return_value=sentinel.graph_engine_config),
|
||||
patch.object(workflow_entry, "InMemoryChannel", return_value=sentinel.command_channel),
|
||||
@@ -352,7 +360,7 @@ class TestWorkflowEntrySingleStepRun:
|
||||
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
|
||||
patch.object(
|
||||
workflow_entry,
|
||||
"GraphRuntimeState",
|
||||
"create_graph_runtime_state",
|
||||
return_value=SimpleNamespace(variable_pool=variable_pool),
|
||||
),
|
||||
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
|
||||
@@ -413,7 +421,7 @@ class TestWorkflowEntrySingleStepRun:
|
||||
|
||||
with (
|
||||
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
|
||||
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
|
||||
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
|
||||
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
|
||||
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
|
||||
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
|
||||
@@ -482,7 +490,7 @@ class TestWorkflowEntrySingleStepRun:
|
||||
|
||||
with (
|
||||
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
|
||||
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
|
||||
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
|
||||
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
|
||||
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
|
||||
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeDatasourceNode),
|
||||
@@ -542,7 +550,7 @@ class TestWorkflowEntrySingleStepRun:
|
||||
|
||||
with (
|
||||
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
|
||||
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
|
||||
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
|
||||
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
|
||||
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
|
||||
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
|
||||
@@ -653,7 +661,9 @@ class TestWorkflowEntryHelpers:
|
||||
patch.object(
|
||||
workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context
|
||||
) as graph_init_context_cls,
|
||||
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
|
||||
patch.object(
|
||||
workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state
|
||||
) as create_graph_runtime_state,
|
||||
patch.object(
|
||||
workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}
|
||||
) as build_dify_run_context,
|
||||
@@ -693,13 +703,14 @@ class TestWorkflowEntryHelpers:
|
||||
invoke_from=InvokeFrom.DEBUGGER,
|
||||
)
|
||||
graph_init_context_cls.assert_called_once_with(
|
||||
workflow_id="",
|
||||
workflow_id="free-node:node-id",
|
||||
graph_config=workflow_entry.WorkflowEntry._create_single_node_graph(
|
||||
"node-id", {"type": BuiltinNodeTypes.PARAMETER_EXTRACTOR, "title": "Node"}
|
||||
),
|
||||
run_context={"_dify": "context"},
|
||||
call_depth=0,
|
||||
)
|
||||
create_graph_runtime_state.assert_called_once()
|
||||
dify_node_factory_cls.assert_called_once_with(
|
||||
graph_init_context=sentinel.graph_init_context,
|
||||
graph_runtime_state=sentinel.graph_runtime_state,
|
||||
@@ -739,7 +750,7 @@ class TestWorkflowEntryHelpers:
|
||||
patch.object(workflow_entry, "VariablePool", return_value=sentinel.variable_pool),
|
||||
patch.object(workflow_entry, "add_variables_to_pool"),
|
||||
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
|
||||
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
|
||||
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
|
||||
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
|
||||
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
|
||||
patch.object(
|
||||
|
||||
@@ -26,7 +26,10 @@ class TestWorkflowEntryRedisChannel:
|
||||
redis_channel = RedisChannel(mock_redis_client, "test:channel:key")
|
||||
|
||||
# Patch GraphEngine to verify it receives the Redis channel
|
||||
with patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine:
|
||||
with (
|
||||
patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"),
|
||||
patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine,
|
||||
):
|
||||
mock_graph_engine = MockGraphEngine.return_value # Create WorkflowEntry with Redis channel
|
||||
workflow_entry = WorkflowEntry(
|
||||
tenant_id="test-tenant",
|
||||
@@ -60,6 +63,7 @@ class TestWorkflowEntryRedisChannel:
|
||||
|
||||
# Patch GraphEngine and InMemoryChannel
|
||||
with (
|
||||
patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"),
|
||||
patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine,
|
||||
patch("core.workflow.workflow_entry.InMemoryChannel", autospec=True) as MockInMemoryChannel,
|
||||
):
|
||||
@@ -107,7 +111,10 @@ class TestWorkflowEntryRedisChannel:
|
||||
mock_event2 = MagicMock()
|
||||
|
||||
# Patch GraphEngine
|
||||
with patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine:
|
||||
with (
|
||||
patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"),
|
||||
patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine,
|
||||
):
|
||||
mock_graph_engine = MagicMock()
|
||||
mock_graph_engine.run.return_value = iter([mock_event1, mock_event2])
|
||||
MockGraphEngine.return_value = mock_graph_engine
|
||||
|
||||
@@ -2754,8 +2754,8 @@ class TestWorkflowServiceFreeNodeExecution:
|
||||
|
||||
with (
|
||||
patch("services.workflow_service.DifyGraphInitContext") as mock_graph_init_context_cls,
|
||||
patch("services.workflow_service.GraphRuntimeState"),
|
||||
patch("services.workflow_service.build_dify_run_context") as mock_build_dify_run_context,
|
||||
patch("services.workflow_service.create_graph_runtime_state"),
|
||||
patch("services.workflow_service.DifyHumanInputNodeRuntime") as mock_runtime_cls,
|
||||
patch("services.workflow_service.HumanInputNode") as mock_node_cls,
|
||||
):
|
||||
|
||||
@@ -9,11 +9,12 @@ from threading import Event
|
||||
import pytest
|
||||
from graphon.entities.pause_reason import HumanInputRequired
|
||||
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
|
||||
from graphon.runtime import GraphRuntimeState, VariablePool
|
||||
from graphon.runtime import VariablePool
|
||||
|
||||
from core.app.app_config.entities import WorkflowUIBasedAppConfig
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
|
||||
from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext, _WorkflowGenerateEntityWrapper
|
||||
from core.workflow.runtime_state import create_graph_runtime_state, snapshot_graph_runtime_state
|
||||
from models.enums import CreatorUserRole
|
||||
from models.model import AppMode
|
||||
from models.workflow import WorkflowRun
|
||||
@@ -116,13 +117,20 @@ def _build_resumption_context(task_id: str) -> WorkflowResumptionContext:
|
||||
call_depth=0,
|
||||
workflow_execution_id="run-1",
|
||||
)
|
||||
runtime_state = GraphRuntimeState(variable_pool=VariablePool(), start_at=0.0)
|
||||
runtime_state = create_graph_runtime_state(
|
||||
variable_pool=VariablePool(),
|
||||
start_at=0.0,
|
||||
workflow_id="workflow-1",
|
||||
)
|
||||
runtime_state.register_paused_node("node-1")
|
||||
runtime_state.outputs = {"result": "value"}
|
||||
wrapper = _WorkflowGenerateEntityWrapper(entity=generate_entity)
|
||||
return WorkflowResumptionContext(
|
||||
generate_entity=wrapper,
|
||||
serialized_graph_runtime_state=runtime_state.dumps(),
|
||||
serialized_graph_runtime_state=snapshot_graph_runtime_state(
|
||||
runtime_state,
|
||||
workflow_id="workflow-1",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user