Compare commits

..

3 Commits

Author SHA1 Message Date
木之本澪
5d927b413f test: migrate workflow_node_execution_service_repository SQL tests to testcontainers (#32591)
Some checks are pending
autofix.ci / autofix (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions
Main CI Pipeline / Check Changed Files (push) Waiting to run
Main CI Pipeline / API Tests (push) Blocked by required conditions
Main CI Pipeline / Web Tests (push) Blocked by required conditions
Main CI Pipeline / Style Check (push) Waiting to run
Main CI Pipeline / VDB Tests (push) Blocked by required conditions
Main CI Pipeline / DB Migration Test (push) Blocked by required conditions
Co-authored-by: KinomotoMio <200703522+KinomotoMio@users.noreply.github.com>
2026-02-26 03:42:08 +09:00
木之本澪
39de931555 test: migrate restore_archived_workflow_run SQL tests to testcontainers (#32590)
Co-authored-by: KinomotoMio <200703522+KinomotoMio@users.noreply.github.com>
2026-02-26 03:24:58 +09:00
木之本澪
05c827606b test: migrate test_dataset_service_get_segments SQL tests to testcontainers (#32544)
Co-authored-by: KinomotoMio <200703522+KinomotoMio@users.noreply.github.com>
2026-02-26 02:12:41 +09:00
7 changed files with 990 additions and 743 deletions

View File

@@ -0,0 +1,498 @@
"""
Integration tests for SegmentService.get_segments method using a real database.
Tests the retrieval of document segments with pagination and filtering:
- Basic pagination (page, limit)
- Status filtering
- Keyword search
- Ordering by position and id (to avoid duplicate data)
"""
from uuid import uuid4
from extensions.ext_database import db
from models import Account, Tenant, TenantAccountJoin, TenantAccountRole
from models.dataset import Dataset, DatasetPermissionEnum, Document, DocumentSegment
from services.dataset_service import SegmentService
class SegmentServiceTestDataFactory:
    """Helpers that persist real ORM rows required by the segment tests."""

    @staticmethod
    def _persist(instance):
        """Add *instance* to the session, commit it, and hand it back."""
        db.session.add(instance)
        db.session.commit()
        return instance

    @staticmethod
    def create_account_with_tenant(
        role: TenantAccountRole = TenantAccountRole.OWNER,
        tenant: Tenant | None = None,
    ) -> tuple[Account, Tenant]:
        """Persist an account joined to *tenant* (created on demand) under *role*."""
        account = SegmentServiceTestDataFactory._persist(
            Account(
                email=f"{uuid4()}@example.com",
                name=f"user-{uuid4()}",
                interface_language="en-US",
                status="active",
            )
        )
        if tenant is None:
            tenant = SegmentServiceTestDataFactory._persist(
                Tenant(name=f"tenant-{uuid4()}", status="normal")
            )
        SegmentServiceTestDataFactory._persist(
            TenantAccountJoin(
                tenant_id=tenant.id,
                account_id=account.id,
                role=role,
                current=True,
            )
        )
        account.current_tenant = tenant
        return account, tenant

    @staticmethod
    def create_dataset(tenant_id: str, created_by: str) -> Dataset:
        """Persist a minimal high-quality dataset owned by *created_by*."""
        return SegmentServiceTestDataFactory._persist(
            Dataset(
                tenant_id=tenant_id,
                name=f"Test Dataset {uuid4()}",
                description="Test description",
                data_source_type="upload_file",
                indexing_technique="high_quality",
                created_by=created_by,
                permission=DatasetPermissionEnum.ONLY_ME,
                provider="vendor",
                retrieval_model={"top_k": 2},
            )
        )

    @staticmethod
    def create_document(tenant_id: str, dataset_id: str, created_by: str) -> Document:
        """Persist a document attached to *dataset_id*."""
        return SegmentServiceTestDataFactory._persist(
            Document(
                tenant_id=tenant_id,
                dataset_id=dataset_id,
                position=1,
                data_source_type="upload_file",
                batch=f"batch-{uuid4()}",
                name=f"test-doc-{uuid4()}.txt",
                created_from="api",
                created_by=created_by,
            )
        )

    @staticmethod
    def create_segment(
        tenant_id: str,
        dataset_id: str,
        document_id: str,
        created_by: str,
        position: int = 1,
        content: str = "Test content",
        status: str = "completed",
        word_count: int = 10,
        tokens: int = 15,
    ) -> DocumentSegment:
        """Persist a document segment with the supplied attributes."""
        return SegmentServiceTestDataFactory._persist(
            DocumentSegment(
                tenant_id=tenant_id,
                dataset_id=dataset_id,
                document_id=document_id,
                position=position,
                content=content,
                status=status,
                word_count=word_count,
                tokens=tokens,
                created_by=created_by,
            )
        )
class TestSegmentServiceGetSegments:
    """
    Comprehensive integration tests for SegmentService.get_segments method.

    Tests cover:
    - Basic pagination functionality
    - Status list filtering
    - Keyword search filtering
    - Ordering (position + id for uniqueness)
    - Empty results
    - Combined filters

    All tests run against a real database via the ``db_session_with_containers``
    fixture (presumably a testcontainers-backed session — confirm in conftest).
    """

    def test_get_segments_basic_pagination(self, db_session_with_containers):
        """
        Test basic pagination functionality.

        Verifies:
        - Query is built with document_id and tenant_id filters
        - Pagination uses correct page and limit parameters
        - Returns segments and total count
        """
        # Arrange: one document with two segments at positions 1 and 2.
        owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
        dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
        document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
        segment1 = SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=1,
            content="First segment",
        )
        segment2 = SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=2,
            content="Second segment",
        )

        # Act
        items, total = SegmentService.get_segments(document_id=document.id, tenant_id=tenant.id, page=1, limit=20)

        # Assert: both segments fit on the first page, ordered by position.
        assert len(items) == 2
        assert total == 2
        assert items[0].id == segment1.id
        assert items[1].id == segment2.id

    def test_get_segments_with_status_filter(self, db_session_with_containers):
        """
        Test filtering by status list.

        Verifies:
        - Status list filter is applied to query
        - Only segments with matching status are returned
        """
        # Arrange: three segments with distinct statuses; only two match the filter.
        owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
        dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
        document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=1,
            status="completed",
        )
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=2,
            status="indexing",
        )
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=3,
            status="waiting",
        )

        # Act
        items, total = SegmentService.get_segments(
            document_id=document.id, tenant_id=tenant.id, status_list=["completed", "indexing"]
        )

        # Assert: the "waiting" segment is excluded.
        assert len(items) == 2
        assert total == 2
        statuses = {item.status for item in items}
        assert statuses == {"completed", "indexing"}

    def test_get_segments_with_empty_status_list(self, db_session_with_containers):
        """
        Test with empty status list.

        Verifies:
        - Empty status list is handled correctly
        - No status filter is applied to avoid WHERE false condition
        """
        # Arrange
        owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
        dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
        document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=1,
            status="completed",
        )
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=2,
            status="indexing",
        )

        # Act
        items, total = SegmentService.get_segments(document_id=document.id, tenant_id=tenant.id, status_list=[])

        # Assert — empty status_list should return all segments (no status filter applied)
        assert len(items) == 2
        assert total == 2

    def test_get_segments_with_keyword_search(self, db_session_with_containers):
        """
        Test keyword search functionality.

        Verifies:
        - Keyword filter uses ilike for case-insensitive search
        - Search pattern includes wildcards (%keyword%)
        """
        # Arrange: one segment containing the keyword mid-content, one without it.
        owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
        dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
        document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=1,
            content="This contains search term in the middle",
        )
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=2,
            content="This does not match",
        )

        # Act
        items, total = SegmentService.get_segments(document_id=document.id, tenant_id=tenant.id, keyword="search term")

        # Assert: substring match works even when the keyword is mid-content.
        assert len(items) == 1
        assert total == 1
        assert "search term" in items[0].content

    def test_get_segments_ordering_by_position_and_id(self, db_session_with_containers):
        """
        Test ordering by position and id.

        Verifies:
        - Results are ordered by position ASC
        - Results are secondarily ordered by id ASC to ensure uniqueness
        - This prevents duplicate data across pages when positions are not unique
        """
        # Arrange
        owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
        dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
        document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
        # Create segments with different positions, deliberately inserted out of
        # position order so insertion order cannot mask an ordering bug.
        seg_pos2 = SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=2,
            content="Position 2",
        )
        seg_pos1 = SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=1,
            content="Position 1",
        )
        seg_pos3 = SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=3,
            content="Position 3",
        )

        # Act
        items, total = SegmentService.get_segments(document_id=document.id, tenant_id=tenant.id)

        # Assert — segments should be ordered by position ASC
        assert len(items) == 3
        assert total == 3
        assert items[0].id == seg_pos1.id
        assert items[1].id == seg_pos2.id
        assert items[2].id == seg_pos3.id

    def test_get_segments_empty_results(self, db_session_with_containers):
        """
        Test when no segments match the criteria.

        Verifies:
        - Empty list is returned for items
        - Total count is 0
        """
        # Arrange: query a document id that was never created.
        owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
        non_existent_doc_id = str(uuid4())

        # Act
        items, total = SegmentService.get_segments(document_id=non_existent_doc_id, tenant_id=tenant.id)

        # Assert
        assert items == []
        assert total == 0

    def test_get_segments_combined_filters(self, db_session_with_containers):
        """
        Test with multiple filters combined.

        Verifies:
        - All filters work together correctly
        - Status list and keyword search both applied
        """
        # Arrange
        owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
        dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
        document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
        # Create segments with various statuses and content; only the first
        # matches both status=completed AND keyword=important.
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=1,
            status="completed",
            content="This is important information",
        )
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=2,
            status="indexing",
            content="This is also important",
        )
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=3,
            status="completed",
            content="This is irrelevant",
        )

        # Act — filter by status=completed AND keyword=important
        items, total = SegmentService.get_segments(
            document_id=document.id,
            tenant_id=tenant.id,
            status_list=["completed"],
            keyword="important",
            page=1,
            limit=10,
        )

        # Assert — only the first segment matches both filters
        assert len(items) == 1
        assert total == 1
        assert items[0].status == "completed"
        assert "important" in items[0].content

    def test_get_segments_with_none_status_list(self, db_session_with_containers):
        """
        Test with None status list.

        Verifies:
        - None status list is handled correctly
        - No status filter is applied
        """
        # Arrange
        owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
        dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
        document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=1,
            status="completed",
        )
        SegmentServiceTestDataFactory.create_segment(
            tenant_id=tenant.id,
            dataset_id=dataset.id,
            document_id=document.id,
            created_by=owner.id,
            position=2,
            status="waiting",
        )

        # Act
        items, total = SegmentService.get_segments(
            document_id=document.id,
            tenant_id=tenant.id,
            status_list=None,
        )

        # Assert — None status_list should return all segments
        assert len(items) == 2
        assert total == 2

    def test_get_segments_pagination_max_per_page_limit(self, db_session_with_containers):
        """
        Test that max_per_page is correctly set to 100.

        Verifies:
        - max_per_page parameter is set to 100
        - This prevents excessive page sizes
        """
        # Arrange
        owner, tenant = SegmentServiceTestDataFactory.create_account_with_tenant()
        dataset = SegmentServiceTestDataFactory.create_dataset(tenant.id, owner.id)
        document = SegmentServiceTestDataFactory.create_document(tenant.id, dataset.id, owner.id)
        # Create 105 segments to exceed max_per_page of 100
        for i in range(105):
            SegmentServiceTestDataFactory.create_segment(
                tenant_id=tenant.id,
                dataset_id=dataset.id,
                document_id=document.id,
                created_by=owner.id,
                position=i + 1,
                content=f"Segment {i + 1}",
            )

        # Act — request limit=200, but max_per_page=100 should cap it
        items, total = SegmentService.get_segments(
            document_id=document.id,
            tenant_id=tenant.id,
            limit=200,
        )

        # Assert — total is 105, but items per page capped at 100
        assert total == 105
        assert len(items) == 100

