Compare commits

...

4 Commits

10 changed files with 478 additions and 72 deletions

View File

@@ -256,6 +256,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
single_iteration_run=AdvancedChatAppGenerateEntity.SingleIterationRunEntity(
node_id=node_id, inputs=args["inputs"]
),
workflow_run_id=str(uuid.uuid4()),
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
@@ -339,6 +340,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
single_loop_run=AdvancedChatAppGenerateEntity.SingleLoopRunEntity(node_id=node_id, inputs=args["inputs"]),
workflow_run_id=str(uuid.uuid4()),
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())

View File

@@ -26,7 +26,10 @@ from core.variables.variables import VariableUnion
from core.workflow.enums import WorkflowType
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.graph_engine.layers.persistence import (
PersistenceWorkflowInfo,
WorkflowPersistenceLayer,
)
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.runtime import GraphRuntimeState, VariablePool
@@ -109,6 +112,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
workflow=self._workflow,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
system_variables=system_inputs,
)
else:
inputs = self.application_generate_entity.inputs
@@ -186,20 +190,20 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
self._queue_manager.graph_runtime_state = graph_runtime_state
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=self._workflow.id,
workflow_type=WorkflowType(self._workflow.type),
version=self._workflow.version,
graph_data=self._workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
workflow_entry.graph_engine.layer(persistence_layer)
if not self.application_generate_entity.is_single_stepping_container_nodes():
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=self._workflow.id,
workflow_type=WorkflowType(self._workflow.type),
version=self._workflow.version,
graph_data=self._workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
workflow_entry.graph_engine.layer(persistence_layer)
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)

View File

@@ -92,6 +92,22 @@ class PipelineRunner(WorkflowBasedAppRunner):
db.session.close()
files = self.application_generate_entity.files
system_inputs = SystemVariable(
files=files,
user_id=user_id,
app_id=app_config.app_id,
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
document_id=self.application_generate_entity.document_id,
original_document_id=self.application_generate_entity.original_document_id,
batch=self.application_generate_entity.batch,
dataset_id=self.application_generate_entity.dataset_id,
datasource_type=self.application_generate_entity.datasource_type,
datasource_info=self.application_generate_entity.datasource_info,
invoke_from=self.application_generate_entity.invoke_from.value,
)
# if only single iteration run is requested
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
# Handle single iteration or single loop run
@@ -99,27 +115,12 @@ class PipelineRunner(WorkflowBasedAppRunner):
workflow=workflow,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
system_variables=system_inputs,
)
else:
inputs = self.application_generate_entity.inputs
files = self.application_generate_entity.files
# Create a variable pool.
system_inputs = SystemVariable(
files=files,
user_id=user_id,
app_id=app_config.app_id,
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
document_id=self.application_generate_entity.document_id,
original_document_id=self.application_generate_entity.original_document_id,
batch=self.application_generate_entity.batch,
dataset_id=self.application_generate_entity.dataset_id,
datasource_type=self.application_generate_entity.datasource_type,
datasource_info=self.application_generate_entity.datasource_info,
invoke_from=self.application_generate_entity.invoke_from.value,
)
rag_pipeline_variables = []
if workflow.rag_pipeline_variables:
for v in workflow.rag_pipeline_variables:
@@ -171,21 +172,21 @@ class PipelineRunner(WorkflowBasedAppRunner):
)
self._queue_manager.graph_runtime_state = graph_runtime_state
if not self.application_generate_entity.is_single_stepping_container_nodes():
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=workflow.id,
workflow_type=WorkflowType(workflow.type),
version=workflow.version,
graph_data=workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=workflow.id,
workflow_type=WorkflowType(workflow.type),
version=workflow.version,
graph_data=workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
workflow_entry.graph_engine.layer(persistence_layer)
workflow_entry.graph_engine.layer(persistence_layer)
generator = workflow_entry.run()

View File

@@ -10,7 +10,10 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
from core.workflow.enums import WorkflowType
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.graph_engine.layers.persistence import (
PersistenceWorkflowInfo,
WorkflowPersistenceLayer,
)
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.runtime import GraphRuntimeState, VariablePool
@@ -80,6 +83,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
workflow=self._workflow,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
system_variables=system_inputs,
)
else:
inputs = self.application_generate_entity.inputs
@@ -132,20 +136,21 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
command_channel=command_channel,
)
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=self._workflow.id,
workflow_type=WorkflowType(self._workflow.type),
version=self._workflow.version,
graph_data=self._workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
if not self.application_generate_entity.is_single_stepping_container_nodes():
persistence_layer = WorkflowPersistenceLayer(
application_generate_entity=self.application_generate_entity,
workflow_info=PersistenceWorkflowInfo(
workflow_id=self._workflow.id,
workflow_type=WorkflowType(self._workflow.type),
version=self._workflow.version,
graph_data=self._workflow.graph_dict,
),
workflow_execution_repository=self._workflow_execution_repository,
workflow_node_execution_repository=self._workflow_node_execution_repository,
trace_manager=self.application_generate_entity.trace_manager,
)
workflow_entry.graph_engine.layer(persistence_layer)
workflow_entry.graph_engine.layer(persistence_layer)
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)

