From 02fef84d7f0213c9bf9c8a127c70bfc8a9a8fa2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E4=B9=8B=E6=9C=AC=E6=BE=AA?= Date: Wed, 25 Feb 2026 04:01:26 +0800 Subject: [PATCH] test: migrate node execution repository sql tests to testcontainers (#32524) Co-authored-by: KinomotoMio <200703522+KinomotoMio@users.noreply.github.com> --- ..._api_workflow_node_execution_repository.py | 143 ++++++++++++++++++ ..._api_workflow_node_execution_repository.py | 40 ----- 2 files changed, 143 insertions(+), 40 deletions(-) create mode 100644 api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_api_workflow_node_execution_repository.py delete mode 100644 api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_node_execution_repository.py diff --git a/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_api_workflow_node_execution_repository.py b/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_api_workflow_node_execution_repository.py new file mode 100644 index 0000000000..556c029b24 --- /dev/null +++ b/api/tests/test_containers_integration_tests/repositories/test_sqlalchemy_api_workflow_node_execution_repository.py @@ -0,0 +1,143 @@ +"""Integration tests for DifyAPISQLAlchemyWorkflowNodeExecutionRepository using testcontainers.""" + +from __future__ import annotations + +from datetime import timedelta +from uuid import uuid4 + +from sqlalchemy import Engine, delete +from sqlalchemy.orm import Session, sessionmaker + +from core.workflow.enums import WorkflowNodeExecutionStatus +from libs.datetime_utils import naive_utc_now +from models.enums import CreatorUserRole +from models.workflow import WorkflowNodeExecutionModel +from repositories.sqlalchemy_api_workflow_node_execution_repository import ( + DifyAPISQLAlchemyWorkflowNodeExecutionRepository, +) + + +def _create_node_execution( + session: Session, + *, + tenant_id: str, + app_id: str, + workflow_id: str, + workflow_run_id: str, + status: WorkflowNodeExecutionStatus, + index: int, + created_by: str, + created_at_offset_seconds: int, +) -> WorkflowNodeExecutionModel: + now = naive_utc_now() + node_execution = WorkflowNodeExecutionModel( + id=str(uuid4()), + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow_id, + triggered_from="workflow-run", + workflow_run_id=workflow_run_id, + index=index, + predecessor_node_id=None, + node_execution_id=None, + node_id=f"node-{index}", + node_type="llm", + title=f"Node {index}", + inputs="{}", + process_data="{}", + outputs="{}", + status=status, + error=None, + elapsed_time=0.0, + execution_metadata="{}", + created_at=now + timedelta(seconds=created_at_offset_seconds), + created_by_role=CreatorUserRole.ACCOUNT, + created_by=created_by, + finished_at=None, + ) + session.add(node_execution) + session.flush() + return node_execution + + +class TestDifyAPISQLAlchemyWorkflowNodeExecutionRepository: + def test_get_executions_by_workflow_run_keeps_paused_records(self, db_session_with_containers: Session) -> None: + tenant_id = str(uuid4()) + app_id = str(uuid4()) + workflow_id = str(uuid4()) + workflow_run_id = str(uuid4()) + created_by = str(uuid4()) + + other_tenant_id = str(uuid4()) + other_app_id = str(uuid4()) + + included_paused = _create_node_execution( + db_session_with_containers, + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + status=WorkflowNodeExecutionStatus.PAUSED, + index=1, + created_by=created_by, + created_at_offset_seconds=0, + ) + included_succeeded = _create_node_execution( + db_session_with_containers, + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + status=WorkflowNodeExecutionStatus.SUCCEEDED, + index=2, + created_by=created_by, + created_at_offset_seconds=1, + ) + _create_node_execution( + db_session_with_containers, + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow_id, + workflow_run_id=str(uuid4()), + status=WorkflowNodeExecutionStatus.PAUSED, + index=3, + created_by=created_by, + created_at_offset_seconds=2, + ) + _create_node_execution( + db_session_with_containers, + tenant_id=other_tenant_id, + app_id=other_app_id, + workflow_id=str(uuid4()), + workflow_run_id=workflow_run_id, + status=WorkflowNodeExecutionStatus.PAUSED, + index=4, + created_by=str(uuid4()), + created_at_offset_seconds=3, + ) + db_session_with_containers.commit() + + engine = db_session_with_containers.get_bind() + assert isinstance(engine, Engine) + repository = DifyAPISQLAlchemyWorkflowNodeExecutionRepository(sessionmaker(bind=engine, expire_on_commit=False)) + + try: + results = repository.get_executions_by_workflow_run( + tenant_id=tenant_id, + app_id=app_id, + workflow_run_id=workflow_run_id, + ) + + assert len(results) == 2 + assert [result.id for result in results] == [included_paused.id, included_succeeded.id] + assert any(result.status == WorkflowNodeExecutionStatus.PAUSED for result in results) + assert all(result.tenant_id == tenant_id for result in results) + assert all(result.app_id == app_id for result in results) + assert all(result.workflow_run_id == workflow_run_id for result in results) + finally: + db_session_with_containers.execute( + delete(WorkflowNodeExecutionModel).where( + WorkflowNodeExecutionModel.tenant_id.in_([tenant_id, other_tenant_id]) + ) + ) + db_session_with_containers.commit() diff --git a/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_node_execution_repository.py b/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_node_execution_repository.py deleted file mode 100644 index ceb1406a4b..0000000000 --- a/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_node_execution_repository.py +++ /dev/null @@ -1,40 +0,0 @@ -"""Unit tests for DifyAPISQLAlchemyWorkflowNodeExecutionRepository implementation.""" - -from unittest.mock import Mock - -from sqlalchemy.orm import Session, sessionmaker - -from repositories.sqlalchemy_api_workflow_node_execution_repository import ( - DifyAPISQLAlchemyWorkflowNodeExecutionRepository, -) - - -class TestDifyAPISQLAlchemyWorkflowNodeExecutionRepository: - def test_get_executions_by_workflow_run_keeps_paused_records(self): - mock_session = Mock(spec=Session) - execute_result = Mock() - execute_result.scalars.return_value.all.return_value = [] - mock_session.execute.return_value = execute_result - - session_maker = Mock(spec=sessionmaker) - context_manager = Mock() - context_manager.__enter__ = Mock(return_value=mock_session) - context_manager.__exit__ = Mock(return_value=None) - session_maker.return_value = context_manager - - repository = DifyAPISQLAlchemyWorkflowNodeExecutionRepository(session_maker) - - repository.get_executions_by_workflow_run( - tenant_id="tenant-123", - app_id="app-123", - workflow_run_id="workflow-run-123", - ) - - stmt = mock_session.execute.call_args[0][0] - where_clauses = list(getattr(stmt, "_where_criteria", []) or []) - where_strs = [str(clause).lower() for clause in where_clauses] - - assert any("tenant_id" in clause for clause in where_strs) - assert any("app_id" in clause for clause in where_strs) - assert any("workflow_run_id" in clause for clause in where_strs) - assert not any("paused" in clause for clause in where_strs)