View File

@@ -0,0 +1,53 @@
"""
Testcontainers integration tests for workflow run restore functionality.
"""
from uuid import uuid4
from sqlalchemy import select
from models.workflow import WorkflowPause
from services.retention.workflow_run.restore_archived_workflow_run import WorkflowRunRestore
class TestWorkflowRunRestore:
    """Integration tests covering WorkflowRunRestore._restore_table_records."""

    def test_restore_table_records_returns_rowcount(self, db_session_with_containers):
        """A restored workflow_pauses record is inserted and counted."""
        service = WorkflowRunRestore()
        pause_id = str(uuid4())

        inserted = service._restore_table_records(
            db_session_with_containers,
            "workflow_pauses",
            [
                {
                    "id": pause_id,
                    "workflow_id": str(uuid4()),
                    "workflow_run_id": str(uuid4()),
                    "state_object_key": f"workflow-state-{uuid4()}.json",
                    "created_at": "2024-01-01T00:00:00",
                    "updated_at": "2024-01-01T00:00:00",
                }
            ],
            schema_version="1.0",
        )

        # One row reported inserted, and it is actually queryable afterwards.
        assert inserted == 1
        row = db_session_with_containers.scalar(
            select(WorkflowPause).where(WorkflowPause.id == pause_id)
        )
        assert row is not None

    def test_restore_table_records_unknown_table(self, db_session_with_containers):
        """A table name outside the known set restores nothing and does not raise."""
        service = WorkflowRunRestore()

        inserted = service._restore_table_records(
            db_session_with_containers,
            "unknown_table",
            [{"id": str(uuid4())}],
            schema_version="1.0",
        )

        assert inserted == 0