View File

@@ -130,6 +130,8 @@ class WorkflowBasedAppRunner:
workflow: Workflow,
single_iteration_run: Any | None = None,
single_loop_run: Any | None = None,
*,
system_variables: SystemVariable | None = None,
) -> tuple[Graph, VariablePool, GraphRuntimeState]:
"""
Prepare graph, variable pool, and runtime state for single node execution
@@ -147,9 +149,10 @@ class WorkflowBasedAppRunner:
ValueError: If neither single_iteration_run nor single_loop_run is specified
"""
# Create initial runtime state with variable pool containing environment variables
system_variables = system_variables or SystemVariable.empty()
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(
system_variables=SystemVariable.empty(),
system_variables=system_variables,
user_inputs={},
environment_variables=workflow.environment_variables,
),
@@ -220,7 +223,7 @@ class WorkflowBasedAppRunner:
# filter nodes only in the specified node type (iteration or loop)
main_node_config = next((n for n in graph_config.get("nodes", []) if n.get("id") == node_id), None)
start_node_id = main_node_config.get("data", {}).get("start_node_id") if main_node_config else None
node_configs = [
base_node_configs = [
node
for node in graph_config.get("nodes", [])
if node.get("id") == node_id
@@ -228,26 +231,74 @@ class WorkflowBasedAppRunner:
or (start_node_id and node.get("id") == start_node_id)
]
graph_config["nodes"] = node_configs
# Build a base graph config (without synthetic entry) to keep node-level context minimal.
base_graph_config = graph_config.copy()
base_graph_config["nodes"] = base_node_configs
node_ids = [node.get("id") for node in node_configs]
node_ids = [node.get("id") for node in base_node_configs if isinstance(node.get("id"), str)]
# filter edges only in the specified node type
edge_configs = [
base_edge_configs = [
edge
for edge in graph_config.get("edges", [])
if (edge.get("source") is None or edge.get("source") in node_ids)
and (edge.get("target") is None or edge.get("target") in node_ids)
]
graph_config["edges"] = edge_configs
base_graph_config["edges"] = base_edge_configs
# Inject a synthetic start node so Graph validation accepts the single-node graph
# (loop/iteration nodes are containers and cannot serve as graph roots).
synthetic_start_node_id = f"{node_id}_single_step_start"
synthetic_start_node = {
"id": synthetic_start_node_id,
"type": "custom",
"data": {
"type": NodeType.START,
"title": "Start",
"desc": "Synthetic start for single-step run",
"version": "1",
"variables": [],
},
}
synthetic_end_node_id = f"{node_id}_single_step_end"
synthetic_end_node = {
"id": synthetic_end_node_id,
"type": "custom",
"data": {
"type": NodeType.END,
"title": "End",
"desc": "Synthetic end for single-step run",
"version": "1",
"outputs": [],
},
}
graph_config_with_entry = base_graph_config.copy()
graph_config_with_entry["nodes"] = [*base_node_configs, synthetic_start_node, synthetic_end_node]
graph_config_with_entry["edges"] = [
*base_edge_configs,
{
"source": synthetic_start_node_id,
"target": node_id,
"sourceHandle": "source",
"targetHandle": "target",
},
{
"source": node_id,
"target": synthetic_end_node_id,
"sourceHandle": "source",
"targetHandle": "target",
},
]
# Create required parameters for Graph.init
graph_init_params = GraphInitParams(
tenant_id=workflow.tenant_id,
app_id=self._app_id,
workflow_id=workflow.id,
graph_config=graph_config,
graph_config=base_graph_config,
user_id="",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
@@ -260,14 +311,16 @@ class WorkflowBasedAppRunner:
)
# init graph
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=node_id)
graph = Graph.init(
graph_config=graph_config_with_entry, node_factory=node_factory, root_node_id=synthetic_start_node_id
)
if not graph:
raise ValueError("graph not found in workflow")
# fetch node config from node id
target_node_config = None
for node in node_configs:
for node in base_node_configs:
if node.get("id") == node_id:
target_node_config = node
break

View File

@@ -228,6 +228,9 @@ class AdvancedChatAppGenerateEntity(ConversationAppGenerateEntity):
single_loop_run: SingleLoopRunEntity | None = None
def is_single_stepping_container_nodes(self) -> bool:
return self.single_iteration_run is not None or self.single_loop_run is not None
class WorkflowAppGenerateEntity(AppGenerateEntity):
"""
@@ -258,6 +261,9 @@ class WorkflowAppGenerateEntity(AppGenerateEntity):
single_loop_run: SingleLoopRunEntity | None = None
def is_single_stepping_container_nodes(self) -> bool:
return self.single_iteration_run is not None or self.single_loop_run is not None
class RagPipelineGenerateEntity(WorkflowAppGenerateEntity):
"""

View File

@@ -149,18 +149,49 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
return isinstance(variable, NoneSegment) or len(variable.value) == 0
def _handle_empty_iteration(self, variable: ArraySegment | NoneSegment) -> Generator[NodeEventBase, None, None]:
    """Emit the full started/succeeded/completed event sequence for an empty iterator.

    When the iterator selector resolves to an empty array (or None), the node
    still yields IterationStartedEvent, IterationSucceededEvent (0 steps) and a
    final StreamCompletedEvent, so consumers observe a zero-step iteration
    rather than a silently skipped node.
    """
    started_at = naive_utc_now()
    inputs: dict[str, object] = {"iterator_selector": []}
    # No iterations run, so usage is the empty/zero accumulator.
    usage = LLMUsage.empty_usage()
    yield IterationStartedEvent(
        start_at=started_at,
        inputs=inputs,
        metadata={"iteration_length": 0},
    )
    # Try our best to preserve the type information.
    if isinstance(variable, ArraySegment):
        output = variable.model_copy(update={"value": []})
    else:
        output = ArrayAnySegment(value=[])
    yield IterationSucceededEvent(
        start_at=started_at,
        inputs=inputs,
        outputs={"output": []},
        steps=0,
        metadata={
            WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
            WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
            WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
            WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {},
        },
    )
    yield StreamCompletedEvent(
        node_run_result=NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            # TODO(QuantumGhost): is it possible to compute the type of `output`
            # from graph definition?
            outputs={"output": output},
            inputs=inputs,
            metadata={
                WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
                WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
                WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
                WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: {},
            },
            llm_usage=usage,
        )
    )

