Compare commits

...

4 Commits

Author SHA1 Message Date
autofix-ci[bot]
af7dad4412 [autofix.ci] apply automated fixes 2026-04-10 19:21:06 +08:00
-LAN-
f33840a105 test(workflow): add branch coverage for runtime state helpers 2026-04-10 19:20:45 +08:00
-LAN-
91f77543ea fix(workflow): repair explicit runtime state CI regressions 2026-04-10 19:20:45 +08:00
-LAN-
aaa6629619 Initialize workflow runtime state explicitly 2026-04-10 19:18:22 +08:00
21 changed files with 602 additions and 52 deletions

View File

@@ -33,6 +33,7 @@ from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import (
build_bootstrap_variables,
build_system_variables,
@@ -188,7 +189,11 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=new_inputs)
# init graph
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time())
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.time(),
workflow_id=self._workflow.id,
)
graph = self._init_graph(
graph_config=self._workflow.graph_dict,
graph_runtime_state=graph_runtime_state,

View File

@@ -22,6 +22,7 @@ from core.app.entities.app_invoke_entities import (
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import DifyGraphInitContext, DifyNodeFactory, get_default_root_node_id
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
@@ -157,7 +158,11 @@ class PipelineRunner(WorkflowBasedAppRunner):
workflow.graph_dict
)
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow.id,
)
# init graph
graph = self._init_rag_pipeline_graph(

View File

@@ -16,6 +16,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
@@ -118,7 +119,11 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
root_node_id = self._root_node_id or get_default_root_node_id(self._workflow.graph_dict)
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=self._workflow.id,
)
graph = self._init_graph(
graph_config=self._workflow.graph_dict,
graph_runtime_state=graph_runtime_state,

View File

@@ -72,6 +72,7 @@ from core.workflow.node_factory import (
get_default_root_node_id,
resolve_workflow_node_class,
)
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import (
build_bootstrap_variables,
default_system_variables,
@@ -187,6 +188,9 @@ class WorkflowBasedAppRunner:
Raises:
ValueError: If neither single_iteration_run nor single_loop_run is specified
"""
if single_iteration_run is None and single_loop_run is None:
raise ValueError("Neither single_iteration_run nor single_loop_run is specified")
# Create initial runtime state with variable pool containing environment variables
variable_pool = VariablePool()
add_variables_to_pool(
@@ -196,10 +200,14 @@ class WorkflowBasedAppRunner:
environment_variables=workflow.environment_variables,
),
)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time())
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.time(),
workflow_id=workflow.id,
)
# Determine which type of single node execution and get graph/variable_pool
if single_iteration_run:
if single_iteration_run is not None:
graph, variable_pool = self._get_graph_and_variable_pool_for_single_node_run(
workflow=workflow,
node_id=single_iteration_run.node_id,
@@ -209,7 +217,8 @@ class WorkflowBasedAppRunner:
node_type_label="iteration",
user_id=user_id,
)
elif single_loop_run:
else:
assert single_loop_run is not None
graph, variable_pool = self._get_graph_and_variable_pool_for_single_node_run(
workflow=workflow,
node_id=single_loop_run.node_id,
@@ -219,9 +228,6 @@ class WorkflowBasedAppRunner:
node_type_label="loop",
user_id=user_id,
)
else:
raise ValueError("Neither single_iteration_run nor single_loop_run is specified")
# Return the graph, variable_pool, and the same graph_runtime_state used during graph creation
# This ensures all nodes in the graph reference the same GraphRuntimeState instance
return graph, variable_pool, graph_runtime_state

View File

@@ -0,0 +1,127 @@
"""Helpers for explicitly wiring GraphRuntimeState collaborators.
GraphOn currently supports lazy construction of several runtime-state
collaborators such as the ready queue, graph execution aggregate, and response
coordinator. Dify initializes those collaborators eagerly so repository code
does not depend on that implicit behavior.
"""
from __future__ import annotations
from contextlib import AbstractContextManager
from typing import Any, cast
from graphon.graph import Graph
from graphon.graph_engine.domain.graph_execution import GraphExecution
from graphon.graph_engine.ready_queue import InMemoryReadyQueue
from graphon.graph_engine.response_coordinator import ResponseStreamCoordinator
from graphon.model_runtime.entities.llm_entities import LLMUsage
from graphon.runtime import GraphRuntimeState, VariablePool
def _require_workflow_id(workflow_id: str) -> str:
"""Validate that workflow-scoped runtime collaborators receive a real id."""
if not workflow_id:
raise ValueError("workflow_id must be a non-empty string")
return workflow_id
def create_graph_runtime_state(
    *,
    variable_pool: VariablePool,
    start_at: float,
    workflow_id: str,
    total_tokens: int = 0,
    llm_usage: LLMUsage | None = None,
    outputs: dict[str, object] | None = None,
    node_run_steps: int = 0,
    execution_context: AbstractContextManager[object] | None = None,
) -> GraphRuntimeState:
    """Build a runtime state whose non-graph collaborators are wired eagerly.

    The graph itself is attached later, once node construction has completed
    and the final Graph instance exists.

    Raises:
        ValueError: If ``workflow_id`` is empty.
    """
    validated_id = _require_workflow_id(workflow_id)
    # Fall back to empty defaults when the caller supplied nothing (or an
    # empty/falsy value) for usage and outputs.
    resolved_usage = llm_usage or LLMUsage.empty_usage()
    resolved_outputs = outputs or {}
    return GraphRuntimeState(
        variable_pool=variable_pool,
        start_at=start_at,
        total_tokens=total_tokens,
        llm_usage=resolved_usage,
        outputs=resolved_outputs,
        node_run_steps=node_run_steps,
        # Explicit collaborators: do not rely on GraphOn's lazy construction.
        ready_queue=InMemoryReadyQueue(),
        graph_execution=GraphExecution(workflow_id=validated_id),
        execution_context=execution_context,
    )
def ensure_graph_runtime_state_initialized(
    graph_runtime_state: GraphRuntimeState,
    *,
    workflow_id: str,
) -> GraphRuntimeState:
    """Materialize non-graph collaborators when loading legacy or sparse state.

    Returns the same ``graph_runtime_state`` instance for call chaining.

    Raises:
        ValueError: If ``workflow_id`` is empty, or if an existing graph
            execution already carries a different workflow id.
    """
    valid_id = _require_workflow_id(workflow_id)
    # Private attributes are accessed deliberately: this module's purpose is
    # to replace GraphOn's lazy collaborator construction with explicit setup.
    internals = cast(Any, graph_runtime_state)
    if internals._ready_queue is None:
        internals._ready_queue = InMemoryReadyQueue()
    execution = internals._graph_execution
    if execution is None:
        internals._graph_execution = GraphExecution(workflow_id=valid_id)
        return graph_runtime_state
    if not execution.workflow_id:
        # Legacy snapshots may carry an empty id; backfill it in place.
        execution.workflow_id = valid_id
    elif execution.workflow_id != valid_id:
        raise ValueError("GraphRuntimeState workflow_id does not match graph execution workflow_id")
    return graph_runtime_state
def bind_graph_runtime_state_to_graph(
    graph_runtime_state: GraphRuntimeState,
    graph: Graph,
    *,
    workflow_id: str,
) -> GraphRuntimeState:
    """Attach graph-scoped collaborators without relying on GraphOn lazy setup.

    Re-binding the state to the graph it is already attached to is permitted;
    binding it to a *different* graph instance raises.

    Args:
        graph_runtime_state: State whose graph-scoped collaborators are wired.
        graph: The fully constructed graph to attach.
        workflow_id: Non-empty workflow id used to validate/initialize the
            underlying graph execution aggregate.

    Returns:
        The same ``graph_runtime_state`` instance, for call chaining.

    Raises:
        ValueError: If ``workflow_id`` is empty or mismatched, or if the state
            is already attached to a different graph instance.
    """
    # Ensure the non-graph collaborators (ready queue, graph execution) exist
    # before wiring up the graph-scoped ones.
    ensure_graph_runtime_state_initialized(
        graph_runtime_state,
        workflow_id=workflow_id,
    )
    # Private attributes are poked deliberately: GraphOn would otherwise
    # create these collaborators lazily, which this module avoids by design.
    state = cast(Any, graph_runtime_state)
    graph_protocol = cast(Any, graph)
    attached_graph = state._graph
    if attached_graph is not None and attached_graph is not graph:
        raise ValueError("GraphRuntimeState already attached to a different graph instance")
    if state._response_coordinator is None:
        # The response coordinator is graph-scoped, so it can only be built
        # once a concrete graph is available.
        response_coordinator = ResponseStreamCoordinator(
            variable_pool=graph_runtime_state.variable_pool,
            graph=graph_protocol,
        )
        state._response_coordinator = response_coordinator
    # attach_graph is called even when re-binding the same graph instance;
    # presumably GraphOn treats that as idempotent — TODO confirm.
    graph_runtime_state.attach_graph(graph_protocol)
    return graph_runtime_state
def snapshot_graph_runtime_state(
    graph_runtime_state: GraphRuntimeState,
    *,
    workflow_id: str,
) -> str:
    """Serialize runtime state after explicit collaborator initialization.

    Raises:
        ValueError: If ``workflow_id`` is empty or mismatched.
    """
    # ensure_... returns the same instance, so serialize its return value
    # directly after the collaborators have been materialized.
    initialized_state = ensure_graph_runtime_state_initialized(
        graph_runtime_state,
        workflow_id=workflow_id,
    )
    return initialized_state.dumps()

View File

@@ -30,6 +30,10 @@ from core.workflow.node_factory import (
is_start_node_type,
resolve_workflow_node_class,
)
from core.workflow.runtime_state import (
bind_graph_runtime_state_to_graph,
create_graph_runtime_state,
)
from core.workflow.system_variables import (
default_system_variables,
get_node_creation_preload_selectors,
@@ -46,6 +50,10 @@ logger = logging.getLogger(__name__)
_file_access_controller = DatabaseFileAccessController()
def _build_free_node_workflow_id(node_id: str) -> str:
return f"free-node:{node_id}"
class _WorkflowChildEngineBuilder:
@staticmethod
def _has_node_id(graph_config: Mapping[str, Any], node_id: str) -> bool | None:
@@ -77,9 +85,10 @@ class _WorkflowChildEngineBuilder:
variable_pool: VariablePool | None = None,
) -> GraphEngine:
"""Build a child engine with a fresh runtime state and only child-safe layers."""
child_graph_runtime_state = GraphRuntimeState(
child_graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool if variable_pool is not None else parent_graph_runtime_state.variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow_id,
execution_context=parent_graph_runtime_state.execution_context,
)
node_factory = DifyNodeFactory(
@@ -97,6 +106,11 @@ class _WorkflowChildEngineBuilder:
node_factory=node_factory,
root_node_id=root_node_id,
)
bind_graph_runtime_state_to_graph(
child_graph_runtime_state,
child_graph,
workflow_id=workflow_id,
)
command_channel = InMemoryChannel()
config = GraphEngineConfig()
@@ -177,6 +191,11 @@ class WorkflowEntry:
self.command_channel = command_channel
execution_context = capture_current_context()
graph_runtime_state.execution_context = execution_context
bind_graph_runtime_state_to_graph(
graph_runtime_state,
graph,
workflow_id=workflow_id,
)
self._child_engine_builder = _WorkflowChildEngineBuilder()
self.graph_engine = GraphEngine(
workflow_id=workflow_id,
@@ -270,9 +289,10 @@ class WorkflowEntry:
run_context=run_context,
call_depth=0,
)
graph_runtime_state = GraphRuntimeState(
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow.id,
execution_context=capture_current_context(),
)
@@ -410,6 +430,7 @@ class WorkflowEntry:
node_cls = resolve_workflow_node_class(node_type=node_type, node_version="1")
if not node_cls:
raise ValueError(f"Node class not found for node type {node_type}")
workflow_id = _build_free_node_workflow_id(node_id)
# init variable pool
variable_pool = VariablePool()
@@ -424,14 +445,15 @@ class WorkflowEntry:
invoke_from=InvokeFrom.DEBUGGER,
)
graph_init_context = DifyGraphInitContext(
workflow_id="",
workflow_id=workflow_id,
graph_config=graph_dict,
run_context=run_context,
call_depth=0,
)
graph_runtime_state = GraphRuntimeState(
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow_id,
execution_context=capture_current_context(),
)

View File

@@ -25,7 +25,7 @@ from graphon.nodes.human_input.entities import HumanInputNodeData, validate_huma
from graphon.nodes.human_input.enums import HumanInputFormKind
from graphon.nodes.human_input.human_input_node import HumanInputNode
from graphon.nodes.start.entities import StartNodeData
from graphon.runtime import GraphRuntimeState, VariablePool
from graphon.runtime import VariablePool
from graphon.variable_loader import load_into_variable_pool
from graphon.variables import VariableBase
from graphon.variables.input_entities import VariableEntityType
@@ -55,6 +55,7 @@ from core.workflow.node_factory import (
is_start_node_type,
)
from core.workflow.node_runtime import DifyHumanInputNodeRuntime, apply_dify_debug_email_recipient
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables, default_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
@@ -1151,9 +1152,10 @@ class WorkflowService:
call_depth=0,
)
graph_init_params = graph_init_context.to_graph_init_params()
graph_runtime_state = GraphRuntimeState(
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow.id,
)
node = HumanInputNode(
id=node_config["id"],

View File

@@ -28,7 +28,7 @@ from graphon.graph_engine.entities.commands import GraphEngineCommand
from graphon.graph_engine.layers.base import GraphEngineLayerNotInitializedError
from graphon.graph_events import GraphRunPausedEvent
from graphon.model_runtime.entities.llm_entities import LLMUsage
from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool
from graphon.runtime import ReadOnlyGraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool
from sqlalchemy import Engine, delete, select
from sqlalchemy.orm import Session
@@ -38,6 +38,7 @@ from core.app.layers.pause_state_persist_layer import (
PauseStatePersistenceLayer,
WorkflowResumptionContext,
)
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import build_system_variables
from extensions.ext_storage import storage
from libs.datetime_utils import naive_utc_now
@@ -203,11 +204,13 @@ class TestPauseStatePersistenceLayerTestContainers:
node_run_steps: int = 0,
variables: dict[tuple[str, str], object] | None = None,
workflow_run_id: str | None = None,
workflow_id: str | None = None,
) -> ReadOnlyGraphRuntimeState:
"""Create a real GraphRuntimeState for testing."""
start_at = time()
execution_id = workflow_run_id or getattr(self, "test_workflow_run_id", None) or str(uuid.uuid4())
resolved_workflow_id = workflow_id or getattr(self, "test_workflow_id", None) or str(uuid.uuid4())
# Create variable pool
variable_pool = VariablePool(system_variables=build_system_variables(workflow_execution_id=execution_id))
@@ -219,9 +222,10 @@ class TestPauseStatePersistenceLayerTestContainers:
llm_usage = LLMUsage.empty_usage()
# Create graph runtime state
graph_runtime_state = GraphRuntimeState(
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=start_at,
workflow_id=resolved_workflow_id,
total_tokens=total_tokens,
llm_usage=llm_usage,
outputs=outputs or {},

View File

@@ -26,6 +26,11 @@ from core.repositories.human_input_repository import HumanInputFormEntity, Human
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.node_runtime import DifyHumanInputNodeRuntime
from core.workflow.runtime_state import (
bind_graph_runtime_state_to_graph,
create_graph_runtime_state,
snapshot_graph_runtime_state,
)
from core.workflow.system_variables import build_system_variables
from libs.datetime_utils import naive_utc_now
from models import Account
@@ -76,7 +81,11 @@ def _build_runtime_state(workflow_execution_id: str, app_id: str, workflow_id: s
user_inputs={},
conversation_variables=[],
)
return GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
return create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow_id,
)
def _build_graph(
@@ -285,6 +294,11 @@ class TestHumanInputResumeNodeExecutionIntegration:
)
def _run_graph(self, graph: Graph, runtime_state: GraphRuntimeState, execution_id: str) -> None:
bind_graph_runtime_state_to_graph(
runtime_state,
graph,
workflow_id=self.workflow.id,
)
engine = GraphEngine(
workflow_id=self.workflow.id,
graph=graph,
@@ -314,7 +328,10 @@ class TestHumanInputResumeNodeExecutionIntegration:
)
self._run_graph(paused_graph, runtime_state, execution_id)
snapshot = runtime_state.dumps()
snapshot = snapshot_graph_runtime_state(
runtime_state,
workflow_id=self.workflow.id,
)
resumed_state = GraphRuntimeState.from_snapshot(snapshot)
resume_repo = _mock_form_repository_with_submission(action_id="continue")
resumed_graph = _build_graph(

View File

@@ -5,7 +5,7 @@ from unittest.mock import patch
import pytest
from graphon.enums import WorkflowExecutionStatus
from graphon.nodes.human_input.entities import HumanInputNodeData
from graphon.runtime import GraphRuntimeState, VariablePool
from graphon.runtime import VariablePool
from configs import dify_config
from core.app.app_config.entities import WorkflowUIBasedAppConfig
@@ -19,6 +19,7 @@ from core.workflow.human_input_compat import (
ExternalRecipient,
MemberRecipient,
)
from core.workflow.runtime_state import create_graph_runtime_state, snapshot_graph_runtime_state
from extensions.ext_storage import storage
from models.account import Account, AccountStatus, Tenant, TenantAccountJoin, TenantAccountRole
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
@@ -136,7 +137,11 @@ def _create_workflow_pause_state(
)
db_session_with_containers.add(workflow_run)
runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0)
runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=0.0,
workflow_id=workflow_id,
)
resumption_context = WorkflowResumptionContext(
generate_entity={
"type": AppMode.WORKFLOW,
@@ -156,7 +161,10 @@ def _create_workflow_pause_state(
workflow_execution_id=workflow_run_id,
),
},
serialized_graph_runtime_state=runtime_state.dumps(),
serialized_graph_runtime_state=snapshot_graph_runtime_state(
runtime_state,
workflow_id=workflow_id,
),
)
state_object_key = f"workflow_pause_states/{workflow_run_id}.json"

View File

@@ -30,6 +30,11 @@ from core.app.apps.advanced_chat import app_generator as adv_app_gen_module
from core.app.apps.workflow import app_generator as wf_app_gen_module
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.node_factory import DifyNodeFactory
from core.workflow.runtime_state import (
bind_graph_runtime_state_to_graph,
create_graph_runtime_state,
snapshot_graph_runtime_state,
)
from core.workflow.system_variables import build_system_variables
from tests.workflow_test_utils import build_test_graph_init_params
@@ -168,12 +173,21 @@ def _build_runtime_state(run_id: str) -> GraphRuntimeState:
conversation_variables=[],
)
variable_pool.add(("sys", "workflow_run_id"), run_id)
return GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
return create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id="workflow",
)
def _run_with_optional_pause(runtime_state: GraphRuntimeState, *, pause_on: str | None) -> list[GraphEngineEvent]:
command_channel = InMemoryChannel()
graph = _build_graph(runtime_state, pause_on=pause_on)
bind_graph_runtime_state_to_graph(
runtime_state,
graph,
workflow_id="workflow",
)
engine = GraphEngine(
workflow_id="workflow",
graph=graph,
@@ -204,7 +218,10 @@ def test_workflow_app_pause_resume_matches_baseline(mocker):
paused_events = _run_with_optional_pause(paused_state, pause_on="tool_a")
assert isinstance(paused_events[-1], GraphRunPausedEvent)
paused_nodes = _node_successes(paused_events)
snapshot = paused_state.dumps()
snapshot = snapshot_graph_runtime_state(
paused_state,
workflow_id="workflow",
)
resumed_state = GraphRuntimeState.from_snapshot(snapshot)
@@ -244,7 +261,10 @@ def test_advanced_chat_pause_resume_matches_baseline(mocker):
paused_events = _run_with_optional_pause(paused_state, pause_on="tool_a")
assert isinstance(paused_events[-1], GraphRunPausedEvent)
paused_nodes = _node_successes(paused_events)
snapshot = paused_state.dumps()
snapshot = snapshot_graph_runtime_state(
paused_state,
workflow_id="workflow",
)
resumed_state = GraphRuntimeState.from_snapshot(snapshot)
@@ -281,7 +301,12 @@ def test_resume_emits_resumption_start_reason(mocker) -> None:
initial_start = next(event for event in paused_events if isinstance(event, GraphRunStartedEvent))
assert initial_start.reason == WorkflowStartReason.INITIAL
resumed_state = GraphRuntimeState.from_snapshot(paused_state.dumps())
resumed_state = GraphRuntimeState.from_snapshot(
snapshot_graph_runtime_state(
paused_state,
workflow_id="workflow",
)
)
resumed_events = _run_with_optional_pause(resumed_state, pause_on=None)
resume_start = next(event for event in resumed_events if isinstance(event, GraphRunStartedEvent))
assert resume_start.reason == WorkflowStartReason.RESUMPTION

View File

@@ -27,10 +27,11 @@ from graphon.graph_events import (
NodeRunSucceededEvent,
)
from graphon.node_events import NodeRunResult
from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool
from graphon.runtime import ReadOnlyGraphRuntimeStateWrapper, VariablePool
from core.app.entities.app_invoke_entities import WorkflowAppGenerateEntity
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import SystemVariableKey, build_system_variables
@@ -60,7 +61,11 @@ def _make_layer(
workflow_execution_id="run-id",
conversation_id="conv-id",
)
runtime_state = GraphRuntimeState(variable_pool=VariablePool(system_variables=system_variables), start_at=0.0)
runtime_state = create_graph_runtime_state(
variable_pool=VariablePool(system_variables=system_variables),
start_at=0.0,
workflow_id="workflow-id",
)
read_only_state = ReadOnlyGraphRuntimeStateWrapper(runtime_state)
application_generate_entity = WorkflowAppGenerateEntity.model_construct(

View File

@@ -622,7 +622,8 @@ class MockIterationNode(MockNodeMixin, IterationNode):
from graphon.graph import Graph
from graphon.graph_engine import GraphEngine, GraphEngineConfig
from graphon.graph_engine.command_channels import InMemoryChannel
from graphon.runtime import GraphRuntimeState
from core.workflow.runtime_state import bind_graph_runtime_state_to_graph, create_graph_runtime_state
# Import our MockNodeFactory instead of DifyNodeFactory
from .test_mock_factory import MockNodeFactory
@@ -643,9 +644,10 @@ class MockIterationNode(MockNodeMixin, IterationNode):
variable_pool_copy.add([self._node_id, "item"], item)
# Create a new GraphRuntimeState for this iteration
graph_runtime_state_copy = GraphRuntimeState(
graph_runtime_state_copy = create_graph_runtime_state(
variable_pool=variable_pool_copy,
start_at=self.graph_runtime_state.start_at,
workflow_id=self.workflow_id,
total_tokens=0,
node_run_steps=0,
)
@@ -666,6 +668,11 @@ class MockIterationNode(MockNodeMixin, IterationNode):
from graphon.nodes.iteration.exc import IterationGraphNotFoundError
raise IterationGraphNotFoundError("iteration graph not found")
bind_graph_runtime_state_to_graph(
graph_runtime_state_copy,
iteration_graph,
workflow_id=self.workflow_id,
)
# Create a new GraphEngine for this iteration
graph_engine = GraphEngine(
@@ -694,7 +701,8 @@ class MockLoopNode(MockNodeMixin, LoopNode):
from graphon.graph import Graph
from graphon.graph_engine import GraphEngine, GraphEngineConfig
from graphon.graph_engine.command_channels import InMemoryChannel
from graphon.runtime import GraphRuntimeState
from core.workflow.runtime_state import bind_graph_runtime_state_to_graph, create_graph_runtime_state
# Import our MockNodeFactory instead of DifyNodeFactory
from .test_mock_factory import MockNodeFactory
@@ -708,9 +716,10 @@ class MockLoopNode(MockNodeMixin, LoopNode):
)
# Create a new GraphRuntimeState for this iteration
graph_runtime_state_copy = GraphRuntimeState(
graph_runtime_state_copy = create_graph_runtime_state(
variable_pool=self.graph_runtime_state.variable_pool,
start_at=start_at.timestamp(),
workflow_id=self.workflow_id,
)
# Create a MockNodeFactory with the same mock_config
@@ -725,6 +734,11 @@ class MockLoopNode(MockNodeMixin, LoopNode):
if not loop_graph:
raise ValueError("loop graph not found")
bind_graph_runtime_state_to_graph(
graph_runtime_state_copy,
loop_graph,
workflow_id=self.workflow_id,
)
# Create a new GraphEngine for this iteration
graph_engine = GraphEngine(

View File

@@ -30,6 +30,11 @@ from core.repositories.human_input_repository import (
HumanInputFormRepository,
)
from core.workflow.node_runtime import DifyHumanInputNodeRuntime
from core.workflow.runtime_state import (
bind_graph_runtime_state_to_graph,
create_graph_runtime_state,
snapshot_graph_runtime_state,
)
from core.workflow.system_variables import build_system_variables
from libs.datetime_utils import naive_utc_now
from tests.workflow_test_utils import build_test_graph_init_params
@@ -46,7 +51,10 @@ class InMemoryPauseStore:
self._snapshot: str | None = None
def save(self, runtime_state: GraphRuntimeState) -> None:
self._snapshot = runtime_state.dumps()
self._snapshot = snapshot_graph_runtime_state(
runtime_state,
workflow_id="workflow",
)
def load(self) -> GraphRuntimeState:
assert self._snapshot is not None
@@ -122,7 +130,11 @@ def _build_runtime_state() -> GraphRuntimeState:
user_inputs={},
conversation_variables=[],
)
return GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
return create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id="workflow",
)
def _build_graph(runtime_state: GraphRuntimeState, repo: HumanInputFormRepository) -> Graph:
@@ -200,6 +212,11 @@ def _build_graph(runtime_state: GraphRuntimeState, repo: HumanInputFormRepositor
def _run_graph(graph: Graph, runtime_state: GraphRuntimeState) -> list[object]:
bind_graph_runtime_state_to_graph(
runtime_state,
graph,
workflow_id="workflow",
)
engine = GraphEngine(
workflow_id="workflow",
graph=graph,

View File

@@ -42,6 +42,7 @@ from graphon.variables import (
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, InvokeFrom, UserFrom
from core.tools.utils.yaml_utils import _load_yaml_file
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
from core.workflow.runtime_state import bind_graph_runtime_state_to_graph, create_graph_runtime_state
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
@@ -65,9 +66,10 @@ class _TableTestChildEngineBuilder:
root_node_id: str,
variable_pool: VariablePool | None = None,
) -> GraphEngine:
child_graph_runtime_state = GraphRuntimeState(
child_graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool if variable_pool is not None else parent_graph_runtime_state.variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow_id,
execution_context=parent_graph_runtime_state.execution_context,
)
if self._use_mock_factory:
@@ -86,6 +88,11 @@ class _TableTestChildEngineBuilder:
child_graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=root_node_id)
if not child_graph:
raise ValueError("child graph not found")
bind_graph_runtime_state_to_graph(
child_graph_runtime_state,
child_graph,
workflow_id=workflow_id,
)
child_engine = GraphEngine(
workflow_id=workflow_id,
@@ -261,7 +268,11 @@ class WorkflowRunner:
)
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=root_node_inputs)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=graph_init_params.workflow_id,
)
if use_mock_factory:
node_factory = MockNodeFactory(
@@ -275,6 +286,11 @@ class WorkflowRunner:
node_factory=node_factory,
root_node_id=root_node_id,
)
bind_graph_runtime_state_to_graph(
graph_runtime_state,
graph,
workflow_id=graph_init_params.workflow_id,
)
return graph, graph_runtime_state
@@ -366,6 +382,11 @@ class TableTestRunner:
try:
graph, graph_runtime_state = self._create_graph_runtime_state(test_case)
bind_graph_runtime_state_to_graph(
graph_runtime_state,
graph,
workflow_id="test_workflow",
)
# Create and run the engine with configured worker settings
engine = GraphEngine(

View File

@@ -5,6 +5,8 @@ from graphon.graph_events import (
NodeRunStreamChunkEvent,
)
from core.workflow.runtime_state import bind_graph_runtime_state_to_graph
from .test_table_runner import TableTestRunner
@@ -20,6 +22,11 @@ def test_tool_in_chatflow():
query="1",
use_mock_factory=True,
)
bind_graph_runtime_state_to_graph(
graph_runtime_state,
graph,
workflow_id="test_workflow",
)
# Create and run the engine
engine = GraphEngine(

View File

@@ -0,0 +1,234 @@
from __future__ import annotations
import json
from types import SimpleNamespace
from unittest.mock import sentinel
import pytest
from graphon.graph_engine.domain.graph_execution import GraphExecution
from graphon.graph_engine.ready_queue import InMemoryReadyQueue
from graphon.model_runtime.entities.llm_entities import LLMUsage
from graphon.runtime import GraphRuntimeState, VariablePool
from core.workflow.runtime_state import (
bind_graph_runtime_state_to_graph,
create_graph_runtime_state,
ensure_graph_runtime_state_initialized,
snapshot_graph_runtime_state,
)
class _FakeGraph:
def __init__(self) -> None:
self.nodes: dict[str, object] = {}
self.edges: dict[str, object] = {}
self.root_node = SimpleNamespace()
def get_outgoing_edges(self, node_id: str) -> list[object]:
_ = node_id
return []
def _build_variable_pool() -> VariablePool:
    """Return a fresh, empty variable pool for each test case."""
    return VariablePool()
class TestCreateGraphRuntimeState:
    """Behavioral checks for the explicit runtime-state factory."""

    def test_initializes_explicit_collaborators_with_defaults(self) -> None:
        # A sentinel lets us assert identity pass-through of the context.
        ctx = sentinel.execution_context
        state = create_graph_runtime_state(
            variable_pool=_build_variable_pool(),
            start_at=12.5,
            workflow_id="workflow-id",
            execution_context=ctx,
        )
        assert state.start_at == 12.5
        assert state.total_tokens == 0
        assert state.node_run_steps == 0
        assert state.llm_usage == LLMUsage.empty_usage()
        assert state.outputs == {}
        # Collaborators must be materialized eagerly, not lazily.
        assert isinstance(state.ready_queue, InMemoryReadyQueue)
        assert state.graph_execution.workflow_id == "workflow-id"
        assert state.execution_context is ctx

    def test_preserves_explicit_scalar_and_usage_values(self) -> None:
        usage = LLMUsage.empty_usage()
        usage.total_tokens = 9
        state = create_graph_runtime_state(
            variable_pool=_build_variable_pool(),
            start_at=3.0,
            workflow_id="workflow-id",
            total_tokens=7,
            llm_usage=usage,
            outputs={"answer": "ok"},
            node_run_steps=4,
        )
        # Caller-supplied values must survive unchanged.
        assert state.total_tokens == 7
        assert state.llm_usage == usage
        assert state.outputs == {"answer": "ok"}
        assert state.node_run_steps == 4

    def test_rejects_blank_workflow_id(self) -> None:
        # Workflow-scoped collaborators must never be built without a real id.
        with pytest.raises(ValueError, match="workflow_id must be a non-empty string"):
            create_graph_runtime_state(
                variable_pool=_build_variable_pool(),
                start_at=0.0,
                workflow_id="",
            )
class TestEnsureGraphRuntimeStateInitialized:
    """Contract of ensure_graph_runtime_state_initialized on partial states."""

    def test_materializes_missing_ready_queue_and_graph_execution(self) -> None:
        state = GraphRuntimeState(
            variable_pool=_build_variable_pool(),
            start_at=0.0,
        )

        ensure_graph_runtime_state_initialized(state, workflow_id="workflow-id")

        # Missing collaborators are filled in with working defaults.
        assert isinstance(state.ready_queue, InMemoryReadyQueue)
        assert state.graph_execution.workflow_id == "workflow-id"

    def test_backfills_empty_graph_execution_workflow_id(self) -> None:
        execution = GraphExecution(workflow_id="")
        state = GraphRuntimeState(
            variable_pool=_build_variable_pool(),
            start_at=0.0,
            ready_queue=InMemoryReadyQueue(),
            graph_execution=execution,
        )

        ensure_graph_runtime_state_initialized(state, workflow_id="workflow-id")

        # The existing execution object is kept and mutated in place.
        assert state.graph_execution is execution
        assert execution.workflow_id == "workflow-id"

    def test_preserves_existing_ready_queue_and_graph_execution(self) -> None:
        queue = InMemoryReadyQueue()
        execution = GraphExecution(workflow_id="workflow-id")
        state = GraphRuntimeState(
            variable_pool=_build_variable_pool(),
            start_at=0.0,
            ready_queue=queue,
            graph_execution=execution,
        )

        ensure_graph_runtime_state_initialized(state, workflow_id="workflow-id")

        # Already-initialized collaborators must not be replaced.
        assert state.ready_queue is queue
        assert state.graph_execution is execution

    def test_rejects_mismatched_workflow_id(self) -> None:
        state = GraphRuntimeState(
            variable_pool=_build_variable_pool(),
            start_at=0.0,
            graph_execution=GraphExecution(workflow_id="other-workflow"),
        )

        with pytest.raises(ValueError, match="workflow_id does not match graph execution workflow_id"):
            ensure_graph_runtime_state_initialized(state, workflow_id="workflow-id")
class TestBindGraphRuntimeStateToGraph:
    """Contract of bind_graph_runtime_state_to_graph coordinator wiring."""

    def test_creates_response_coordinator_and_attaches_graph(self, monkeypatch: pytest.MonkeyPatch) -> None:
        runtime_state = GraphRuntimeState(
            variable_pool=_build_variable_pool(),
            start_at=0.0,
        )
        graph = _FakeGraph()
        coordinator = sentinel.response_coordinator
        # Alias the graph under a name the lambda parameter cannot shadow.
        # The previous check `graph is graph` compared the lambda's own
        # parameter to itself and was always true, so the test could not
        # detect the coordinator being built with the wrong graph.
        expected_graph = graph
        monkeypatch.setattr(
            "core.workflow.runtime_state.ResponseStreamCoordinator",
            lambda *, variable_pool, graph: (
                coordinator
                if variable_pool is runtime_state.variable_pool and graph is expected_graph
                else None
            ),
        )

        bind_graph_runtime_state_to_graph(
            runtime_state,
            graph,
            workflow_id="workflow-id",
        )

        # The factory was called with exactly this pool and graph.
        assert runtime_state.response_coordinator is coordinator

    def test_preserves_existing_response_coordinator(self, monkeypatch: pytest.MonkeyPatch) -> None:
        existing_coordinator = sentinel.existing_coordinator
        runtime_state = GraphRuntimeState(
            variable_pool=_build_variable_pool(),
            start_at=0.0,
            response_coordinator=existing_coordinator,
        )
        # Any construction attempt means the existing coordinator was dropped.
        monkeypatch.setattr(
            "core.workflow.runtime_state.ResponseStreamCoordinator",
            lambda **kwargs: pytest.fail(f"unexpected response coordinator construction: {kwargs}"),
        )

        bind_graph_runtime_state_to_graph(
            runtime_state,
            _FakeGraph(),
            workflow_id="workflow-id",
        )

        assert runtime_state.response_coordinator is existing_coordinator

    def test_rejects_attaching_a_different_graph_instance(self, monkeypatch: pytest.MonkeyPatch) -> None:
        monkeypatch.setattr(
            "core.workflow.runtime_state.ResponseStreamCoordinator",
            lambda **kwargs: sentinel.response_coordinator,
        )
        runtime_state = GraphRuntimeState(
            variable_pool=_build_variable_pool(),
            start_at=0.0,
        )
        first_graph = _FakeGraph()
        second_graph = _FakeGraph()

        bind_graph_runtime_state_to_graph(
            runtime_state,
            first_graph,
            workflow_id="workflow-id",
        )

        # Rebinding to another graph instance must be refused.
        with pytest.raises(ValueError, match="already attached to a different graph instance"):
            bind_graph_runtime_state_to_graph(
                runtime_state,
                second_graph,
                workflow_id="workflow-id",
            )
class TestSnapshotGraphRuntimeState:
    """Contract of snapshot_graph_runtime_state JSON serialization."""

    def test_serializes_sparse_state_after_explicit_initialization(self) -> None:
        state = GraphRuntimeState(
            variable_pool=_build_variable_pool(),
            start_at=0.0,
        )

        serialized = snapshot_graph_runtime_state(state, workflow_id="workflow-id")
        payload = json.loads(serialized)

        # Snapshotting a sparse state still yields initialized collaborators.
        assert payload["ready_queue"] is not None
        assert json.loads(payload["graph_execution"])["workflow_id"] == "workflow-id"
        assert payload["outputs"] == {}

View File

@@ -107,11 +107,12 @@ class TestWorkflowChildEngineBuilder:
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(
workflow_entry,
"GraphRuntimeState",
"create_graph_runtime_state",
return_value=child_graph_runtime_state,
) as graph_runtime_state_cls,
) as create_graph_runtime_state,
patch.object(workflow_entry, "DifyNodeFactory", return_value=sentinel.factory) as dify_node_factory,
patch.object(workflow_entry.Graph, "init", return_value=child_graph) as graph_init,
patch.object(workflow_entry, "bind_graph_runtime_state_to_graph") as bind_runtime_state,
patch.object(workflow_entry, "GraphEngine", return_value=child_engine) as graph_engine_cls,
patch.object(workflow_entry, "GraphEngineConfig", return_value=sentinel.graph_engine_config),
patch.object(workflow_entry, "InMemoryChannel", return_value=sentinel.command_channel),
@@ -126,9 +127,10 @@ class TestWorkflowChildEngineBuilder:
)
assert result is child_engine
graph_runtime_state_cls.assert_called_once_with(
create_graph_runtime_state.assert_called_once_with(
variable_pool=sentinel.child_variable_pool,
start_at=123.0,
workflow_id="workflow-id",
execution_context=sentinel.execution_context,
)
dify_node_factory.assert_called_once_with(
@@ -140,6 +142,11 @@ class TestWorkflowChildEngineBuilder:
node_factory=sentinel.factory,
root_node_id="root",
)
bind_runtime_state.assert_called_once_with(
child_graph_runtime_state,
child_graph,
workflow_id="workflow-id",
)
graph_engine_cls.assert_called_once_with(
workflow_id="workflow-id",
graph=child_graph,
@@ -247,6 +254,7 @@ class TestWorkflowEntryInit:
patch.object(workflow_entry.dify_config, "ENABLE_OTEL", False),
patch.object(workflow_entry, "is_instrument_flag_enabled", return_value=True),
patch.object(workflow_entry, "capture_current_context", return_value=sentinel.execution_context),
patch.object(workflow_entry, "bind_graph_runtime_state_to_graph"),
patch.object(workflow_entry, "GraphEngine", return_value=graph_engine) as graph_engine_cls,
patch.object(workflow_entry, "GraphEngineConfig", return_value=sentinel.graph_engine_config),
patch.object(workflow_entry, "InMemoryChannel", return_value=sentinel.command_channel),
@@ -352,7 +360,7 @@ class TestWorkflowEntrySingleStepRun:
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(
workflow_entry,
"GraphRuntimeState",
"create_graph_runtime_state",
return_value=SimpleNamespace(variable_pool=variable_pool),
),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
@@ -413,7 +421,7 @@ class TestWorkflowEntrySingleStepRun:
with (
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
@@ -482,7 +490,7 @@ class TestWorkflowEntrySingleStepRun:
with (
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeDatasourceNode),
@@ -542,7 +550,7 @@ class TestWorkflowEntrySingleStepRun:
with (
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
@@ -653,7 +661,9 @@ class TestWorkflowEntryHelpers:
patch.object(
workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context
) as graph_init_context_cls,
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(
workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state
) as create_graph_runtime_state,
patch.object(
workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}
) as build_dify_run_context,
@@ -693,13 +703,14 @@ class TestWorkflowEntryHelpers:
invoke_from=InvokeFrom.DEBUGGER,
)
graph_init_context_cls.assert_called_once_with(
workflow_id="",
workflow_id="free-node:node-id",
graph_config=workflow_entry.WorkflowEntry._create_single_node_graph(
"node-id", {"type": BuiltinNodeTypes.PARAMETER_EXTRACTOR, "title": "Node"}
),
run_context={"_dify": "context"},
call_depth=0,
)
create_graph_runtime_state.assert_called_once()
dify_node_factory_cls.assert_called_once_with(
graph_init_context=sentinel.graph_init_context,
graph_runtime_state=sentinel.graph_runtime_state,
@@ -739,7 +750,7 @@ class TestWorkflowEntryHelpers:
patch.object(workflow_entry, "VariablePool", return_value=sentinel.variable_pool),
patch.object(workflow_entry, "add_variables_to_pool"),
patch.object(workflow_entry, "DifyGraphInitContext", return_value=sentinel.graph_init_context),
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "create_graph_runtime_state", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(

View File

@@ -26,7 +26,10 @@ class TestWorkflowEntryRedisChannel:
redis_channel = RedisChannel(mock_redis_client, "test:channel:key")
# Patch GraphEngine to verify it receives the Redis channel
with patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine:
with (
patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"),
patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine,
):
mock_graph_engine = MockGraphEngine.return_value # Create WorkflowEntry with Redis channel
workflow_entry = WorkflowEntry(
tenant_id="test-tenant",
@@ -60,6 +63,7 @@ class TestWorkflowEntryRedisChannel:
# Patch GraphEngine and InMemoryChannel
with (
patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"),
patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine,
patch("core.workflow.workflow_entry.InMemoryChannel", autospec=True) as MockInMemoryChannel,
):
@@ -107,7 +111,10 @@ class TestWorkflowEntryRedisChannel:
mock_event2 = MagicMock()
# Patch GraphEngine
with patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine:
with (
patch("core.workflow.workflow_entry.bind_graph_runtime_state_to_graph"),
patch("core.workflow.workflow_entry.GraphEngine", autospec=True) as MockGraphEngine,
):
mock_graph_engine = MagicMock()
mock_graph_engine.run.return_value = iter([mock_event1, mock_event2])
MockGraphEngine.return_value = mock_graph_engine

View File

@@ -2754,8 +2754,8 @@ class TestWorkflowServiceFreeNodeExecution:
with (
patch("services.workflow_service.DifyGraphInitContext") as mock_graph_init_context_cls,
patch("services.workflow_service.GraphRuntimeState"),
patch("services.workflow_service.build_dify_run_context") as mock_build_dify_run_context,
patch("services.workflow_service.create_graph_runtime_state"),
patch("services.workflow_service.DifyHumanInputNodeRuntime") as mock_runtime_cls,
patch("services.workflow_service.HumanInputNode") as mock_node_cls,
):

View File

@@ -9,11 +9,12 @@ from threading import Event
import pytest
from graphon.entities.pause_reason import HumanInputRequired
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
from graphon.runtime import GraphRuntimeState, VariablePool
from graphon.runtime import VariablePool
from core.app.app_config.entities import WorkflowUIBasedAppConfig
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext, _WorkflowGenerateEntityWrapper
from core.workflow.runtime_state import create_graph_runtime_state, snapshot_graph_runtime_state
from models.enums import CreatorUserRole
from models.model import AppMode
from models.workflow import WorkflowRun
@@ -116,13 +117,20 @@ def _build_resumption_context(task_id: str) -> WorkflowResumptionContext:
call_depth=0,
workflow_execution_id="run-1",
)
runtime_state = GraphRuntimeState(variable_pool=VariablePool(), start_at=0.0)
runtime_state = create_graph_runtime_state(
variable_pool=VariablePool(),
start_at=0.0,
workflow_id="workflow-1",
)
runtime_state.register_paused_node("node-1")
runtime_state.outputs = {"result": "value"}
wrapper = _WorkflowGenerateEntityWrapper(entity=generate_entity)
return WorkflowResumptionContext(
generate_entity=wrapper,
serialized_graph_runtime_state=runtime_state.dumps(),
serialized_graph_runtime_state=snapshot_graph_runtime_state(
runtime_state,
workflow_id="workflow-1",
),
)