View File

@@ -0,0 +1,436 @@
from datetime import datetime, timedelta
from uuid import uuid4
from sqlalchemy import Engine, select
from sqlalchemy.orm import Session, sessionmaker
from core.workflow.enums import WorkflowNodeExecutionStatus
from libs.datetime_utils import naive_utc_now
from models.enums import CreatorUserRole
from models.workflow import WorkflowNodeExecutionModel
from repositories.sqlalchemy_api_workflow_node_execution_repository import (
DifyAPISQLAlchemyWorkflowNodeExecutionRepository,
)
class TestSQLAlchemyWorkflowNodeExecutionServiceRepository:
    """Integration tests for DifyAPISQLAlchemyWorkflowNodeExecutionRepository
    run against a real database session (``db_session_with_containers``)."""

    @staticmethod
    def _create_repository(db_session_with_containers: Session) -> DifyAPISQLAlchemyWorkflowNodeExecutionRepository:
        """Build a repository bound to the same engine as the test session,
        so repository queries and test fixtures see the same data."""
        engine = db_session_with_containers.get_bind()
        assert isinstance(engine, Engine)
        return DifyAPISQLAlchemyWorkflowNodeExecutionRepository(
            session_maker=sessionmaker(bind=engine, expire_on_commit=False)
        )

    @staticmethod
    def _create_execution(
        db_session_with_containers: Session,
        *,
        tenant_id: str,
        app_id: str,
        workflow_id: str,
        workflow_run_id: str,
        node_id: str,
        status: WorkflowNodeExecutionStatus,
        index: int,
        created_at: datetime,
    ) -> WorkflowNodeExecutionModel:
        """Persist and return one node-execution row with the given identity,
        status, index and creation timestamp; remaining fields use fixed
        placeholder values."""
        execution = WorkflowNodeExecutionModel(
            id=str(uuid4()),
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            triggered_from="workflow-run",
            workflow_run_id=workflow_run_id,
            index=index,
            predecessor_node_id=None,
            node_execution_id=None,
            node_id=node_id,
            node_type="llm",
            title=f"Node {index}",
            inputs="{}",
            process_data="{}",
            outputs="{}",
            status=status,
            error=None,
            elapsed_time=0.0,
            execution_metadata="{}",
            created_at=created_at,
            created_by_role=CreatorUserRole.ACCOUNT,
            created_by=str(uuid4()),
            finished_at=None,
        )
        db_session_with_containers.add(execution)
        db_session_with_containers.commit()
        return execution

    def test_get_node_last_execution_found(self, db_session_with_containers):
        """Test getting the last execution for a node when it exists."""
        # Arrange: two executions for the same node; the later one should win.
        tenant_id = str(uuid4())
        app_id = str(uuid4())
        workflow_id = str(uuid4())
        node_id = "node-202"
        workflow_run_id = str(uuid4())
        now = naive_utc_now()
        self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id=node_id,
            status=WorkflowNodeExecutionStatus.PAUSED,
            index=1,
            created_at=now - timedelta(minutes=2),
        )
        expected = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id=node_id,
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=2,
            created_at=now - timedelta(minutes=1),
        )
        repository = self._create_repository(db_session_with_containers)

        # Act
        result = repository.get_node_last_execution(
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            node_id=node_id,
        )

        # Assert: the most recently created execution is returned.
        assert result is not None
        assert result.id == expected.id
        assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED

    def test_get_node_last_execution_not_found(self, db_session_with_containers):
        """Test getting the last execution for a node when it doesn't exist."""
        # Arrange: fresh random ids guarantee no matching rows exist.
        tenant_id = str(uuid4())
        app_id = str(uuid4())
        workflow_id = str(uuid4())
        repository = self._create_repository(db_session_with_containers)

        # Act
        result = repository.get_node_last_execution(
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            node_id="node-202",
        )

        # Assert
        assert result is None

    def test_get_executions_by_workflow_run_empty(self, db_session_with_containers):
        """Test getting executions for a workflow run when none exist."""
        # Arrange
        tenant_id = str(uuid4())
        app_id = str(uuid4())
        workflow_run_id = str(uuid4())
        repository = self._create_repository(db_session_with_containers)

        # Act
        result = repository.get_executions_by_workflow_run(
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_run_id=workflow_run_id,
        )

        # Assert
        assert result == []

    def test_get_execution_by_id_found(self, db_session_with_containers):
        """Test getting execution by ID when it exists."""
        # Arrange
        execution = self._create_execution(
            db_session_with_containers,
            tenant_id=str(uuid4()),
            app_id=str(uuid4()),
            workflow_id=str(uuid4()),
            workflow_run_id=str(uuid4()),
            node_id="node-202",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=1,
            created_at=naive_utc_now(),
        )
        repository = self._create_repository(db_session_with_containers)

        # Act
        result = repository.get_execution_by_id(execution.id)

        # Assert
        assert result is not None
        assert result.id == execution.id

    def test_get_execution_by_id_not_found(self, db_session_with_containers):
        """Test getting execution by ID when it doesn't exist."""
        # Arrange
        repository = self._create_repository(db_session_with_containers)
        missing_execution_id = str(uuid4())

        # Act
        result = repository.get_execution_by_id(missing_execution_id)

        # Assert
        assert result is None

    def test_delete_expired_executions(self, db_session_with_containers):
        """Test deleting expired executions."""
        # Arrange: two executions older than the cutoff, one newer.
        tenant_id = str(uuid4())
        app_id = str(uuid4())
        workflow_id = str(uuid4())
        workflow_run_id = str(uuid4())
        now = naive_utc_now()
        before_date = now - timedelta(days=1)
        old_execution_1 = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-1",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=1,
            created_at=now - timedelta(days=3),
        )
        old_execution_2 = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-2",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=2,
            created_at=now - timedelta(days=2),
        )
        kept_execution = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-3",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=3,
            created_at=now,
        )
        # Capture ids before deletion so assertions don't touch possibly
        # expired/deleted ORM instances.
        old_execution_1_id = old_execution_1.id
        old_execution_2_id = old_execution_2.id
        kept_execution_id = kept_execution.id
        repository = self._create_repository(db_session_with_containers)

        # Act
        result = repository.delete_expired_executions(
            tenant_id=tenant_id,
            before_date=before_date,
            batch_size=1000,
        )

        # Assert: both old rows deleted, recent row kept.
        assert result == 2
        remaining_ids = {
            execution.id
            for execution in db_session_with_containers.scalars(
                select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.tenant_id == tenant_id)
            ).all()
        }
        assert old_execution_1_id not in remaining_ids
        assert old_execution_2_id not in remaining_ids
        assert kept_execution_id in remaining_ids

    def test_delete_executions_by_app(self, db_session_with_containers):
        """Test deleting executions by app."""
        # Arrange: two executions under the target app, one under another app.
        tenant_id = str(uuid4())
        target_app_id = str(uuid4())
        workflow_id = str(uuid4())
        workflow_run_id = str(uuid4())
        created_at = naive_utc_now()
        deleted_1 = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=target_app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-1",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=1,
            created_at=created_at,
        )
        deleted_2 = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=target_app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-2",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=2,
            created_at=created_at,
        )
        kept = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=str(uuid4()),
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-3",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=3,
            created_at=created_at,
        )
        # Capture ids before the delete call (see note in
        # test_delete_expired_executions).
        deleted_1_id = deleted_1.id
        deleted_2_id = deleted_2.id
        kept_id = kept.id
        repository = self._create_repository(db_session_with_containers)

        # Act
        result = repository.delete_executions_by_app(
            tenant_id=tenant_id,
            app_id=target_app_id,
            batch_size=1000,
        )

        # Assert: only the target app's executions are removed.
        assert result == 2
        remaining_ids = {
            execution.id
            for execution in db_session_with_containers.scalars(
                select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.tenant_id == tenant_id)
            ).all()
        }
        assert deleted_1_id not in remaining_ids
        assert deleted_2_id not in remaining_ids
        assert kept_id in remaining_ids

    def test_get_expired_executions_batch(self, db_session_with_containers):
        """Test getting expired executions batch for backup."""
        # Arrange: two expired executions plus one recent one.
        tenant_id = str(uuid4())
        app_id = str(uuid4())
        workflow_id = str(uuid4())
        workflow_run_id = str(uuid4())
        now = naive_utc_now()
        before_date = now - timedelta(days=1)
        old_execution_1 = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-1",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=1,
            created_at=now - timedelta(days=3),
        )
        old_execution_2 = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-2",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=2,
            created_at=now - timedelta(days=2),
        )
        self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-3",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=3,
            created_at=now,
        )
        repository = self._create_repository(db_session_with_containers)

        # Act
        result = repository.get_expired_executions_batch(
            tenant_id=tenant_id,
            before_date=before_date,
            batch_size=1000,
        )

        # Assert: exactly the two pre-cutoff executions are returned.
        assert len(result) == 2
        result_ids = {execution.id for execution in result}
        assert old_execution_1.id in result_ids
        assert old_execution_2.id in result_ids

    def test_delete_executions_by_ids(self, db_session_with_containers):
        """Test deleting executions by IDs."""
        # Arrange: three executions, all of which will be deleted by id.
        tenant_id = str(uuid4())
        app_id = str(uuid4())
        workflow_id = str(uuid4())
        workflow_run_id = str(uuid4())
        created_at = naive_utc_now()
        execution_1 = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-1",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=1,
            created_at=created_at,
        )
        execution_2 = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-2",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=2,
            created_at=created_at,
        )
        execution_3 = self._create_execution(
            db_session_with_containers,
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id,
            node_id="node-3",
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            index=3,
            created_at=created_at,
        )
        repository = self._create_repository(db_session_with_containers)
        execution_ids = [execution_1.id, execution_2.id, execution_3.id]

        # Act
        result = repository.delete_executions_by_ids(execution_ids)

        # Assert: all three rows are gone.
        assert result == 3
        remaining = db_session_with_containers.scalars(
            select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(execution_ids))
        ).all()
        assert remaining == []

    def test_delete_executions_by_ids_empty_list(self, db_session_with_containers):
        """Test deleting executions with empty ID list."""
        # Arrange
        repository = self._create_repository(db_session_with_containers)

        # Act
        result = repository.delete_executions_by_ids([])

        # Assert: empty input is a no-op reporting zero deletions.
        assert result == 0