View File

@@ -0,0 +1,245 @@
from types import SimpleNamespace
from unittest.mock import MagicMock
import core.app.apps.workflow.app_runner as workflow_app_runner
from core.app.apps.workflow.app_runner import WorkflowAppRunner, WorkflowBasedAppRunner
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.enums import NodeType, SystemVariableKey, WorkflowType
from core.workflow.system_variable import SystemVariable
def test_prepare_single_iteration_injects_system_variables_and_fake_workflow():
    """Single-iteration preparation surfaces injected system variables and builds a synthetic-entry graph."""
    iteration_node_id = "iteration_node"
    exec_id = "workflow-exec-123"
    fake_workflow = SimpleNamespace(
        id="workflow-id",
        tenant_id="tenant-id",
        app_id="app-id",
        environment_variables=[],
        graph_dict={
            "nodes": [
                {
                    "id": iteration_node_id,
                    "type": "custom",
                    "data": {
                        "type": NodeType.ITERATION,
                        "title": "Iteration",
                        "version": "1",
                        "iterator_selector": ["start", "items"],
                        "output_selector": [iteration_node_id, "output"],
                    },
                }
            ],
            "edges": [],
        },
    )

    runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
    sys_vars = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=exec_id)

    graph, _, runtime_state = runner._prepare_single_node_execution(
        workflow=fake_workflow,
        single_iteration_run=SimpleNamespace(node_id=iteration_node_id, inputs={"input_selector": [1, 2, 3]}),
        system_variables=sys_vars,
    )

    # The injected execution id must be visible both on the typed system
    # variables and through the "sys" prefix of the variable pool.
    assert runtime_state.variable_pool.system_variables.workflow_execution_id == exec_id
    assert runtime_state.variable_pool.get_by_prefix("sys")[SystemVariableKey.WORKFLOW_EXECUTION_ID] == exec_id
    # Synthetic start/end nodes are injected around the container node.
    assert graph.root_node.id == f"{iteration_node_id}_single_step_start"
    assert f"{iteration_node_id}_single_step_end" in graph.nodes
