mirror of
https://github.com/langgenius/dify.git
synced 2026-02-26 11:25:10 +00:00
Compare commits
1 Commits
inject-cod
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1add10aa92 |
@@ -110,6 +110,7 @@ ignore_imports =
|
||||
core.workflow.nodes.agent.agent_node -> core.model_manager
|
||||
core.workflow.nodes.agent.agent_node -> core.provider_manager
|
||||
core.workflow.nodes.agent.agent_node -> core.tools.tool_manager
|
||||
core.workflow.nodes.code.code_node -> core.helper.code_executor.code_executor
|
||||
core.workflow.nodes.datasource.datasource_node -> models.model
|
||||
core.workflow.nodes.datasource.datasource_node -> models.tools
|
||||
core.workflow.nodes.datasource.datasource_node -> services.datasource_provider_service
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
from collections.abc import Mapping
|
||||
from typing import TYPE_CHECKING, Any, final
|
||||
from typing import TYPE_CHECKING, final
|
||||
|
||||
from typing_extensions import override
|
||||
|
||||
from configs import dify_config
|
||||
from core.helper.code_executor.code_executor import CodeExecutionError, CodeExecutor
|
||||
from core.helper.code_executor.code_executor import CodeExecutor
|
||||
from core.helper.code_executor.code_node_provider import CodeNodeProvider
|
||||
from core.helper.ssrf_proxy import ssrf_proxy
|
||||
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
|
||||
@@ -14,8 +13,7 @@ from core.workflow.enums import NodeType
|
||||
from core.workflow.file.file_manager import file_manager
|
||||
from core.workflow.graph.graph import NodeFactory
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.code.code_node import CodeNode, WorkflowCodeExecutor
|
||||
from core.workflow.nodes.code.entities import CodeLanguage
|
||||
from core.workflow.nodes.code.code_node import CodeNode
|
||||
from core.workflow.nodes.code.limits import CodeNodeLimits
|
||||
from core.workflow.nodes.document_extractor import DocumentExtractorNode, UnstructuredApiConfig
|
||||
from core.workflow.nodes.http_request import HttpRequestNode, build_http_request_config
|
||||
@@ -29,24 +27,6 @@ if TYPE_CHECKING:
|
||||
from core.workflow.runtime import GraphRuntimeState
|
||||
|
||||
|
||||
class DefaultWorkflowCodeExecutor:
|
||||
def execute(
|
||||
self,
|
||||
*,
|
||||
language: CodeLanguage,
|
||||
code: str,
|
||||
inputs: Mapping[str, Any],
|
||||
) -> Mapping[str, Any]:
|
||||
return CodeExecutor.execute_workflow_code_template(
|
||||
language=language,
|
||||
code=code,
|
||||
inputs=inputs,
|
||||
)
|
||||
|
||||
def is_execution_error(self, error: Exception) -> bool:
|
||||
return isinstance(error, CodeExecutionError)
|
||||
|
||||
|
||||
@final
|
||||
class DifyNodeFactory(NodeFactory):
|
||||
"""
|
||||
@@ -63,7 +43,7 @@ class DifyNodeFactory(NodeFactory):
|
||||
) -> None:
|
||||
self.graph_init_params = graph_init_params
|
||||
self.graph_runtime_state = graph_runtime_state
|
||||
self._code_executor: WorkflowCodeExecutor = DefaultWorkflowCodeExecutor()
|
||||
self._code_executor: type[CodeExecutor] = CodeExecutor
|
||||
self._code_providers: tuple[type[CodeNodeProvider], ...] = CodeNode.default_code_providers()
|
||||
self._code_limits = CodeNodeLimits(
|
||||
max_string_length=dify_config.CODE_MAX_STRING_LENGTH,
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
from collections.abc import Mapping, Sequence
|
||||
from decimal import Decimal
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, Protocol, cast
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, cast
|
||||
|
||||
from core.helper.code_executor.code_executor import CodeExecutionError, CodeExecutor, CodeLanguage
|
||||
from core.helper.code_executor.code_node_provider import CodeNodeProvider
|
||||
from core.helper.code_executor.javascript.javascript_code_provider import JavascriptCodeProvider
|
||||
from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider
|
||||
@@ -10,7 +11,7 @@ from core.variables.types import SegmentType
|
||||
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
|
||||
from core.workflow.node_events import NodeRunResult
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.code.entities import CodeLanguage, CodeNodeData
|
||||
from core.workflow.nodes.code.entities import CodeNodeData
|
||||
from core.workflow.nodes.code.limits import CodeNodeLimits
|
||||
|
||||
from .exc import (
|
||||
@@ -24,18 +25,6 @@ if TYPE_CHECKING:
|
||||
from core.workflow.runtime import GraphRuntimeState
|
||||
|
||||
|
||||
class WorkflowCodeExecutor(Protocol):
|
||||
def execute(
|
||||
self,
|
||||
*,
|
||||
language: CodeLanguage,
|
||||
code: str,
|
||||
inputs: Mapping[str, Any],
|
||||
) -> Mapping[str, Any]: ...
|
||||
|
||||
def is_execution_error(self, error: Exception) -> bool: ...
|
||||
|
||||
|
||||
class CodeNode(Node[CodeNodeData]):
|
||||
node_type = NodeType.CODE
|
||||
_DEFAULT_CODE_PROVIDERS: ClassVar[tuple[type[CodeNodeProvider], ...]] = (
|
||||
@@ -51,7 +40,7 @@ class CodeNode(Node[CodeNodeData]):
|
||||
graph_init_params: "GraphInitParams",
|
||||
graph_runtime_state: "GraphRuntimeState",
|
||||
*,
|
||||
code_executor: WorkflowCodeExecutor,
|
||||
code_executor: type[CodeExecutor] | None = None,
|
||||
code_providers: Sequence[type[CodeNodeProvider]] | None = None,
|
||||
code_limits: CodeNodeLimits,
|
||||
) -> None:
|
||||
@@ -61,7 +50,7 @@ class CodeNode(Node[CodeNodeData]):
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
)
|
||||
self._code_executor: WorkflowCodeExecutor = code_executor
|
||||
self._code_executor: type[CodeExecutor] = code_executor or CodeExecutor
|
||||
self._code_providers: tuple[type[CodeNodeProvider], ...] = (
|
||||
tuple(code_providers) if code_providers else self._DEFAULT_CODE_PROVIDERS
|
||||
)
|
||||
@@ -109,7 +98,7 @@ class CodeNode(Node[CodeNodeData]):
|
||||
# Run code
|
||||
try:
|
||||
_ = self._select_code_provider(code_language)
|
||||
result = self._code_executor.execute(
|
||||
result = self._code_executor.execute_workflow_code_template(
|
||||
language=code_language,
|
||||
code=code,
|
||||
inputs=variables,
|
||||
@@ -117,13 +106,7 @@ class CodeNode(Node[CodeNodeData]):
|
||||
|
||||
# Transform result
|
||||
result = self._transform_result(result=result, output_schema=self.node_data.outputs)
|
||||
except CodeNodeError as e:
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED, inputs=variables, error=str(e), error_type=type(e).__name__
|
||||
)
|
||||
except Exception as e:
|
||||
if not self._code_executor.is_execution_error(e):
|
||||
raise
|
||||
except (CodeExecutionError, CodeNodeError) as e:
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED, inputs=variables, error=str(e), error_type=type(e).__name__
|
||||
)
|
||||
|
||||
@@ -68,7 +68,6 @@ def init_code_node(code_config: dict):
|
||||
config=code_config,
|
||||
graph_init_params=init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
code_executor=node_factory._code_executor,
|
||||
code_limits=CodeNodeLimits(
|
||||
max_string_length=dify_config.CODE_MAX_STRING_LENGTH,
|
||||
max_number=dify_config.CODE_MAX_NUMBER,
|
||||
|
||||
@@ -1,498 +0,0 @@
|
||||
"""
|
||||
Integration tests for SegmentService.get_segments method using a real database.
|
||||
|
||||
Tests the retrieval of document segments with pagination and filtering:
|
||||
- Basic pagination (page, limit)
|
||||
- Status filtering
|
||||
- Keyword search
|
||||
- Ordering by position and id (to avoid duplicate data)
|
||||
"""
|
||||
|
||||
from uuid import uuid4
|
||||
|
||||
from extensions.ext_database import db
|
||||
from models import Account, Tenant, TenantAccountJoin, TenantAccountRole
|
||||
from models.dataset import Dataset, DatasetPermissionEnum, Document, DocumentSegment
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
|
||||
class SegmentServiceTestDataFactory:
|
||||
"""
|
||||
Factory class for creating test data for segment tests.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def create_account_with_tenant(
|
||||
role: TenantAccountRole = TenantAccountRole.OWNER,
|
||||
tenant: Tenant | None = None,
|
||||
) -> tuple[Account, Tenant]:
|
||||
"""Create a real account and tenant with specified role."""
|
||||
account = Account(
|
||||
email=f"{uuid4()}@example.com",
|
||||
name=f"user-{uuid4()}",
|
||||
interface_language="en-US",
|
||||
status="active",
|
||||
)
|
||||
db.session.add(account)
|
||||
db.session.commit()
|
||||
|
||||
if tenant is None:
|
||||
tenant = Tenant(name=f"tenant-{uuid4()}", status="normal")
|
||||
db.session.add(tenant)
|
||||
db.session.commit()
|
||||
|
||||
join = TenantAccountJoin(
|
||||
tenant_id=tenant.id,
|
||||
account_id=account.id,
|
||||
role=role,
|
||||
current=True,
|
||||
)
|
||||
db.session.add(join)
|
||||
db.session.commit()
|
||||
|
||||
account.current_tenant = tenant
|
||||
return account, tenant
|
||||
|
||||
@staticmethod
|
||||
def create_dataset(tenant_id: str, created_by: str) -> Dataset:
|
||||
"""Create a real dataset."""
|
||||
dataset = Dataset(
|
||||
tenant_id=tenant_id,
|
||||
name=f"Test Dataset {uuid4()}",
|
||||
description="Test description",
|
||||
data_source_type="upload_file",
|
||||
indexing_technique="high_quality",
|
||||
created_by=created_by,
|
||||
permission=DatasetPermissionEnum.ONLY_ME,
|
||||
provider="vendor",
|
||||
retrieval_model={"top_k": 2},
|
||||
)
|
||||
db.session.add(dataset)
|
||||
db.session.commit()
|
||||
return dataset
|
||||
|
||||
@staticmethod
|
||||
def create_document(tenant_id: str, dataset_id: str, created_by: str) -> Document:
|
||||
"""Create a real document."""
|
||||
document = Document(
|
||||
tenant_id=tenant_id,
|
||||
dataset_id=dataset_id,
|
||||
position=1,
|
||||
data_source_type="upload_file",
|
||||
batch=f"batch-{uuid4()}",
|
||||
name=f"test-doc-{uuid4()}.txt",
|
||||
created_from="api",
|
||||
created_by=created_by,
|
||||
)
|
||||
db.session.add(document)
|
||||
db.session.commit()
|
||||
return document
|
||||
|
||||
@staticmethod
|
||||
def create_segment(
|
||||
tenant_id: str,
|
||||
dataset_id: str,
|
||||
document_id: str,
|
||||
created_by: str,
|
||||
position: int = 1,
|
||||
content: str = "Test content",
|
||||
status: str = "completed",
|
||||
word_count: int = 10,
|
||||
tokens: int = 15,
|
||||
) -> DocumentSegment:
|
||||
"""Create a real document segment."""
|
||||
segment = DocumentSegment(
|
||||
tenant_id=tenant_id,
|
||||
dataset_id=dataset_id,
|
||||
document_id=document_id,
|
||||
position=position,
|
||||
content=content,
|
||||
status=status,
|
||||
word_count=word_count,
|
||||
tokens=tokens,
|
||||
created_by=created_by,
|
||||
)
|
||||
db.session.add(segment)
|
||||
db.session.commit()
|
||||
return segment
|
||||
|
||||
|
||||
class TestSegmentServiceGetSegments:
|
||||
"""
|
||||
Comprehensive integration tests for SegmentService.get_segments method.
|
||||
|
||||
Tests cover:
|
||||
- Basic pagination functionality
|
||||
- Status list filtering
|
||||
- Keyword search filtering
|
||||
- Ordering (position + id for uniqueness)
|
||||
- Empty results
|
||||
- Combined filters
|
||||
"""
|
||||
|
||||
def test_get_segments_basic_pagination(self, db_session_with_containers):
|
||||
"""
|
||||
Test basic pagination functionality.
|
||||
|
||||
Verifies:
|
||||
- Query is built with document_id and tenant_id filters
|
||||
- Pagination uses correct page and limit parameters
|
||||
- Returns segments and total count
|
||||
"""
|
||||
# Arrange
|
||||
owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
|
||||
dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
|
||||
document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
|
||||
|
||||
segment1 = SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=1,
|
||||
content="First segment",
|
||||
)
|
||||
segment2 = SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=2,
|
||||
content="Second segment",
|
||||
)
|
||||
|
||||
# Act
|
||||
items, total = SegmentService.get_segments(document_id=document.id, tenant_id=tenant.id, page=1, limit=20)
|
||||
|
||||
# Assert
|
||||
assert len(items) == 2
|
||||
assert total == 2
|
||||
assert items[0].id == segment1.id
|
||||
assert items[1].id == segment2.id
|
||||
|
||||
def test_get_segments_with_status_filter(self, db_session_with_containers):
|
||||
"""
|
||||
Test filtering by status list.
|
||||
|
||||
Verifies:
|
||||
- Status list filter is applied to query
|
||||
- Only segments with matching status are returned
|
||||
"""
|
||||
# Arrange
|
||||
owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
|
||||
dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
|
||||
document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
|
||||
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=1,
|
||||
status="completed",
|
||||
)
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=2,
|
||||
status="indexing",
|
||||
)
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=3,
|
||||
status="waiting",
|
||||
)
|
||||
|
||||
# Act
|
||||
items, total = SegmentService.get_segments(
|
||||
document_id=document.id, tenant_id=tenant.id, status_list=["completed", "indexing"]
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert len(items) == 2
|
||||
assert total == 2
|
||||
statuses = {item.status for item in items}
|
||||
assert statuses == {"completed", "indexing"}
|
||||
|
||||
def test_get_segments_with_empty_status_list(self, db_session_with_containers):
|
||||
"""
|
||||
Test with empty status list.
|
||||
|
||||
Verifies:
|
||||
- Empty status list is handled correctly
|
||||
- No status filter is applied to avoid WHERE false condition
|
||||
"""
|
||||
# Arrange
|
||||
owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
|
||||
dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
|
||||
document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
|
||||
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=1,
|
||||
status="completed",
|
||||
)
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=2,
|
||||
status="indexing",
|
||||
)
|
||||
|
||||
# Act
|
||||
items, total = SegmentService.get_segments(document_id=document.id, tenant_id=tenant.id, status_list=[])
|
||||
|
||||
# Assert — empty status_list should return all segments (no status filter applied)
|
||||
assert len(items) == 2
|
||||
assert total == 2
|
||||
|
||||
def test_get_segments_with_keyword_search(self, db_session_with_containers):
|
||||
"""
|
||||
Test keyword search functionality.
|
||||
|
||||
Verifies:
|
||||
- Keyword filter uses ilike for case-insensitive search
|
||||
- Search pattern includes wildcards (%keyword%)
|
||||
"""
|
||||
# Arrange
|
||||
owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
|
||||
dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
|
||||
document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
|
||||
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=1,
|
||||
content="This contains search term in the middle",
|
||||
)
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=2,
|
||||
content="This does not match",
|
||||
)
|
||||
|
||||
# Act
|
||||
items, total = SegmentService.get_segments(document_id=document.id, tenant_id=tenant.id, keyword="search term")
|
||||
|
||||
# Assert
|
||||
assert len(items) == 1
|
||||
assert total == 1
|
||||
assert "search term" in items[0].content
|
||||
|
||||
def test_get_segments_ordering_by_position_and_id(self, db_session_with_containers):
|
||||
"""
|
||||
Test ordering by position and id.
|
||||
|
||||
Verifies:
|
||||
- Results are ordered by position ASC
|
||||
- Results are secondarily ordered by id ASC to ensure uniqueness
|
||||
- This prevents duplicate data across pages when positions are not unique
|
||||
"""
|
||||
# Arrange
|
||||
owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
|
||||
dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
|
||||
document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
|
||||
|
||||
# Create segments with different positions
|
||||
seg_pos2 = SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=2,
|
||||
content="Position 2",
|
||||
)
|
||||
seg_pos1 = SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=1,
|
||||
content="Position 1",
|
||||
)
|
||||
seg_pos3 = SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=3,
|
||||
content="Position 3",
|
||||
)
|
||||
|
||||
# Act
|
||||
items, total = SegmentService.get_segments(document_id=document.id, tenant_id=tenant.id)
|
||||
|
||||
# Assert — segments should be ordered by position ASC
|
||||
assert len(items) == 3
|
||||
assert total == 3
|
||||
assert items[0].id == seg_pos1.id
|
||||
assert items[1].id == seg_pos2.id
|
||||
assert items[2].id == seg_pos3.id
|
||||
|
||||
def test_get_segments_empty_results(self, db_session_with_containers):
|
||||
"""
|
||||
Test when no segments match the criteria.
|
||||
|
||||
Verifies:
|
||||
- Empty list is returned for items
|
||||
- Total count is 0
|
||||
"""
|
||||
# Arrange
|
||||
owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
|
||||
non_existent_doc_id = str(uuid4())
|
||||
|
||||
# Act
|
||||
items, total = SegmentService.get_segments(document_id=non_existent_doc_id, tenant_id=tenant.id)
|
||||
|
||||
# Assert
|
||||
assert items == []
|
||||
assert total == 0
|
||||
|
||||
def test_get_segments_combined_filters(self, db_session_with_containers):
|
||||
"""
|
||||
Test with multiple filters combined.
|
||||
|
||||
Verifies:
|
||||
- All filters work together correctly
|
||||
- Status list and keyword search both applied
|
||||
"""
|
||||
# Arrange
|
||||
owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
|
||||
dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
|
||||
document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
|
||||
|
||||
# Create segments with various statuses and content
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=1,
|
||||
status="completed",
|
||||
content="This is important information",
|
||||
)
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=2,
|
||||
status="indexing",
|
||||
content="This is also important",
|
||||
)
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=3,
|
||||
status="completed",
|
||||
content="This is irrelevant",
|
||||
)
|
||||
|
||||
# Act — filter by status=completed AND keyword=important
|
||||
items, total = SegmentService.get_segments(
|
||||
document_id=document.id,
|
||||
tenant_id=tenant.id,
|
||||
status_list=["completed"],
|
||||
keyword="important",
|
||||
page=1,
|
||||
limit=10,
|
||||
)
|
||||
|
||||
# Assert — only the first segment matches both filters
|
||||
assert len(items) == 1
|
||||
assert total == 1
|
||||
assert items[0].status == "completed"
|
||||
assert "important" in items[0].content
|
||||
|
||||
def test_get_segments_with_none_status_list(self, db_session_with_containers):
|
||||
"""
|
||||
Test with None status list.
|
||||
|
||||
Verifies:
|
||||
- None status list is handled correctly
|
||||
- No status filter is applied
|
||||
"""
|
||||
# Arrange
|
||||
owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
|
||||
dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
|
||||
document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
|
||||
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=1,
|
||||
status="completed",
|
||||
)
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=2,
|
||||
status="waiting",
|
||||
)
|
||||
|
||||
# Act
|
||||
items, total = SegmentService.get_segments(
|
||||
document_id=document.id,
|
||||
tenant_id=tenant.id,
|
||||
status_list=None,
|
||||
)
|
||||
|
||||
# Assert — None status_list should return all segments
|
||||
assert len(items) == 2
|
||||
assert total == 2
|
||||
|
||||
def test_get_segments_pagination_max_per_page_limit(self, db_session_with_containers):
|
||||
"""
|
||||
Test that max_per_page is correctly set to 100.
|
||||
|
||||
Verifies:
|
||||
- max_per_page parameter is set to 100
|
||||
- This prevents excessive page sizes
|
||||
"""
|
||||
# Arrange
|
||||
owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
|
||||
dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
|
||||
document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
|
||||
|
||||
# Create 105 segments to exceed max_per_page of 100
|
||||
for i in range(105):
|
||||
SegmentServiceTestDataFactory.create_segment(
|
||||
tenant_id=tenant.id,
|
||||
dataset_id=dataset.id,
|
||||
document_id=document.id,
|
||||
created_by=owner.id,
|
||||
position=i + 1,
|
||||
content=f"Segment {i + 1}",
|
||||
)
|
||||
|
||||
# Act — request limit=200, but max_per_page=100 should cap it
|
||||
items, total = SegmentService.get_segments(
|
||||
document_id=document.id,
|
||||
tenant_id=tenant.id,
|
||||
limit=200,
|
||||
)
|
||||
|
||||
# Assert — total is 105, but items per page capped at 100
|
||||
assert total == 105
|
||||
assert len(items) == 100
|
||||
@@ -1,53 +0,0 @@
|
||||
"""
|
||||
Testcontainers integration tests for workflow run restore functionality.
|
||||
"""
|
||||
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from models.workflow import WorkflowPause
|
||||
from services.retention.workflow_run.restore_archived_workflow_run import WorkflowRunRestore
|
||||
|
||||
|
||||
class TestWorkflowRunRestore:
|
||||
"""Tests for the WorkflowRunRestore class."""
|
||||
|
||||
def test_restore_table_records_returns_rowcount(self, db_session_with_containers):
|
||||
"""Restore should return inserted rowcount."""
|
||||
restore = WorkflowRunRestore()
|
||||
record_id = str(uuid4())
|
||||
records = [
|
||||
{
|
||||
"id": record_id,
|
||||
"workflow_id": str(uuid4()),
|
||||
"workflow_run_id": str(uuid4()),
|
||||
"state_object_key": f"workflow-state-{uuid4()}.json",
|
||||
"created_at": "2024-01-01T00:00:00",
|
||||
"updated_at": "2024-01-01T00:00:00",
|
||||
}
|
||||
]
|
||||
|
||||
restored = restore._restore_table_records(
|
||||
db_session_with_containers,
|
||||
"workflow_pauses",
|
||||
records,
|
||||
schema_version="1.0",
|
||||
)
|
||||
|
||||
assert restored == 1
|
||||
restored_pause = db_session_with_containers.scalar(select(WorkflowPause).where(WorkflowPause.id == record_id))
|
||||
assert restored_pause is not None
|
||||
|
||||
def test_restore_table_records_unknown_table(self, db_session_with_containers):
|
||||
"""Unknown table names should be ignored gracefully."""
|
||||
restore = WorkflowRunRestore()
|
||||
|
||||
restored = restore._restore_table_records(
|
||||
db_session_with_containers,
|
||||
"unknown_table",
|
||||
[{"id": str(uuid4())}],
|
||||
schema_version="1.0",
|
||||
)
|
||||
|
||||
assert restored == 0
|
||||
@@ -1,436 +0,0 @@
|
||||
from datetime import datetime, timedelta
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy import Engine, select
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from core.workflow.enums import WorkflowNodeExecutionStatus
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models.enums import CreatorUserRole
|
||||
from models.workflow import WorkflowNodeExecutionModel
|
||||
from repositories.sqlalchemy_api_workflow_node_execution_repository import (
|
||||
DifyAPISQLAlchemyWorkflowNodeExecutionRepository,
|
||||
)
|
||||
|
||||
|
||||
class TestSQLAlchemyWorkflowNodeExecutionServiceRepository:
|
||||
@staticmethod
|
||||
def _create_repository(db_session_with_containers: Session) -> DifyAPISQLAlchemyWorkflowNodeExecutionRepository:
|
||||
engine = db_session_with_containers.get_bind()
|
||||
assert isinstance(engine, Engine)
|
||||
return DifyAPISQLAlchemyWorkflowNodeExecutionRepository(
|
||||
session_maker=sessionmaker(bind=engine, expire_on_commit=False)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _create_execution(
|
||||
db_session_with_containers: Session,
|
||||
*,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
workflow_id: str,
|
||||
workflow_run_id: str,
|
||||
node_id: str,
|
||||
status: WorkflowNodeExecutionStatus,
|
||||
index: int,
|
||||
created_at: datetime,
|
||||
) -> WorkflowNodeExecutionModel:
|
||||
execution = WorkflowNodeExecutionModel(
|
||||
id=str(uuid4()),
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
triggered_from="workflow-run",
|
||||
workflow_run_id=workflow_run_id,
|
||||
index=index,
|
||||
predecessor_node_id=None,
|
||||
node_execution_id=None,
|
||||
node_id=node_id,
|
||||
node_type="llm",
|
||||
title=f"Node {index}",
|
||||
inputs="{}",
|
||||
process_data="{}",
|
||||
outputs="{}",
|
||||
status=status,
|
||||
error=None,
|
||||
elapsed_time=0.0,
|
||||
execution_metadata="{}",
|
||||
created_at=created_at,
|
||||
created_by_role=CreatorUserRole.ACCOUNT,
|
||||
created_by=str(uuid4()),
|
||||
finished_at=None,
|
||||
)
|
||||
db_session_with_containers.add(execution)
|
||||
db_session_with_containers.commit()
|
||||
return execution
|
||||
|
||||
def test_get_node_last_execution_found(self, db_session_with_containers):
|
||||
"""Test getting the last execution for a node when it exists."""
|
||||
# Arrange
|
||||
tenant_id = str(uuid4())
|
||||
app_id = str(uuid4())
|
||||
workflow_id = str(uuid4())
|
||||
node_id = "node-202"
|
||||
workflow_run_id = str(uuid4())
|
||||
now = naive_utc_now()
|
||||
self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id=node_id,
|
||||
status=WorkflowNodeExecutionStatus.PAUSED,
|
||||
index=1,
|
||||
created_at=now - timedelta(minutes=2),
|
||||
)
|
||||
expected = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id=node_id,
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=2,
|
||||
created_at=now - timedelta(minutes=1),
|
||||
)
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
|
||||
# Act
|
||||
result = repository.get_node_last_execution(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
node_id=node_id,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result is not None
|
||||
assert result.id == expected.id
|
||||
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
|
||||
|
||||
def test_get_node_last_execution_not_found(self, db_session_with_containers):
|
||||
"""Test getting the last execution for a node when it doesn't exist."""
|
||||
# Arrange
|
||||
tenant_id = str(uuid4())
|
||||
app_id = str(uuid4())
|
||||
workflow_id = str(uuid4())
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
|
||||
# Act
|
||||
result = repository.get_node_last_execution(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
node_id="node-202",
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result is None
|
||||
|
||||
def test_get_executions_by_workflow_run_empty(self, db_session_with_containers):
|
||||
"""Test getting executions for a workflow run when none exist."""
|
||||
# Arrange
|
||||
tenant_id = str(uuid4())
|
||||
app_id = str(uuid4())
|
||||
workflow_run_id = str(uuid4())
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
|
||||
# Act
|
||||
result = repository.get_executions_by_workflow_run(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result == []
|
||||
|
||||
def test_get_execution_by_id_found(self, db_session_with_containers):
|
||||
"""Test getting execution by ID when it exists."""
|
||||
# Arrange
|
||||
execution = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=str(uuid4()),
|
||||
app_id=str(uuid4()),
|
||||
workflow_id=str(uuid4()),
|
||||
workflow_run_id=str(uuid4()),
|
||||
node_id="node-202",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=1,
|
||||
created_at=naive_utc_now(),
|
||||
)
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
|
||||
# Act
|
||||
result = repository.get_execution_by_id(execution.id)
|
||||
|
||||
# Assert
|
||||
assert result is not None
|
||||
assert result.id == execution.id
|
||||
|
||||
def test_get_execution_by_id_not_found(self, db_session_with_containers):
|
||||
"""Test getting execution by ID when it doesn't exist."""
|
||||
# Arrange
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
missing_execution_id = str(uuid4())
|
||||
|
||||
# Act
|
||||
result = repository.get_execution_by_id(missing_execution_id)
|
||||
|
||||
# Assert
|
||||
assert result is None
|
||||
|
||||
def test_delete_expired_executions(self, db_session_with_containers):
|
||||
"""Test deleting expired executions."""
|
||||
# Arrange
|
||||
tenant_id = str(uuid4())
|
||||
app_id = str(uuid4())
|
||||
workflow_id = str(uuid4())
|
||||
workflow_run_id = str(uuid4())
|
||||
now = naive_utc_now()
|
||||
before_date = now - timedelta(days=1)
|
||||
old_execution_1 = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-1",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=1,
|
||||
created_at=now - timedelta(days=3),
|
||||
)
|
||||
old_execution_2 = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-2",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=2,
|
||||
created_at=now - timedelta(days=2),
|
||||
)
|
||||
kept_execution = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-3",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=3,
|
||||
created_at=now,
|
||||
)
|
||||
old_execution_1_id = old_execution_1.id
|
||||
old_execution_2_id = old_execution_2.id
|
||||
kept_execution_id = kept_execution.id
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
|
||||
# Act
|
||||
result = repository.delete_expired_executions(
|
||||
tenant_id=tenant_id,
|
||||
before_date=before_date,
|
||||
batch_size=1000,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result == 2
|
||||
remaining_ids = {
|
||||
execution.id
|
||||
for execution in db_session_with_containers.scalars(
|
||||
select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.tenant_id == tenant_id)
|
||||
).all()
|
||||
}
|
||||
assert old_execution_1_id not in remaining_ids
|
||||
assert old_execution_2_id not in remaining_ids
|
||||
assert kept_execution_id in remaining_ids
|
||||
|
||||
def test_delete_executions_by_app(self, db_session_with_containers):
|
||||
"""Test deleting executions by app."""
|
||||
# Arrange
|
||||
tenant_id = str(uuid4())
|
||||
target_app_id = str(uuid4())
|
||||
workflow_id = str(uuid4())
|
||||
workflow_run_id = str(uuid4())
|
||||
created_at = naive_utc_now()
|
||||
deleted_1 = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=target_app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-1",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=1,
|
||||
created_at=created_at,
|
||||
)
|
||||
deleted_2 = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=target_app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-2",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=2,
|
||||
created_at=created_at,
|
||||
)
|
||||
kept = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=str(uuid4()),
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-3",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=3,
|
||||
created_at=created_at,
|
||||
)
|
||||
deleted_1_id = deleted_1.id
|
||||
deleted_2_id = deleted_2.id
|
||||
kept_id = kept.id
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
|
||||
# Act
|
||||
result = repository.delete_executions_by_app(
|
||||
tenant_id=tenant_id,
|
||||
app_id=target_app_id,
|
||||
batch_size=1000,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result == 2
|
||||
remaining_ids = {
|
||||
execution.id
|
||||
for execution in db_session_with_containers.scalars(
|
||||
select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.tenant_id == tenant_id)
|
||||
).all()
|
||||
}
|
||||
assert deleted_1_id not in remaining_ids
|
||||
assert deleted_2_id not in remaining_ids
|
||||
assert kept_id in remaining_ids
|
||||
|
||||
def test_get_expired_executions_batch(self, db_session_with_containers):
|
||||
"""Test getting expired executions batch for backup."""
|
||||
# Arrange
|
||||
tenant_id = str(uuid4())
|
||||
app_id = str(uuid4())
|
||||
workflow_id = str(uuid4())
|
||||
workflow_run_id = str(uuid4())
|
||||
now = naive_utc_now()
|
||||
before_date = now - timedelta(days=1)
|
||||
old_execution_1 = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-1",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=1,
|
||||
created_at=now - timedelta(days=3),
|
||||
)
|
||||
old_execution_2 = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-2",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=2,
|
||||
created_at=now - timedelta(days=2),
|
||||
)
|
||||
self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-3",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=3,
|
||||
created_at=now,
|
||||
)
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
|
||||
# Act
|
||||
result = repository.get_expired_executions_batch(
|
||||
tenant_id=tenant_id,
|
||||
before_date=before_date,
|
||||
batch_size=1000,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert len(result) == 2
|
||||
result_ids = {execution.id for execution in result}
|
||||
assert old_execution_1.id in result_ids
|
||||
assert old_execution_2.id in result_ids
|
||||
|
||||
def test_delete_executions_by_ids(self, db_session_with_containers):
|
||||
"""Test deleting executions by IDs."""
|
||||
# Arrange
|
||||
tenant_id = str(uuid4())
|
||||
app_id = str(uuid4())
|
||||
workflow_id = str(uuid4())
|
||||
workflow_run_id = str(uuid4())
|
||||
created_at = naive_utc_now()
|
||||
execution_1 = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-1",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=1,
|
||||
created_at=created_at,
|
||||
)
|
||||
execution_2 = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-2",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=2,
|
||||
created_at=created_at,
|
||||
)
|
||||
execution_3 = self._create_execution(
|
||||
db_session_with_containers,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id="node-3",
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
index=3,
|
||||
created_at=created_at,
|
||||
)
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
execution_ids = [execution_1.id, execution_2.id, execution_3.id]
|
||||
|
||||
# Act
|
||||
result = repository.delete_executions_by_ids(execution_ids)
|
||||
|
||||
# Assert
|
||||
assert result == 3
|
||||
remaining = db_session_with_containers.scalars(
|
||||
select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(execution_ids))
|
||||
).all()
|
||||
assert remaining == []
|
||||
|
||||
def test_delete_executions_by_ids_empty_list(self, db_session_with_containers):
|
||||
"""Test deleting executions with empty ID list."""
|
||||
# Arrange
|
||||
repository = self._create_repository(db_session_with_containers)
|
||||
|
||||
# Act
|
||||
result = repository.delete_executions_by_ids([])
|
||||
|
||||
# Assert
|
||||
assert result == 0
|
||||
@@ -24,16 +24,6 @@ DEFAULT_CODE_LIMITS = CodeNodeLimits(
|
||||
)
|
||||
|
||||
|
||||
class _NoopCodeExecutor:
|
||||
def execute(self, *, language: object, code: str, inputs: dict[str, object]) -> dict[str, object]:
|
||||
_ = (language, code, inputs)
|
||||
return {}
|
||||
|
||||
def is_execution_error(self, error: Exception) -> bool:
|
||||
_ = error
|
||||
return False
|
||||
|
||||
|
||||
class TestMockTemplateTransformNode:
|
||||
"""Test cases for MockTemplateTransformNode."""
|
||||
|
||||
@@ -329,7 +319,6 @@ class TestMockCodeNode:
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
mock_config=mock_config,
|
||||
code_executor=_NoopCodeExecutor(),
|
||||
code_limits=DEFAULT_CODE_LIMITS,
|
||||
)
|
||||
|
||||
@@ -395,7 +384,6 @@ class TestMockCodeNode:
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
mock_config=mock_config,
|
||||
code_executor=_NoopCodeExecutor(),
|
||||
code_limits=DEFAULT_CODE_LIMITS,
|
||||
)
|
||||
|
||||
@@ -465,7 +453,6 @@ class TestMockCodeNode:
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
mock_config=mock_config,
|
||||
code_executor=_NoopCodeExecutor(),
|
||||
code_limits=DEFAULT_CODE_LIMITS,
|
||||
)
|
||||
|
||||
|
||||
@@ -0,0 +1,472 @@
|
||||
"""
|
||||
Unit tests for SegmentService.get_segments method.
|
||||
|
||||
Tests the retrieval of document segments with pagination and filtering:
|
||||
- Basic pagination (page, limit)
|
||||
- Status filtering
|
||||
- Keyword search
|
||||
- Ordering by position and id (to avoid duplicate data)
|
||||
"""
|
||||
|
||||
from unittest.mock import Mock, create_autospec, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from models.dataset import DocumentSegment
|
||||
|
||||
|
||||
class SegmentServiceTestDataFactory:
|
||||
"""
|
||||
Factory class for creating test data and mock objects for segment tests.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def create_segment_mock(
|
||||
segment_id: str = "segment-123",
|
||||
document_id: str = "doc-123",
|
||||
tenant_id: str = "tenant-123",
|
||||
dataset_id: str = "dataset-123",
|
||||
position: int = 1,
|
||||
content: str = "Test content",
|
||||
status: str = "completed",
|
||||
**kwargs,
|
||||
) -> Mock:
|
||||
"""
|
||||
Create a mock document segment.
|
||||
|
||||
Args:
|
||||
segment_id: Unique identifier for the segment
|
||||
document_id: Parent document ID
|
||||
tenant_id: Tenant ID the segment belongs to
|
||||
dataset_id: Parent dataset ID
|
||||
position: Position within the document
|
||||
content: Segment text content
|
||||
status: Indexing status
|
||||
**kwargs: Additional attributes
|
||||
|
||||
Returns:
|
||||
Mock: DocumentSegment mock object
|
||||
"""
|
||||
segment = create_autospec(DocumentSegment, instance=True)
|
||||
segment.id = segment_id
|
||||
segment.document_id = document_id
|
||||
segment.tenant_id = tenant_id
|
||||
segment.dataset_id = dataset_id
|
||||
segment.position = position
|
||||
segment.content = content
|
||||
segment.status = status
|
||||
for key, value in kwargs.items():
|
||||
setattr(segment, key, value)
|
||||
return segment
|
||||
|
||||
|
||||
class TestSegmentServiceGetSegments:
|
||||
"""
|
||||
Comprehensive unit tests for SegmentService.get_segments method.
|
||||
|
||||
Tests cover:
|
||||
- Basic pagination functionality
|
||||
- Status list filtering
|
||||
- Keyword search filtering
|
||||
- Ordering (position + id for uniqueness)
|
||||
- Empty results
|
||||
- Combined filters
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def mock_segment_service_dependencies(self):
|
||||
"""
|
||||
Common mock setup for segment service dependencies.
|
||||
|
||||
Patches:
|
||||
- db: Database operations and pagination
|
||||
- select: SQLAlchemy query builder
|
||||
"""
|
||||
with (
|
||||
patch("services.dataset_service.db") as mock_db,
|
||||
patch("services.dataset_service.select") as mock_select,
|
||||
):
|
||||
yield {
|
||||
"db": mock_db,
|
||||
"select": mock_select,
|
||||
}
|
||||
|
||||
def test_get_segments_basic_pagination(self, mock_segment_service_dependencies):
|
||||
"""
|
||||
Test basic pagination functionality.
|
||||
|
||||
Verifies:
|
||||
- Query is built with document_id and tenant_id filters
|
||||
- Pagination uses correct page and limit parameters
|
||||
- Returns segments and total count
|
||||
"""
|
||||
# Arrange
|
||||
document_id = "doc-123"
|
||||
tenant_id = "tenant-123"
|
||||
page = 1
|
||||
limit = 20
|
||||
|
||||
# Create mock segments
|
||||
segment1 = SegmentServiceTestDataFactory.create_segment_mock(
|
||||
segment_id="seg-1", position=1, content="First segment"
|
||||
)
|
||||
segment2 = SegmentServiceTestDataFactory.create_segment_mock(
|
||||
segment_id="seg-2", position=2, content="Second segment"
|
||||
)
|
||||
|
||||
# Mock pagination result
|
||||
mock_paginated = Mock()
|
||||
mock_paginated.items = [segment1, segment2]
|
||||
mock_paginated.total = 2
|
||||
|
||||
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
|
||||
|
||||
# Mock select builder
|
||||
mock_query = Mock()
|
||||
mock_segment_service_dependencies["select"].return_value = mock_query
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
|
||||
# Act
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
items, total = SegmentService.get_segments(document_id=document_id, tenant_id=tenant_id, page=page, limit=limit)
|
||||
|
||||
# Assert
|
||||
assert len(items) == 2
|
||||
assert total == 2
|
||||
assert items[0].id == "seg-1"
|
||||
assert items[1].id == "seg-2"
|
||||
mock_segment_service_dependencies["db"].paginate.assert_called_once()
|
||||
call_kwargs = mock_segment_service_dependencies["db"].paginate.call_args[1]
|
||||
assert call_kwargs["page"] == page
|
||||
assert call_kwargs["per_page"] == limit
|
||||
assert call_kwargs["max_per_page"] == 100
|
||||
assert call_kwargs["error_out"] is False
|
||||
|
||||
def test_get_segments_with_status_filter(self, mock_segment_service_dependencies):
|
||||
"""
|
||||
Test filtering by status list.
|
||||
|
||||
Verifies:
|
||||
- Status list filter is applied to query
|
||||
- Only segments with matching status are returned
|
||||
"""
|
||||
# Arrange
|
||||
document_id = "doc-123"
|
||||
tenant_id = "tenant-123"
|
||||
status_list = ["completed", "indexing"]
|
||||
|
||||
segment1 = SegmentServiceTestDataFactory.create_segment_mock(segment_id="seg-1", status="completed")
|
||||
segment2 = SegmentServiceTestDataFactory.create_segment_mock(segment_id="seg-2", status="indexing")
|
||||
|
||||
mock_paginated = Mock()
|
||||
mock_paginated.items = [segment1, segment2]
|
||||
mock_paginated.total = 2
|
||||
|
||||
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
|
||||
|
||||
mock_query = Mock()
|
||||
mock_segment_service_dependencies["select"].return_value = mock_query
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
|
||||
# Act
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
items, total = SegmentService.get_segments(
|
||||
document_id=document_id, tenant_id=tenant_id, status_list=status_list
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert len(items) == 2
|
||||
assert total == 2
|
||||
# Verify where was called multiple times (base filters + status filter)
|
||||
assert mock_query.where.call_count >= 2
|
||||
|
||||
def test_get_segments_with_empty_status_list(self, mock_segment_service_dependencies):
|
||||
"""
|
||||
Test with empty status list.
|
||||
|
||||
Verifies:
|
||||
- Empty status list is handled correctly
|
||||
- No status filter is applied to avoid WHERE false condition
|
||||
"""
|
||||
# Arrange
|
||||
document_id = "doc-123"
|
||||
tenant_id = "tenant-123"
|
||||
status_list = []
|
||||
|
||||
segment = SegmentServiceTestDataFactory.create_segment_mock(segment_id="seg-1")
|
||||
|
||||
mock_paginated = Mock()
|
||||
mock_paginated.items = [segment]
|
||||
mock_paginated.total = 1
|
||||
|
||||
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
|
||||
|
||||
mock_query = Mock()
|
||||
mock_segment_service_dependencies["select"].return_value = mock_query
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
|
||||
# Act
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
items, total = SegmentService.get_segments(
|
||||
document_id=document_id, tenant_id=tenant_id, status_list=status_list
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert len(items) == 1
|
||||
assert total == 1
|
||||
# Should only be called once (base filters, no status filter)
|
||||
assert mock_query.where.call_count == 1
|
||||
|
||||
def test_get_segments_with_keyword_search(self, mock_segment_service_dependencies):
|
||||
"""
|
||||
Test keyword search functionality.
|
||||
|
||||
Verifies:
|
||||
- Keyword filter uses ilike for case-insensitive search
|
||||
- Search pattern includes wildcards (%keyword%)
|
||||
"""
|
||||
# Arrange
|
||||
document_id = "doc-123"
|
||||
tenant_id = "tenant-123"
|
||||
keyword = "search term"
|
||||
|
||||
segment = SegmentServiceTestDataFactory.create_segment_mock(
|
||||
segment_id="seg-1", content="This contains search term"
|
||||
)
|
||||
|
||||
mock_paginated = Mock()
|
||||
mock_paginated.items = [segment]
|
||||
mock_paginated.total = 1
|
||||
|
||||
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
|
||||
|
||||
mock_query = Mock()
|
||||
mock_segment_service_dependencies["select"].return_value = mock_query
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
|
||||
# Act
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
items, total = SegmentService.get_segments(document_id=document_id, tenant_id=tenant_id, keyword=keyword)
|
||||
|
||||
# Assert
|
||||
assert len(items) == 1
|
||||
assert total == 1
|
||||
# Verify where was called for base filters + keyword filter
|
||||
assert mock_query.where.call_count == 2
|
||||
|
||||
def test_get_segments_ordering_by_position_and_id(self, mock_segment_service_dependencies):
|
||||
"""
|
||||
Test ordering by position and id.
|
||||
|
||||
Verifies:
|
||||
- Results are ordered by position ASC
|
||||
- Results are secondarily ordered by id ASC to ensure uniqueness
|
||||
- This prevents duplicate data across pages when positions are not unique
|
||||
"""
|
||||
# Arrange
|
||||
document_id = "doc-123"
|
||||
tenant_id = "tenant-123"
|
||||
|
||||
# Create segments with same position but different ids
|
||||
segment1 = SegmentServiceTestDataFactory.create_segment_mock(
|
||||
segment_id="seg-1", position=1, content="Content 1"
|
||||
)
|
||||
segment2 = SegmentServiceTestDataFactory.create_segment_mock(
|
||||
segment_id="seg-2", position=1, content="Content 2"
|
||||
)
|
||||
segment3 = SegmentServiceTestDataFactory.create_segment_mock(
|
||||
segment_id="seg-3", position=2, content="Content 3"
|
||||
)
|
||||
|
||||
mock_paginated = Mock()
|
||||
mock_paginated.items = [segment1, segment2, segment3]
|
||||
mock_paginated.total = 3
|
||||
|
||||
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
|
||||
|
||||
mock_query = Mock()
|
||||
mock_segment_service_dependencies["select"].return_value = mock_query
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
|
||||
# Act
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
items, total = SegmentService.get_segments(document_id=document_id, tenant_id=tenant_id)
|
||||
|
||||
# Assert
|
||||
assert len(items) == 3
|
||||
assert total == 3
|
||||
mock_query.order_by.assert_called_once()
|
||||
|
||||
def test_get_segments_empty_results(self, mock_segment_service_dependencies):
|
||||
"""
|
||||
Test when no segments match the criteria.
|
||||
|
||||
Verifies:
|
||||
- Empty list is returned for items
|
||||
- Total count is 0
|
||||
"""
|
||||
# Arrange
|
||||
document_id = "non-existent-doc"
|
||||
tenant_id = "tenant-123"
|
||||
|
||||
mock_paginated = Mock()
|
||||
mock_paginated.items = []
|
||||
mock_paginated.total = 0
|
||||
|
||||
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
|
||||
|
||||
mock_query = Mock()
|
||||
mock_segment_service_dependencies["select"].return_value = mock_query
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
|
||||
# Act
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
items, total = SegmentService.get_segments(document_id=document_id, tenant_id=tenant_id)
|
||||
|
||||
# Assert
|
||||
assert items == []
|
||||
assert total == 0
|
||||
|
||||
def test_get_segments_combined_filters(self, mock_segment_service_dependencies):
|
||||
"""
|
||||
Test with multiple filters combined.
|
||||
|
||||
Verifies:
|
||||
- All filters work together correctly
|
||||
- Status list and keyword search both applied
|
||||
"""
|
||||
# Arrange
|
||||
document_id = "doc-123"
|
||||
tenant_id = "tenant-123"
|
||||
status_list = ["completed"]
|
||||
keyword = "important"
|
||||
page = 2
|
||||
limit = 10
|
||||
|
||||
segment = SegmentServiceTestDataFactory.create_segment_mock(
|
||||
segment_id="seg-1",
|
||||
status="completed",
|
||||
content="This is important information",
|
||||
)
|
||||
|
||||
mock_paginated = Mock()
|
||||
mock_paginated.items = [segment]
|
||||
mock_paginated.total = 1
|
||||
|
||||
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
|
||||
|
||||
mock_query = Mock()
|
||||
mock_segment_service_dependencies["select"].return_value = mock_query
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
|
||||
# Act
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
items, total = SegmentService.get_segments(
|
||||
document_id=document_id,
|
||||
tenant_id=tenant_id,
|
||||
status_list=status_list,
|
||||
keyword=keyword,
|
||||
page=page,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert len(items) == 1
|
||||
assert total == 1
|
||||
# Verify filters: base + status + keyword
|
||||
assert mock_query.where.call_count == 3
|
||||
# Verify pagination parameters
|
||||
call_kwargs = mock_segment_service_dependencies["db"].paginate.call_args[1]
|
||||
assert call_kwargs["page"] == page
|
||||
assert call_kwargs["per_page"] == limit
|
||||
|
||||
def test_get_segments_with_none_status_list(self, mock_segment_service_dependencies):
|
||||
"""
|
||||
Test with None status list.
|
||||
|
||||
Verifies:
|
||||
- None status list is handled correctly
|
||||
- No status filter is applied
|
||||
"""
|
||||
# Arrange
|
||||
document_id = "doc-123"
|
||||
tenant_id = "tenant-123"
|
||||
|
||||
segment = SegmentServiceTestDataFactory.create_segment_mock(segment_id="seg-1")
|
||||
|
||||
mock_paginated = Mock()
|
||||
mock_paginated.items = [segment]
|
||||
mock_paginated.total = 1
|
||||
|
||||
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
|
||||
|
||||
mock_query = Mock()
|
||||
mock_segment_service_dependencies["select"].return_value = mock_query
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
|
||||
# Act
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
items, total = SegmentService.get_segments(
|
||||
document_id=document_id,
|
||||
tenant_id=tenant_id,
|
||||
status_list=None,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert len(items) == 1
|
||||
assert total == 1
|
||||
# Should only be called once (base filters only, no status filter)
|
||||
assert mock_query.where.call_count == 1
|
||||
|
||||
def test_get_segments_pagination_max_per_page_limit(self, mock_segment_service_dependencies):
|
||||
"""
|
||||
Test that max_per_page is correctly set to 100.
|
||||
|
||||
Verifies:
|
||||
- max_per_page parameter is set to 100
|
||||
- This prevents excessive page sizes
|
||||
"""
|
||||
# Arrange
|
||||
document_id = "doc-123"
|
||||
tenant_id = "tenant-123"
|
||||
limit = 200 # Request more than max_per_page
|
||||
|
||||
mock_paginated = Mock()
|
||||
mock_paginated.items = []
|
||||
mock_paginated.total = 0
|
||||
|
||||
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
|
||||
|
||||
mock_query = Mock()
|
||||
mock_segment_service_dependencies["select"].return_value = mock_query
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.order_by.return_value = mock_query
|
||||
|
||||
# Act
|
||||
from services.dataset_service import SegmentService
|
||||
|
||||
SegmentService.get_segments(
|
||||
document_id=document_id,
|
||||
tenant_id=tenant_id,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
# Assert
|
||||
call_kwargs = mock_segment_service_dependencies["db"].paginate.call_args[1]
|
||||
assert call_kwargs["max_per_page"] == 100
|
||||
@@ -3,6 +3,7 @@ Unit tests for workflow run restore functionality.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
|
||||
class TestWorkflowRunRestore:
|
||||
@@ -35,3 +36,30 @@ class TestWorkflowRunRestore:
|
||||
assert result["created_at"].year == 2024
|
||||
assert result["created_at"].month == 1
|
||||
assert result["name"] == "test"
|
||||
|
||||
def test_restore_table_records_returns_rowcount(self):
|
||||
"""Restore should return inserted rowcount."""
|
||||
from services.retention.workflow_run.restore_archived_workflow_run import WorkflowRunRestore
|
||||
|
||||
session = MagicMock()
|
||||
session.execute.return_value = MagicMock(rowcount=2)
|
||||
|
||||
restore = WorkflowRunRestore()
|
||||
records = [{"id": "p1", "workflow_run_id": "r1", "created_at": "2024-01-01T00:00:00"}]
|
||||
|
||||
restored = restore._restore_table_records(session, "workflow_pauses", records, schema_version="1.0")
|
||||
|
||||
assert restored == 2
|
||||
session.execute.assert_called_once()
|
||||
|
||||
def test_restore_table_records_unknown_table(self):
|
||||
"""Unknown table names should be ignored gracefully."""
|
||||
from services.retention.workflow_run.restore_archived_workflow_run import WorkflowRunRestore
|
||||
|
||||
session = MagicMock()
|
||||
|
||||
restore = WorkflowRunRestore()
|
||||
restored = restore._restore_table_records(session, "unknown_table", [{"id": "x1"}], schema_version="1.0")
|
||||
|
||||
assert restored == 0
|
||||
session.execute.assert_not_called()
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
from datetime import datetime
|
||||
from unittest.mock import MagicMock
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.workflow.enums import WorkflowNodeExecutionStatus
|
||||
from models.workflow import WorkflowNodeExecutionModel
|
||||
from repositories.sqlalchemy_api_workflow_node_execution_repository import (
|
||||
DifyAPISQLAlchemyWorkflowNodeExecutionRepository,
|
||||
)
|
||||
@@ -13,6 +18,109 @@ class TestSQLAlchemyWorkflowNodeExecutionServiceRepository:
|
||||
mock_session_maker = MagicMock()
|
||||
return DifyAPISQLAlchemyWorkflowNodeExecutionRepository(session_maker=mock_session_maker)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_execution(self):
|
||||
execution = MagicMock(spec=WorkflowNodeExecutionModel)
|
||||
execution.id = str(uuid4())
|
||||
execution.tenant_id = "tenant-123"
|
||||
execution.app_id = "app-456"
|
||||
execution.workflow_id = "workflow-789"
|
||||
execution.workflow_run_id = "run-101"
|
||||
execution.node_id = "node-202"
|
||||
execution.index = 1
|
||||
execution.created_at = "2023-01-01T00:00:00Z"
|
||||
return execution
|
||||
|
||||
def test_get_node_last_execution_found(self, repository, mock_execution):
|
||||
"""Test getting the last execution for a node when it exists."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
mock_session.scalar.return_value = mock_execution
|
||||
|
||||
# Act
|
||||
result = repository.get_node_last_execution(
|
||||
tenant_id="tenant-123",
|
||||
app_id="app-456",
|
||||
workflow_id="workflow-789",
|
||||
node_id="node-202",
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result == mock_execution
|
||||
mock_session.scalar.assert_called_once()
|
||||
# Verify the query was constructed correctly
|
||||
call_args = mock_session.scalar.call_args[0][0]
|
||||
assert hasattr(call_args, "compile") # It's a SQLAlchemy statement
|
||||
|
||||
compiled = call_args.compile()
|
||||
assert WorkflowNodeExecutionStatus.PAUSED in compiled.params.values()
|
||||
|
||||
def test_get_node_last_execution_not_found(self, repository):
|
||||
"""Test getting the last execution for a node when it doesn't exist."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
mock_session.scalar.return_value = None
|
||||
|
||||
# Act
|
||||
result = repository.get_node_last_execution(
|
||||
tenant_id="tenant-123",
|
||||
app_id="app-456",
|
||||
workflow_id="workflow-789",
|
||||
node_id="node-202",
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result is None
|
||||
mock_session.scalar.assert_called_once()
|
||||
|
||||
def test_get_executions_by_workflow_run_empty(self, repository):
|
||||
"""Test getting executions for a workflow run when none exist."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
mock_session.execute.return_value.scalars.return_value.all.return_value = []
|
||||
|
||||
# Act
|
||||
result = repository.get_executions_by_workflow_run(
|
||||
tenant_id="tenant-123",
|
||||
app_id="app-456",
|
||||
workflow_run_id="run-101",
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result == []
|
||||
mock_session.execute.assert_called_once()
|
||||
|
||||
def test_get_execution_by_id_found(self, repository, mock_execution):
|
||||
"""Test getting execution by ID when it exists."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
mock_session.scalar.return_value = mock_execution
|
||||
|
||||
# Act
|
||||
result = repository.get_execution_by_id(mock_execution.id)
|
||||
|
||||
# Assert
|
||||
assert result == mock_execution
|
||||
mock_session.scalar.assert_called_once()
|
||||
|
||||
def test_get_execution_by_id_not_found(self, repository):
|
||||
"""Test getting execution by ID when it doesn't exist."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
mock_session.scalar.return_value = None
|
||||
|
||||
# Act
|
||||
result = repository.get_execution_by_id("non-existent-id")
|
||||
|
||||
# Assert
|
||||
assert result is None
|
||||
mock_session.scalar.assert_called_once()
|
||||
|
||||
def test_repository_implements_protocol(self, repository):
|
||||
"""Test that the repository implements the required protocol methods."""
|
||||
# Verify all protocol methods are implemented
|
||||
@@ -28,3 +136,135 @@ class TestSQLAlchemyWorkflowNodeExecutionServiceRepository:
|
||||
assert callable(repository.delete_executions_by_app)
|
||||
assert callable(repository.get_expired_executions_batch)
|
||||
assert callable(repository.delete_executions_by_ids)
|
||||
|
||||
def test_delete_expired_executions(self, repository):
|
||||
"""Test deleting expired executions."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
|
||||
# Mock the select query to return some IDs first time, then empty to stop loop
|
||||
execution_ids = ["id1", "id2"] # Less than batch_size to trigger break
|
||||
|
||||
# Mock execute method to handle both select and delete statements
|
||||
def mock_execute(stmt):
|
||||
mock_result = MagicMock()
|
||||
# For select statements, return execution IDs
|
||||
if hasattr(stmt, "limit"): # This is our select statement
|
||||
mock_result.scalars.return_value.all.return_value = execution_ids
|
||||
else: # This is our delete statement
|
||||
mock_result.rowcount = 2
|
||||
return mock_result
|
||||
|
||||
mock_session.execute.side_effect = mock_execute
|
||||
|
||||
before_date = datetime(2023, 1, 1)
|
||||
|
||||
# Act
|
||||
result = repository.delete_expired_executions(
|
||||
tenant_id="tenant-123",
|
||||
before_date=before_date,
|
||||
batch_size=1000,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result == 2
|
||||
assert mock_session.execute.call_count == 2 # One select call, one delete call
|
||||
mock_session.commit.assert_called_once()
|
||||
|
||||
def test_delete_executions_by_app(self, repository):
|
||||
"""Test deleting executions by app."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
|
||||
# Mock the select query to return some IDs first time, then empty to stop loop
|
||||
execution_ids = ["id1", "id2"]
|
||||
|
||||
# Mock execute method to handle both select and delete statements
|
||||
def mock_execute(stmt):
|
||||
mock_result = MagicMock()
|
||||
# For select statements, return execution IDs
|
||||
if hasattr(stmt, "limit"): # This is our select statement
|
||||
mock_result.scalars.return_value.all.return_value = execution_ids
|
||||
else: # This is our delete statement
|
||||
mock_result.rowcount = 2
|
||||
return mock_result
|
||||
|
||||
mock_session.execute.side_effect = mock_execute
|
||||
|
||||
# Act
|
||||
result = repository.delete_executions_by_app(
|
||||
tenant_id="tenant-123",
|
||||
app_id="app-456",
|
||||
batch_size=1000,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result == 2
|
||||
assert mock_session.execute.call_count == 2 # One select call, one delete call
|
||||
mock_session.commit.assert_called_once()
|
||||
|
||||
def test_get_expired_executions_batch(self, repository):
|
||||
"""Test getting expired executions batch for backup."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
|
||||
# Create mock execution objects
|
||||
mock_execution1 = MagicMock()
|
||||
mock_execution1.id = "exec-1"
|
||||
mock_execution2 = MagicMock()
|
||||
mock_execution2.id = "exec-2"
|
||||
|
||||
mock_session.execute.return_value.scalars.return_value.all.return_value = [mock_execution1, mock_execution2]
|
||||
|
||||
before_date = datetime(2023, 1, 1)
|
||||
|
||||
# Act
|
||||
result = repository.get_expired_executions_batch(
|
||||
tenant_id="tenant-123",
|
||||
before_date=before_date,
|
||||
batch_size=1000,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert len(result) == 2
|
||||
assert result[0].id == "exec-1"
|
||||
assert result[1].id == "exec-2"
|
||||
mock_session.execute.assert_called_once()
|
||||
|
||||
def test_delete_executions_by_ids(self, repository):
|
||||
"""Test deleting executions by IDs."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
|
||||
# Mock the delete query result
|
||||
mock_result = MagicMock()
|
||||
mock_result.rowcount = 3
|
||||
mock_session.execute.return_value = mock_result
|
||||
|
||||
execution_ids = ["id1", "id2", "id3"]
|
||||
|
||||
# Act
|
||||
result = repository.delete_executions_by_ids(execution_ids)
|
||||
|
||||
# Assert
|
||||
assert result == 3
|
||||
mock_session.execute.assert_called_once()
|
||||
mock_session.commit.assert_called_once()
|
||||
|
||||
def test_delete_executions_by_ids_empty_list(self, repository):
|
||||
"""Test deleting executions with empty ID list."""
|
||||
# Arrange
|
||||
mock_session = MagicMock(spec=Session)
|
||||
repository._session_maker.return_value.__enter__.return_value = mock_session
|
||||
|
||||
# Act
|
||||
result = repository.delete_executions_by_ids([])
|
||||
|
||||
# Assert
|
||||
assert result == 0
|
||||
mock_session.query.assert_not_called()
|
||||
mock_session.commit.assert_not_called()
|
||||
|
||||
6
api/uv.lock
generated
6
api/uv.lock
generated
@@ -5047,11 +5047,11 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "pypdf"
|
||||
version = "6.7.1"
|
||||
version = "6.7.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/ff/63/3437c4363483f2a04000a48f1cd48c40097f69d580363712fa8b0b4afe45/pypdf-6.7.1.tar.gz", hash = "sha256:6b7a63be5563a0a35d54c6d6b550d75c00b8ccf36384be96365355e296e6b3b0", size = 5302208, upload-time = "2026-02-17T17:00:48.88Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/fe/b2/335465d6cff28a772ace8a58beb168f125c2e1d8f7a31527da180f4d89a1/pypdf-6.7.2.tar.gz", hash = "sha256:82a1a48de500ceea59a52a7d979f5095927ef802e4e4fac25ab862a73468acbb", size = 5302986, upload-time = "2026-02-22T11:33:30.776Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/68/77/38bd7744bb9e06d465b0c23879e6d2c187d93a383f8fa485c862822bb8a3/pypdf-6.7.1-py3-none-any.whl", hash = "sha256:a02ccbb06463f7c334ce1612e91b3e68a8e827f3cee100b9941771e6066b094e", size = 331048, upload-time = "2026-02-17T17:00:46.991Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/df/df/38b06d6e74646a4281856920a11efb431559bdeb643bf1e192bff5e29082/pypdf-6.7.2-py3-none-any.whl", hash = "sha256:331b63cd66f63138f152a700565b3e0cebdf4ec8bec3b7594b2522418782f1f3", size = 331245, upload-time = "2026-02-22T11:33:29.204Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -184,5 +184,5 @@ export const changeNodesAndEdgesId = (nodes: Node[], edges: Edge[]) => {
|
||||
}
|
||||
|
||||
export const hasErrorHandleNode = (nodeType?: BlockEnum) => {
|
||||
return nodeType === BlockEnum.LLM || nodeType === BlockEnum.Tool || nodeType === BlockEnum.HttpRequest || nodeType === BlockEnum.Code || nodeType === BlockEnum.Agent
|
||||
return nodeType === BlockEnum.LLM || nodeType === BlockEnum.Tool || nodeType === BlockEnum.HttpRequest || nodeType === BlockEnum.Code
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user