View File

@@ -1,472 +0,0 @@
"""
Unit tests for SegmentService.get_segments method.
Tests the retrieval of document segments with pagination and filtering:
- Basic pagination (page, limit)
- Status filtering
- Keyword search
- Ordering by position and id (to avoid duplicate data)
"""
from unittest.mock import Mock, create_autospec, patch
import pytest
from models.dataset import DocumentSegment
class SegmentServiceTestDataFactory:
    """
    Factory class for creating test data and mock objects for segment tests.
    """

    @staticmethod
    def create_segment_mock(
        segment_id: str = "segment-123",
        document_id: str = "doc-123",
        tenant_id: str = "tenant-123",
        dataset_id: str = "dataset-123",
        position: int = 1,
        content: str = "Test content",
        status: str = "completed",
        **kwargs,
    ) -> Mock:
        """
        Build an autospecced DocumentSegment mock.

        Args:
            segment_id: Unique identifier for the segment
            document_id: Parent document ID
            tenant_id: Tenant ID the segment belongs to
            dataset_id: Parent dataset ID
            position: Position within the document
            content: Segment text content
            status: Indexing status
            **kwargs: Extra attributes; these may override the named ones

        Returns:
            Mock: DocumentSegment mock object
        """
        attributes = {
            "id": segment_id,
            "document_id": document_id,
            "tenant_id": tenant_id,
            "dataset_id": dataset_id,
            "position": position,
            "content": content,
            "status": status,
        }
        # kwargs applied last, so callers can override any named attribute.
        attributes.update(kwargs)

        mock_segment = create_autospec(DocumentSegment, instance=True)
        for attr_name, attr_value in attributes.items():
            setattr(mock_segment, attr_name, attr_value)
        return mock_segment