def test_prepare_single_loop_injects_system_variables_and_fake_workflow():
    """Single-loop preparation surfaces injected system variables and builds a synthetic-entry graph."""
    loop_node_id = "loop_node"
    exec_id = "workflow-exec-456"
    fake_workflow = SimpleNamespace(
        id="workflow-id",
        tenant_id="tenant-id",
        app_id="app-id",
        environment_variables=[],
        graph_dict={
            "nodes": [
                {
                    "id": loop_node_id,
                    "type": "custom",
                    "data": {
                        "type": NodeType.LOOP,
                        "title": "Loop",
                        "version": "1",
                        "loop_count": 1,
                        "break_conditions": [],
                        "logical_operator": "and",
                        "loop_variables": [],
                        "outputs": {},
                    },
                }
            ],
            "edges": [],
        },
    )

    runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
    sys_vars = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=exec_id)

    graph, _, runtime_state = runner._prepare_single_node_execution(
        workflow=fake_workflow,
        single_loop_run=SimpleNamespace(node_id=loop_node_id, inputs={}),
        system_variables=sys_vars,
    )

    assert runtime_state.variable_pool.system_variables.workflow_execution_id == exec_id
    # Synthetic start/end nodes are injected around the loop node.
    assert graph.root_node.id == f"{loop_node_id}_single_step_start"
    assert f"{loop_node_id}_single_step_end" in graph.nodes
class DummyCommandChannel:
    """Inert command channel: reports no pending commands and discards sends."""

    def fetch_commands(self):
        # Tests never issue external commands.
        return []

    def send_command(self, command):
        # Swallow outgoing commands silently.
        return None
def _empty_graph_engine_run(self):
if False: # pragma: no cover
yield None
def _build_generate_entity(*, single_iteration_run=None, single_loop_run=None):
    """Build a SimpleNamespace standing in for a workflow generate entity.

    Dict arguments for the single-step runs are promoted to namespaces so
    attribute access (``.node_id`` / ``.inputs``) works like the real entities.
    """
    if isinstance(single_iteration_run, dict):
        single_iteration_run = SimpleNamespace(**single_iteration_run)
    if isinstance(single_loop_run, dict):
        single_loop_run = SimpleNamespace(**single_loop_run)

    entity = SimpleNamespace(
        app_config=SimpleNamespace(app_id="app-id", workflow_id="workflow-id"),
        workflow_execution_id="workflow-exec-id",
        files=[],
        user_id="user-id",
        inputs={},
        invoke_from=InvokeFrom.DEBUGGER,
        call_depth=0,
        task_id="task-id",
        trace_manager=None,
        single_iteration_run=single_iteration_run,
        single_loop_run=single_loop_run,
    )

    # Mirror the real entity's helper: true when either container-node
    # single-step mode is active.
    def is_single_stepping_container_nodes():
        return entity.single_iteration_run is not None or entity.single_loop_run is not None

    entity.is_single_stepping_container_nodes = is_single_stepping_container_nodes  # type: ignore[attr-defined]
    return entity
def test_workflow_runner_attaches_persistence_for_full_run(monkeypatch):
    """A full (non single-step) run must construct the persistence layer exactly once."""
    from core.workflow.graph_engine.graph_engine import GraphEngine

    # Neutralize the engine and external channel; capture persistence-layer construction.
    monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run)
    persistence_factory = MagicMock(name="persistence_layer_ctor")
    monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_factory)
    monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel())

    queue_manager = MagicMock()
    start_node = {
        "id": "start",
        "type": "custom",
        "data": {"type": NodeType.START, "title": "Start", "version": "1", "variables": []},
    }
    end_node = {
        "id": "end",
        "type": "custom",
        "data": {"type": NodeType.END, "title": "End", "version": "1", "outputs": []},
    }
    workflow = SimpleNamespace(
        id="workflow-id",
        tenant_id="tenant-id",
        app_id="app-id",
        type=WorkflowType.WORKFLOW,
        version="1",
        graph_dict={
            "nodes": [start_node, end_node],
            "edges": [
                {"source": "start", "target": "end", "sourceHandle": "source", "targetHandle": "target"},
            ],
        },
        environment_variables=[],
    )

    generate_entity = _build_generate_entity()
    generate_entity.inputs = {"input": "value"}
    runner = WorkflowAppRunner(
        application_generate_entity=generate_entity,
        queue_manager=queue_manager,
        variable_loader=MagicMock(),
        workflow=workflow,
        system_user_id="system-user-id",
        root_node_id=None,
        workflow_execution_repository=MagicMock(),
        workflow_node_execution_repository=MagicMock(),
        graph_engine_layers=(),
    )

    runner.run()

    assert persistence_factory.call_count == 1