class TestSegmentServiceGetSegments:
"""
Comprehensive unit tests for SegmentService.get_segments method.
Tests cover:
- Basic pagination functionality
- Status list filtering
- Keyword search filtering
- Ordering (position + id for uniqueness)
- Empty results
- Combined filters
"""
@pytest.fixture
def mock_segment_service_dependencies(self):
"""
Common mock setup for segment service dependencies.
Patches:
- db: Database operations and pagination
- select: SQLAlchemy query builder
"""
with (
patch("services.dataset_service.db") as mock_db,
patch("services.dataset_service.select") as mock_select,
):
yield {
"db": mock_db,
"select": mock_select,
}
def test_get_segments_basic_pagination(self, mock_segment_service_dependencies):
"""
Test basic pagination functionality.
Verifies:
- Query is built with document_id and tenant_id filters
- Pagination uses correct page and limit parameters
- Returns segments and total count
"""
# Arrange
document_id = "doc-123"
tenant_id = "tenant-123"
page = 1
limit = 20
# Create mock segments
segment1 = SegmentServiceTestDataFactory.create_segment_mock(
segment_id="seg-1", position=1, content="First segment"
)
segment2 = SegmentServiceTestDataFactory.create_segment_mock(
segment_id="seg-2", position=2, content="Second segment"
)
# Mock pagination result
mock_paginated = Mock()
mock_paginated.items = [segment1, segment2]
mock_paginated.total = 2
mock_segment_service_dependencies["db"].paginate.return_value = mock_paginated
# Mock select builder
mock_query = Mock()
mock_segment_service_dependencies["select"].return_value = mock_query
mock_query.where.return_value = mock_query
mock_query.order_by.return_value = mock_query
# Act
from services.dataset_service import SegmentService
items, total = SegmentService.get_segments(document_id=document_id, tenant_id=tenant_id, page=page, limit=limit)
# Assert
assert len(items) == 2
assert total == 2
assert items[0].id == "seg-1"
assert items[1].id == "seg-2"
mock_segment_service_dependencies["db"].paginate.assert_called_once()
call_kwargs = mock_segment_service_dependencies["db"].paginate.call_args[1]
assert call_kwargs["page"] == page
assert call_kwargs["per_page"] == limit
assert call_kwargs["max_per_page"] == 100
assert call_kwargs["error_out"] is False
def test_get_segments_with_status_filter(self, mock_segment_service_dependencies):
    """Segments can be filtered by a list of statuses.

    Verifies that every segment whose status is in the list is returned,
    and that the query receives at least two where() calls (the base
    document/tenant filters plus the status filter).
    """
    deps = mock_segment_service_dependencies

    # Stub the select() builder so chained where()/order_by() return itself.
    query_stub = Mock()
    query_stub.where.return_value = query_stub
    query_stub.order_by.return_value = query_stub
    deps["select"].return_value = query_stub

    # One segment per requested status.
    completed_seg = SegmentServiceTestDataFactory.create_segment_mock(segment_id="seg-1", status="completed")
    indexing_seg = SegmentServiceTestDataFactory.create_segment_mock(segment_id="seg-2", status="indexing")
    deps["db"].paginate.return_value = Mock(items=[completed_seg, indexing_seg], total=2)

    from services.dataset_service import SegmentService

    items, total = SegmentService.get_segments(
        document_id="doc-123", tenant_id="tenant-123", status_list=["completed", "indexing"]
    )

    assert total == 2
    assert len(items) == 2
    # Base filters + status filter => where() ran more than once.
    assert query_stub.where.call_count >= 2
def test_get_segments_with_empty_status_list(self, mock_segment_service_dependencies):
    """An empty status list must not add a status filter.

    Applying an IN () filter would produce a WHERE false condition, so the
    service is expected to skip it entirely: where() is called exactly once
    for the base document/tenant filters.
    """
    deps = mock_segment_service_dependencies

    # Chainable query stub: where()/order_by() both yield the stub again.
    query_stub = Mock()
    query_stub.where.return_value = query_stub
    query_stub.order_by.return_value = query_stub
    deps["select"].return_value = query_stub

    only_segment = SegmentServiceTestDataFactory.create_segment_mock(segment_id="seg-1")
    deps["db"].paginate.return_value = Mock(items=[only_segment], total=1)

    from services.dataset_service import SegmentService

    items, total = SegmentService.get_segments(
        document_id="doc-123", tenant_id="tenant-123", status_list=[]
    )

    assert (len(items), total) == (1, 1)
    # Only the base filters — no status predicate was added.
    assert query_stub.where.call_count == 1
def test_get_segments_with_keyword_search(self, mock_segment_service_dependencies):
    """Keyword search adds one extra filter to the query.

    The service performs a case-insensitive (ilike) wildcard search; here we
    verify the matching segment is returned and that where() was called
    exactly twice: base filters plus the keyword predicate.
    """
    deps = mock_segment_service_dependencies

    # Self-returning query stub so the builder chain works.
    query_stub = Mock()
    query_stub.where.return_value = query_stub
    query_stub.order_by.return_value = query_stub
    deps["select"].return_value = query_stub

    matching = SegmentServiceTestDataFactory.create_segment_mock(
        segment_id="seg-1", content="This contains search term"
    )
    deps["db"].paginate.return_value = Mock(items=[matching], total=1)

    from services.dataset_service import SegmentService

    items, total = SegmentService.get_segments(
        document_id="doc-123", tenant_id="tenant-123", keyword="search term"
    )

    assert total == 1
    assert len(items) == 1
    # where() ran twice: base filters, then the keyword filter.
    assert query_stub.where.call_count == 2
def test_get_segments_ordering_by_position_and_id(self, mock_segment_service_dependencies):
    """Results are ordered by (position ASC, id ASC).

    Positions are not guaranteed unique, so the secondary id ordering keeps
    pagination deterministic and prevents duplicates across pages. With mocks
    we can only assert that order_by() was applied exactly once to the query.
    """
    deps = mock_segment_service_dependencies

    query_stub = Mock()
    query_stub.where.return_value = query_stub
    query_stub.order_by.return_value = query_stub
    deps["select"].return_value = query_stub

    # Two segments sharing position 1 plus one at position 2.
    fixtures = [
        ("seg-1", 1, "Content 1"),
        ("seg-2", 1, "Content 2"),
        ("seg-3", 2, "Content 3"),
    ]
    segments = [
        SegmentServiceTestDataFactory.create_segment_mock(segment_id=sid, position=pos, content=text)
        for sid, pos, text in fixtures
    ]
    deps["db"].paginate.return_value = Mock(items=segments, total=3)

    from services.dataset_service import SegmentService

    items, total = SegmentService.get_segments(document_id="doc-123", tenant_id="tenant-123")

    assert total == 3
    assert len(items) == 3
    query_stub.order_by.assert_called_once()
def test_get_segments_empty_results(self, mock_segment_service_dependencies):
    """No matching segments yields an empty list and a zero total."""
    deps = mock_segment_service_dependencies

    # Query stub whose builder methods are chainable.
    query_stub = Mock()
    query_stub.where.return_value = query_stub
    query_stub.order_by.return_value = query_stub
    deps["select"].return_value = query_stub

    # Pagination reports nothing found.
    deps["db"].paginate.return_value = Mock(items=[], total=0)

    from services.dataset_service import SegmentService

    items, total = SegmentService.get_segments(document_id="non-existent-doc", tenant_id="tenant-123")

    assert items == []
    assert total == 0
def test_get_segments_combined_filters(self, mock_segment_service_dependencies):
    """Status filter, keyword search and pagination all compose.

    Expects three where() calls (base + status + keyword) and the requested
    page/limit forwarded to db.paginate().
    """
    deps = mock_segment_service_dependencies
    requested_page, requested_limit = 2, 10

    query_stub = Mock()
    query_stub.where.return_value = query_stub
    query_stub.order_by.return_value = query_stub
    deps["select"].return_value = query_stub

    hit = SegmentServiceTestDataFactory.create_segment_mock(
        segment_id="seg-1",
        status="completed",
        content="This is important information",
    )
    deps["db"].paginate.return_value = Mock(items=[hit], total=1)

    from services.dataset_service import SegmentService

    items, total = SegmentService.get_segments(
        document_id="doc-123",
        tenant_id="tenant-123",
        status_list=["completed"],
        keyword="important",
        page=requested_page,
        limit=requested_limit,
    )

    assert (len(items), total) == (1, 1)
    # Three predicates were applied: base, status, keyword.
    assert query_stub.where.call_count == 3
    # Pagination parameters reached db.paginate() unchanged.
    paginate_kwargs = deps["db"].paginate.call_args.kwargs
    assert paginate_kwargs["page"] == requested_page
    assert paginate_kwargs["per_page"] == requested_limit