def test_workflow_runner_skips_persistence_for_single_step(monkeypatch):
    """Single-stepping a container node must never construct the persistence layer."""
    from core.workflow.graph_engine.graph_engine import GraphEngine

    # Neutralize the engine and external channel; capture persistence-layer construction.
    monkeypatch.setattr(GraphEngine, "run", _empty_graph_engine_run)
    persistence_factory = MagicMock(name="persistence_layer_ctor")
    monkeypatch.setattr(workflow_app_runner, "WorkflowPersistenceLayer", persistence_factory)
    monkeypatch.setattr(workflow_app_runner, "RedisChannel", lambda *args, **kwargs: DummyCommandChannel())

    queue_manager = MagicMock()
    loop_node = {
        "id": "loop",
        "type": "custom",
        "data": {
            "type": NodeType.LOOP,
            "title": "Loop",
            "version": "1",
            "loop_count": 1,
            "break_conditions": [],
            "logical_operator": "and",
            "loop_variables": [],
            "outputs": {},
        },
    }
    workflow = SimpleNamespace(
        id="workflow-id",
        tenant_id="tenant-id",
        app_id="app-id",
        type=WorkflowType.WORKFLOW,
        version="1",
        graph_dict={"nodes": [loop_node], "edges": []},
        environment_variables=[],
    )

    generate_entity = _build_generate_entity(single_loop_run={"node_id": "loop", "inputs": {}})
    runner = WorkflowAppRunner(
        application_generate_entity=generate_entity,
        queue_manager=queue_manager,
        variable_loader=MagicMock(),
        workflow=workflow,
        system_user_id="system-user-id",
        root_node_id=None,
        workflow_execution_repository=MagicMock(),
        workflow_node_execution_repository=MagicMock(),
        graph_engine_layers=(),
    )

    runner.run()

    assert persistence_factory.call_count == 0

View File

@@ -0,0 +1,51 @@
from core.workflow.entities import GraphInitParams
from core.workflow.graph_events import (
NodeRunIterationStartedEvent,
NodeRunIterationSucceededEvent,
NodeRunSucceededEvent,
)
from core.workflow.nodes.iteration.iteration_node import IterationNode
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
def test_iteration_node_emits_iteration_events_when_iterator_empty():
    """Iterating an empty list still emits started/succeeded/node-succeeded events with zero steps."""
    init_params = GraphInitParams(
        tenant_id="tenant",
        app_id="app",
        workflow_id="workflow",
        graph_config={},
        user_id="user",
        user_from="account",
        invoke_from="debugger",
        call_depth=0,
    )
    state = GraphRuntimeState(
        variable_pool=VariablePool(system_variables=SystemVariable.empty(), user_inputs={}),
        start_at=0.0,
    )
    # The iterator selector resolves to an empty list.
    state.variable_pool.add(("start", "items"), [])

    iteration_node = IterationNode(
        id="iteration-node",
        config={
            "id": "iteration-node",
            "data": {
                "title": "Iteration",
                "iterator_selector": ["start", "items"],
                "output_selector": ["iteration-node", "output"],
            },
        },
        graph_init_params=init_params,
        graph_runtime_state=state,
    )

    emitted = list(iteration_node.run())

    assert any(isinstance(event, NodeRunIterationStartedEvent) for event in emitted)
    succeeded = next(event for event in emitted if isinstance(event, NodeRunIterationSucceededEvent))
    assert succeeded.steps == 0
    assert succeeded.outputs == {"output": []}
    assert any(isinstance(event, NodeRunSucceededEvent) for event in emitted)

View File

@@ -791,8 +791,12 @@ const useOneStepRun = <T>({
},
})
const { data: iterationData } = params
_runResult.created_by = iterationData.created_by.name
setRunResult(_runResult)
const nextRunResult = {
..._runResult,
created_by: (iterationData.created_by as any)?.name || '',
}
_runResult = nextRunResult
setRunResult(nextRunResult)
},
onIterationStart: (params) => {
const newIterationRunResult = produce(_iterationResult, (draft) => {
@@ -894,8 +898,12 @@ const useOneStepRun = <T>({
},
})
const { data: loopData } = params
_runResult.created_by = loopData.created_by.name
setRunResult(_runResult)
const nextRunResult = {
..._runResult,
created_by: (loopData.created_by as any)?.name || '',
}
_runResult = nextRunResult
setRunResult(nextRunResult)
},
onLoopStart: (params) => {
const newLoopRunResult = produce(_loopResult, (draft) => {