def test_get_segments_with_none_status_list(self, mock_segment_service_dependencies):
    """A None status list is treated like "no status filter".

    Only the base document/tenant predicates should be applied, so where()
    is expected to run exactly once.
    """
    deps = mock_segment_service_dependencies

    query_stub = Mock()
    query_stub.where.return_value = query_stub
    query_stub.order_by.return_value = query_stub
    deps["select"].return_value = query_stub

    lone_segment = SegmentServiceTestDataFactory.create_segment_mock(segment_id="seg-1")
    deps["db"].paginate.return_value = Mock(items=[lone_segment], total=1)

    from services.dataset_service import SegmentService

    items, total = SegmentService.get_segments(
        document_id="doc-123",
        tenant_id="tenant-123",
        status_list=None,
    )

    assert total == 1
    assert len(items) == 1
    # Base filters only — None added no status predicate.
    assert query_stub.where.call_count == 1
def test_get_segments_pagination_max_per_page_limit(self, mock_segment_service_dependencies):
    """db.paginate() is always capped with max_per_page=100.

    Even when the caller requests a larger limit, the service must pass
    max_per_page=100 so oversized pages are clamped by the paginator.
    """
    deps = mock_segment_service_dependencies

    query_stub = Mock()
    query_stub.where.return_value = query_stub
    query_stub.order_by.return_value = query_stub
    deps["select"].return_value = query_stub

    deps["db"].paginate.return_value = Mock(items=[], total=0)

    from services.dataset_service import SegmentService

    # Ask for more than the cap on purpose.
    SegmentService.get_segments(
        document_id="doc-123",
        tenant_id="tenant-123",
        limit=200,
    )

    assert deps["db"].paginate.call_args.kwargs["max_per_page"] == 100

View File

@@ -3,7 +3,6 @@ Unit tests for workflow run restore functionality.
"""
from datetime import datetime
from unittest.mock import MagicMock
class TestWorkflowRunRestore:
@@ -36,30 +35,3 @@ class TestWorkflowRunRestore:
assert result["created_at"].year == 2024
assert result["created_at"].month == 1
assert result["name"] == "test"
def test_restore_table_records_returns_rowcount(self):
"""Restore should return inserted rowcount."""
from services.retention.workflow_run.restore_archived_workflow_run import WorkflowRunRestore
session = MagicMock()
session.execute.return_value = MagicMock(rowcount=2)
restore = WorkflowRunRestore()
records = [{"id": "p1", "workflow_run_id": "r1", "created_at": "2024-01-01T00:00:00"}]
restored = restore._restore_table_records(session, "workflow_pauses", records, schema_version="1.0")
assert restored == 2
session.execute.assert_called_once()
def test_restore_table_records_unknown_table(self):
"""Unknown table names should be ignored gracefully."""
from services.retention.workflow_run.restore_archived_workflow_run import WorkflowRunRestore
session = MagicMock()
restore = WorkflowRunRestore()
restored = restore._restore_table_records(session, "unknown_table", [{"id": "x1"}], schema_version="1.0")
assert restored == 0
session.execute.assert_not_called()

View File

@@ -1,12 +1,7 @@
from datetime import datetime
from unittest.mock import MagicMock
from uuid import uuid4
import pytest
from sqlalchemy.orm import Session
from core.workflow.enums import WorkflowNodeExecutionStatus
from models.workflow import WorkflowNodeExecutionModel
from repositories.sqlalchemy_api_workflow_node_execution_repository import (
DifyAPISQLAlchemyWorkflowNodeExecutionRepository,
)
@@ -18,109 +13,6 @@ class TestSQLAlchemyWorkflowNodeExecutionServiceRepository:
mock_session_maker = MagicMock()
return DifyAPISQLAlchemyWorkflowNodeExecutionRepository(session_maker=mock_session_maker)
@pytest.fixture
def mock_execution(self):
execution = MagicMock(spec=WorkflowNodeExecutionModel)
execution.id = str(uuid4())
execution.tenant_id = "tenant-123"
execution.app_id = "app-456"
execution.workflow_id = "workflow-789"
execution.workflow_run_id = "run-101"
execution.node_id = "node-202"
execution.index = 1
execution.created_at = "2023-01-01T00:00:00Z"
return execution
def test_get_node_last_execution_found(self, repository, mock_execution):
"""Test getting the last execution for a node when it exists."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
mock_session.scalar.return_value = mock_execution
# Act
result = repository.get_node_last_execution(
tenant_id="tenant-123",
app_id="app-456",
workflow_id="workflow-789",
node_id="node-202",
)
# Assert
assert result == mock_execution
mock_session.scalar.assert_called_once()
# Verify the query was constructed correctly
call_args = mock_session.scalar.call_args[0][0]
assert hasattr(call_args, "compile") # It's a SQLAlchemy statement
compiled = call_args.compile()
assert WorkflowNodeExecutionStatus.PAUSED in compiled.params.values()
def test_get_node_last_execution_not_found(self, repository):
"""Test getting the last execution for a node when it doesn't exist."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
mock_session.scalar.return_value = None
# Act
result = repository.get_node_last_execution(
tenant_id="tenant-123",
app_id="app-456",
workflow_id="workflow-789",
node_id="node-202",
)
# Assert
assert result is None
mock_session.scalar.assert_called_once()
def test_get_executions_by_workflow_run_empty(self, repository):
"""Test getting executions for a workflow run when none exist."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
mock_session.execute.return_value.scalars.return_value.all.return_value = []
# Act
result = repository.get_executions_by_workflow_run(
tenant_id="tenant-123",
app_id="app-456",
workflow_run_id="run-101",
)
# Assert
assert result == []
mock_session.execute.assert_called_once()
def test_get_execution_by_id_found(self, repository, mock_execution):
"""Test getting execution by ID when it exists."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
mock_session.scalar.return_value = mock_execution
# Act
result = repository.get_execution_by_id(mock_execution.id)
# Assert
assert result == mock_execution
mock_session.scalar.assert_called_once()
def test_get_execution_by_id_not_found(self, repository):
"""Test getting execution by ID when it doesn't exist."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
mock_session.scalar.return_value = None
# Act
result = repository.get_execution_by_id("non-existent-id")
# Assert
assert result is None
mock_session.scalar.assert_called_once()
def test_repository_implements_protocol(self, repository):
"""Test that the repository implements the required protocol methods."""
# Verify all protocol methods are implemented
@@ -136,135 +28,3 @@ class TestSQLAlchemyWorkflowNodeExecutionServiceRepository:
assert callable(repository.delete_executions_by_app)
assert callable(repository.get_expired_executions_batch)
assert callable(repository.delete_executions_by_ids)
def test_delete_expired_executions(self, repository):
"""Test deleting expired executions."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
# Mock the select query to return some IDs first time, then empty to stop loop
execution_ids = ["id1", "id2"] # Less than batch_size to trigger break
# Mock execute method to handle both select and delete statements
def mock_execute(stmt):
mock_result = MagicMock()
# For select statements, return execution IDs
if hasattr(stmt, "limit"): # This is our select statement
mock_result.scalars.return_value.all.return_value = execution_ids
else: # This is our delete statement
mock_result.rowcount = 2
return mock_result
mock_session.execute.side_effect = mock_execute
before_date = datetime(2023, 1, 1)
# Act
result = repository.delete_expired_executions(
tenant_id="tenant-123",
before_date=before_date,
batch_size=1000,
)
# Assert
assert result == 2
assert mock_session.execute.call_count == 2 # One select call, one delete call
mock_session.commit.assert_called_once()
def test_delete_executions_by_app(self, repository):
"""Test deleting executions by app."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
# Mock the select query to return some IDs first time, then empty to stop loop
execution_ids = ["id1", "id2"]
# Mock execute method to handle both select and delete statements
def mock_execute(stmt):
mock_result = MagicMock()
# For select statements, return execution IDs
if hasattr(stmt, "limit"): # This is our select statement
mock_result.scalars.return_value.all.return_value = execution_ids
else: # This is our delete statement
mock_result.rowcount = 2
return mock_result
mock_session.execute.side_effect = mock_execute
# Act
result = repository.delete_executions_by_app(
tenant_id="tenant-123",
app_id="app-456",
batch_size=1000,
)
# Assert
assert result == 2
assert mock_session.execute.call_count == 2 # One select call, one delete call
mock_session.commit.assert_called_once()
def test_get_expired_executions_batch(self, repository):
"""Test getting expired executions batch for backup."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
# Create mock execution objects
mock_execution1 = MagicMock()
mock_execution1.id = "exec-1"
mock_execution2 = MagicMock()
mock_execution2.id = "exec-2"
mock_session.execute.return_value.scalars.return_value.all.return_value = [mock_execution1, mock_execution2]
before_date = datetime(2023, 1, 1)
# Act
result = repository.get_expired_executions_batch(
tenant_id="tenant-123",
before_date=before_date,
batch_size=1000,
)
# Assert
assert len(result) == 2
assert result[0].id == "exec-1"
assert result[1].id == "exec-2"
mock_session.execute.assert_called_once()
def test_delete_executions_by_ids(self, repository):
"""Test deleting executions by IDs."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
# Mock the delete query result
mock_result = MagicMock()
mock_result.rowcount = 3
mock_session.execute.return_value = mock_result
execution_ids = ["id1", "id2", "id3"]
# Act
result = repository.delete_executions_by_ids(execution_ids)
# Assert
assert result == 3
mock_session.execute.assert_called_once()
mock_session.commit.assert_called_once()
def test_delete_executions_by_ids_empty_list(self, repository):
"""Test deleting executions with empty ID list."""
# Arrange
mock_session = MagicMock(spec=Session)
repository._session_maker.return_value.__enter__.return_value = mock_session
# Act
result = repository.delete_executions_by_ids([])
# Assert
assert result == 0
mock_session.query.assert_not_called()
mock_session.commit.assert_not_called()

6
api/uv.lock generated
View File

@@ -5047,11 +5047,11 @@ wheels = [
[[package]]
name = "pypdf"
version = "6.7.2"
version = "6.7.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/fe/b2/335465d6cff28a772ace8a58beb168f125c2e1d8f7a31527da180f4d89a1/pypdf-6.7.2.tar.gz", hash = "sha256:82a1a48de500ceea59a52a7d979f5095927ef802e4e4fac25ab862a73468acbb", size = 5302986, upload-time = "2026-02-22T11:33:30.776Z" }
sdist = { url = "https://files.pythonhosted.org/packages/ff/63/3437c4363483f2a04000a48f1cd48c40097f69d580363712fa8b0b4afe45/pypdf-6.7.1.tar.gz", hash = "sha256:6b7a63be5563a0a35d54c6d6b550d75c00b8ccf36384be96365355e296e6b3b0", size = 5302208, upload-time = "2026-02-17T17:00:48.88Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/df/df/38b06d6e74646a4281856920a11efb431559bdeb643bf1e192bff5e29082/pypdf-6.7.2-py3-none-any.whl", hash = "sha256:331b63cd66f63138f152a700565b3e0cebdf4ec8bec3b7594b2522418782f1f3", size = 331245, upload-time = "2026-02-22T11:33:29.204Z" },
{ url = "https://files.pythonhosted.org/packages/68/77/38bd7744bb9e06d465b0c23879e6d2c187d93a383f8fa485c862822bb8a3/pypdf-6.7.1-py3-none-any.whl", hash = "sha256:a02ccbb06463f7c334ce1612e91b3e68a8e827f3cee100b9941771e6066b094e", size = 331048, upload-time = "2026-02-17T17:00:46.991Z" },
]
[[package]]