Compare commits

...

2 Commits

Author SHA1 Message Date
木之本澪
df3c66a8ac test: migrate duplicate_document_indexing_task SQL tests to testcontainers (#32540)
Some checks are pending
autofix.ci / autofix (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions
Main CI Pipeline / Check Changed Files (push) Waiting to run
Main CI Pipeline / API Tests (push) Blocked by required conditions
Main CI Pipeline / Web Tests (push) Blocked by required conditions
Main CI Pipeline / Style Check (push) Waiting to run
Main CI Pipeline / VDB Tests (push) Blocked by required conditions
Main CI Pipeline / DB Migration Test (push) Blocked by required conditions
Co-authored-by: KinomotoMio <200703522+KinomotoMio@users.noreply.github.com>
2026-03-05 02:35:24 +09:00
Stephen Zhou
7252ce6f26 chore: add react refresh plugin for vinext (#32996) 2026-03-05 01:09:32 +08:00
3 changed files with 98 additions and 396 deletions

View File

@@ -3,6 +3,7 @@ from unittest.mock import MagicMock, patch
import pytest
from faker import Faker
from core.indexing_runner import DocumentIsPausedError
from enums.cloud_plan import CloudPlan
from models import Account, Tenant, TenantAccountJoin, TenantAccountRole
from models.dataset import Dataset, Document, DocumentSegment
@@ -282,7 +283,7 @@ class TestDuplicateDocumentIndexingTasks:
return dataset, documents
def test_duplicate_document_indexing_task_success(
def _test_duplicate_document_indexing_task_success(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
@@ -324,7 +325,7 @@ class TestDuplicateDocumentIndexingTasks:
processed_documents = call_args[0][0] # First argument should be documents list
assert len(processed_documents) == 3
def test_duplicate_document_indexing_task_with_segment_cleanup(
def _test_duplicate_document_indexing_task_with_segment_cleanup(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
@@ -374,7 +375,7 @@ class TestDuplicateDocumentIndexingTasks:
mock_external_service_dependencies["indexing_runner"].assert_called_once()
mock_external_service_dependencies["indexing_runner_instance"].run.assert_called_once()
def test_duplicate_document_indexing_task_dataset_not_found(
def _test_duplicate_document_indexing_task_dataset_not_found(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
@@ -445,7 +446,7 @@ class TestDuplicateDocumentIndexingTasks:
processed_documents = call_args[0][0] # First argument should be documents list
assert len(processed_documents) == 2 # Only existing documents
def test_duplicate_document_indexing_task_indexing_runner_exception(
def _test_duplicate_document_indexing_task_indexing_runner_exception(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
@@ -486,7 +487,7 @@ class TestDuplicateDocumentIndexingTasks:
assert updated_document.indexing_status == "parsing"
assert updated_document.processing_started_at is not None
def test_duplicate_document_indexing_task_billing_sandbox_plan_batch_limit(
def _test_duplicate_document_indexing_task_billing_sandbox_plan_batch_limit(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
@@ -549,7 +550,7 @@ class TestDuplicateDocumentIndexingTasks:
# Verify indexing runner was not called due to early validation error
mock_external_service_dependencies["indexing_runner_instance"].run.assert_not_called()
def test_duplicate_document_indexing_task_billing_vector_space_limit_exceeded(
def _test_duplicate_document_indexing_task_billing_vector_space_limit_exceeded(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
@@ -783,3 +784,90 @@ class TestDuplicateDocumentIndexingTasks:
document_ids=document_ids,
)
mock_queue.delete_task_key.assert_not_called()
def test_successful_duplicate_document_indexing(
self, db_session_with_containers, mock_external_service_dependencies
):
"""Test successful duplicate document indexing flow."""
self._test_duplicate_document_indexing_task_success(
db_session_with_containers, mock_external_service_dependencies
)
def test_duplicate_document_indexing_dataset_not_found(
self, db_session_with_containers, mock_external_service_dependencies
):
"""Test duplicate document indexing when dataset is not found."""
self._test_duplicate_document_indexing_task_dataset_not_found(
db_session_with_containers, mock_external_service_dependencies
)
def test_duplicate_document_indexing_with_billing_enabled_sandbox_plan(
self, db_session_with_containers, mock_external_service_dependencies
):
"""Test duplicate document indexing with billing enabled and sandbox plan."""
self._test_duplicate_document_indexing_task_billing_sandbox_plan_batch_limit(
db_session_with_containers, mock_external_service_dependencies
)
def test_duplicate_document_indexing_with_billing_limit_exceeded(
self, db_session_with_containers, mock_external_service_dependencies
):
"""Test duplicate document indexing when billing limit is exceeded."""
self._test_duplicate_document_indexing_task_billing_vector_space_limit_exceeded(
db_session_with_containers, mock_external_service_dependencies
)
def test_duplicate_document_indexing_runner_error(
self, db_session_with_containers, mock_external_service_dependencies
):
"""Test duplicate document indexing when IndexingRunner raises an error."""
self._test_duplicate_document_indexing_task_indexing_runner_exception(
db_session_with_containers, mock_external_service_dependencies
)
def _test_duplicate_document_indexing_task_document_is_paused(
self, db_session_with_containers, mock_external_service_dependencies
):
"""Test duplicate document indexing when document is paused."""
# Arrange
dataset, documents = self._create_test_dataset_and_documents(
db_session_with_containers, mock_external_service_dependencies, document_count=2
)
for document in documents:
document.is_paused = True
db_session_with_containers.add(document)
db_session_with_containers.commit()
document_ids = [doc.id for doc in documents]
mock_external_service_dependencies["indexing_runner_instance"].run.side_effect = DocumentIsPausedError(
"Document paused"
)
# Act
_duplicate_document_indexing_task(dataset.id, document_ids)
db_session_with_containers.expire_all()
# Assert
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
assert updated_document.is_paused is True
assert updated_document.indexing_status == "parsing"
assert updated_document.display_status == "paused"
assert updated_document.processing_started_at is not None
mock_external_service_dependencies["indexing_runner_instance"].run.assert_called_once()
def test_duplicate_document_indexing_document_is_paused(
self, db_session_with_containers, mock_external_service_dependencies
):
"""Test duplicate document indexing when document is paused."""
self._test_duplicate_document_indexing_task_document_is_paused(
db_session_with_containers, mock_external_service_dependencies
)
def test_duplicate_document_indexing_cleans_old_segments(
self, db_session_with_containers, mock_external_service_dependencies
):
"""Test that duplicate document indexing cleans old segments."""
self._test_duplicate_document_indexing_task_with_segment_cleanup(
db_session_with_containers, mock_external_service_dependencies
)

View File

@@ -1,158 +1,38 @@
"""
Unit tests for duplicate document indexing tasks.
This module tests the duplicate document indexing task functionality including:
- Task enqueuing to different queues (normal, priority, tenant-isolated)
- Batch processing of multiple duplicate documents
- Progress tracking through task lifecycle
- Error handling and retry mechanisms
- Cleanup of old document data before re-indexing
"""
"""Unit tests for queue/wrapper behaviors in duplicate document indexing tasks (non-database logic)."""
import uuid
from unittest.mock import MagicMock, Mock, patch
from unittest.mock import Mock, patch
import pytest
from core.indexing_runner import DocumentIsPausedError, IndexingRunner
from core.rag.pipeline.queue import TenantIsolatedTaskQueue
from enums.cloud_plan import CloudPlan
from models.dataset import Dataset, Document, DocumentSegment
from tasks.duplicate_document_indexing_task import (
_duplicate_document_indexing_task,
_duplicate_document_indexing_task_with_tenant_queue,
duplicate_document_indexing_task,
normal_duplicate_document_indexing_task,
priority_duplicate_document_indexing_task,
)
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def tenant_id():
"""Generate a unique tenant ID for testing."""
return str(uuid.uuid4())
@pytest.fixture
def dataset_id():
"""Generate a unique dataset ID for testing."""
return str(uuid.uuid4())
@pytest.fixture
def document_ids():
"""Generate a list of document IDs for testing."""
return [str(uuid.uuid4()) for _ in range(3)]
@pytest.fixture
def mock_dataset(dataset_id, tenant_id):
"""Create a mock Dataset object."""
dataset = Mock(spec=Dataset)
dataset.id = dataset_id
dataset.tenant_id = tenant_id
dataset.indexing_technique = "high_quality"
dataset.embedding_model_provider = "openai"
dataset.embedding_model = "text-embedding-ada-002"
return dataset
@pytest.fixture
def mock_documents(document_ids, dataset_id):
"""Create mock Document objects."""
documents = []
for doc_id in document_ids:
doc = Mock(spec=Document)
doc.id = doc_id
doc.dataset_id = dataset_id
doc.indexing_status = "waiting"
doc.error = None
doc.stopped_at = None
doc.processing_started_at = None
doc.doc_form = "text_model"
documents.append(doc)
return documents
@pytest.fixture
def mock_document_segments(document_ids):
"""Create mock DocumentSegment objects."""
segments = []
for doc_id in document_ids:
for i in range(3):
segment = Mock(spec=DocumentSegment)
segment.id = str(uuid.uuid4())
segment.document_id = doc_id
segment.index_node_id = f"node-{doc_id}-{i}"
segments.append(segment)
return segments
@pytest.fixture
def mock_db_session():
"""Mock database session via session_factory.create_session()."""
with patch("tasks.duplicate_document_indexing_task.session_factory", autospec=True) as mock_sf:
session = MagicMock()
# Allow tests to observe session.close() via context manager teardown
session.close = MagicMock()
cm = MagicMock()
cm.__enter__.return_value = session
def _exit_side_effect(*args, **kwargs):
session.close()
cm.__exit__.side_effect = _exit_side_effect
mock_sf.create_session.return_value = cm
query = MagicMock()
session.query.return_value = query
query.where.return_value = query
session.scalars.return_value = MagicMock()
yield session
@pytest.fixture
def mock_indexing_runner():
"""Mock IndexingRunner."""
with patch("tasks.duplicate_document_indexing_task.IndexingRunner", autospec=True) as mock_runner_class:
mock_runner = MagicMock(spec=IndexingRunner)
mock_runner_class.return_value = mock_runner
yield mock_runner
@pytest.fixture
def mock_feature_service():
"""Mock FeatureService."""
with patch("tasks.duplicate_document_indexing_task.FeatureService", autospec=True) as mock_service:
mock_features = Mock()
mock_features.billing = Mock()
mock_features.billing.enabled = False
mock_features.vector_space = Mock()
mock_features.vector_space.size = 0
mock_features.vector_space.limit = 1000
mock_service.get_features.return_value = mock_features
yield mock_service
@pytest.fixture
def mock_index_processor_factory():
"""Mock IndexProcessorFactory."""
with patch("tasks.duplicate_document_indexing_task.IndexProcessorFactory", autospec=True) as mock_factory:
mock_processor = MagicMock()
mock_processor.clean = Mock()
mock_factory.return_value.init_index_processor.return_value = mock_processor
yield mock_factory
@pytest.fixture
def mock_tenant_isolated_queue():
"""Mock TenantIsolatedTaskQueue."""
with patch("tasks.duplicate_document_indexing_task.TenantIsolatedTaskQueue", autospec=True) as mock_queue_class:
mock_queue = MagicMock(spec=TenantIsolatedTaskQueue)
mock_queue = Mock(spec=TenantIsolatedTaskQueue)
mock_queue.pull_tasks.return_value = []
mock_queue.delete_task_key = Mock()
mock_queue.set_task_waiting_time = Mock()
@@ -160,11 +40,6 @@ def mock_tenant_isolated_queue():
yield mock_queue
# ============================================================================
# Tests for deprecated duplicate_document_indexing_task
# ============================================================================
class TestDuplicateDocumentIndexingTask:
"""Tests for the deprecated duplicate_document_indexing_task function."""
@@ -190,258 +65,6 @@ class TestDuplicateDocumentIndexingTask:
mock_core_func.assert_called_once_with(dataset_id, document_ids)
# ============================================================================
# Tests for _duplicate_document_indexing_task core function
# ============================================================================
class TestDuplicateDocumentIndexingTaskCore:
"""Tests for the _duplicate_document_indexing_task core function."""
def test_successful_duplicate_document_indexing(
self,
mock_db_session,
mock_indexing_runner,
mock_feature_service,
mock_index_processor_factory,
mock_dataset,
mock_documents,
mock_document_segments,
dataset_id,
document_ids,
):
"""Test successful duplicate document indexing flow."""
# Arrange
# Dataset via query.first()
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
# scalars() call sequence:
# 1) documents list
# 2..N) segments per document
def _scalars_side_effect(*args, **kwargs):
m = MagicMock()
# First call returns documents; subsequent calls return segments
if not hasattr(_scalars_side_effect, "_calls"):
_scalars_side_effect._calls = 0
if _scalars_side_effect._calls == 0:
m.all.return_value = mock_documents
else:
m.all.return_value = mock_document_segments
_scalars_side_effect._calls += 1
return m
mock_db_session.scalars.side_effect = _scalars_side_effect
# Act
_duplicate_document_indexing_task(dataset_id, document_ids)
# Assert
# Verify IndexingRunner was called
mock_indexing_runner.run.assert_called_once()
# Verify all documents were set to parsing status
for doc in mock_documents:
assert doc.indexing_status == "parsing"
assert doc.processing_started_at is not None
# Verify session operations
assert mock_db_session.commit.called
assert mock_db_session.close.called
def test_duplicate_document_indexing_dataset_not_found(self, mock_db_session, dataset_id, document_ids):
"""Test duplicate document indexing when dataset is not found."""
# Arrange
mock_db_session.query.return_value.where.return_value.first.return_value = None
# Act
_duplicate_document_indexing_task(dataset_id, document_ids)
# Assert
# Should close the session at least once
assert mock_db_session.close.called
def test_duplicate_document_indexing_with_billing_enabled_sandbox_plan(
self,
mock_db_session,
mock_feature_service,
mock_dataset,
dataset_id,
document_ids,
):
"""Test duplicate document indexing with billing enabled and sandbox plan."""
# Arrange
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
mock_features = mock_feature_service.get_features.return_value
mock_features.billing.enabled = True
mock_features.billing.subscription.plan = CloudPlan.SANDBOX
# Act
_duplicate_document_indexing_task(dataset_id, document_ids)
# Assert
# For sandbox plan with multiple documents, should fail
mock_db_session.commit.assert_called()
def test_duplicate_document_indexing_with_billing_limit_exceeded(
self,
mock_db_session,
mock_feature_service,
mock_dataset,
mock_documents,
dataset_id,
document_ids,
):
"""Test duplicate document indexing when billing limit is exceeded."""
# Arrange
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
# First scalars() -> documents; subsequent -> empty segments
def _scalars_side_effect(*args, **kwargs):
m = MagicMock()
if not hasattr(_scalars_side_effect, "_calls"):
_scalars_side_effect._calls = 0
if _scalars_side_effect._calls == 0:
m.all.return_value = mock_documents
else:
m.all.return_value = []
_scalars_side_effect._calls += 1
return m
mock_db_session.scalars.side_effect = _scalars_side_effect
mock_features = mock_feature_service.get_features.return_value
mock_features.billing.enabled = True
mock_features.billing.subscription.plan = CloudPlan.TEAM
mock_features.vector_space.size = 990
mock_features.vector_space.limit = 1000
# Act
_duplicate_document_indexing_task(dataset_id, document_ids)
# Assert
# Should commit the session
assert mock_db_session.commit.called
# Should close the session
assert mock_db_session.close.called
def test_duplicate_document_indexing_runner_error(
self,
mock_db_session,
mock_indexing_runner,
mock_feature_service,
mock_index_processor_factory,
mock_dataset,
mock_documents,
dataset_id,
document_ids,
):
"""Test duplicate document indexing when IndexingRunner raises an error."""
# Arrange
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def _scalars_side_effect(*args, **kwargs):
m = MagicMock()
if not hasattr(_scalars_side_effect, "_calls"):
_scalars_side_effect._calls = 0
if _scalars_side_effect._calls == 0:
m.all.return_value = mock_documents
else:
m.all.return_value = []
_scalars_side_effect._calls += 1
return m
mock_db_session.scalars.side_effect = _scalars_side_effect
mock_indexing_runner.run.side_effect = Exception("Indexing error")
# Act
_duplicate_document_indexing_task(dataset_id, document_ids)
# Assert
# Should close the session even after error
mock_db_session.close.assert_called_once()
def test_duplicate_document_indexing_document_is_paused(
self,
mock_db_session,
mock_indexing_runner,
mock_feature_service,
mock_index_processor_factory,
mock_dataset,
mock_documents,
dataset_id,
document_ids,
):
"""Test duplicate document indexing when document is paused."""
# Arrange
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def _scalars_side_effect(*args, **kwargs):
m = MagicMock()
if not hasattr(_scalars_side_effect, "_calls"):
_scalars_side_effect._calls = 0
if _scalars_side_effect._calls == 0:
m.all.return_value = mock_documents
else:
m.all.return_value = []
_scalars_side_effect._calls += 1
return m
mock_db_session.scalars.side_effect = _scalars_side_effect
mock_indexing_runner.run.side_effect = DocumentIsPausedError("Document paused")
# Act
_duplicate_document_indexing_task(dataset_id, document_ids)
# Assert
# Should handle DocumentIsPausedError gracefully
mock_db_session.close.assert_called_once()
def test_duplicate_document_indexing_cleans_old_segments(
self,
mock_db_session,
mock_indexing_runner,
mock_feature_service,
mock_index_processor_factory,
mock_dataset,
mock_documents,
mock_document_segments,
dataset_id,
document_ids,
):
"""Test that duplicate document indexing cleans old segments."""
# Arrange
mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
def _scalars_side_effect(*args, **kwargs):
m = MagicMock()
if not hasattr(_scalars_side_effect, "_calls"):
_scalars_side_effect._calls = 0
if _scalars_side_effect._calls == 0:
m.all.return_value = mock_documents
else:
m.all.return_value = mock_document_segments
_scalars_side_effect._calls += 1
return m
mock_db_session.scalars.side_effect = _scalars_side_effect
mock_processor = mock_index_processor_factory.return_value.init_index_processor.return_value
# Act
_duplicate_document_indexing_task(dataset_id, document_ids)
# Assert
# Verify clean was called for each document
assert mock_processor.clean.call_count == len(mock_documents)
# Verify segments were deleted in batch (DELETE FROM document_segments)
execute_sqls = [" ".join(str(c[0][0]).split()) for c in mock_db_session.execute.call_args_list]
assert any("DELETE FROM document_segments" in sql for sql in execute_sqls)
# ============================================================================
# Tests for tenant queue wrapper function
# ============================================================================
class TestDuplicateDocumentIndexingTaskWithTenantQueue:
"""Tests for _duplicate_document_indexing_task_with_tenant_queue function."""
@@ -536,11 +159,6 @@ class TestDuplicateDocumentIndexingTaskWithTenantQueue:
mock_tenant_isolated_queue.pull_tasks.assert_called_once()
# ============================================================================
# Tests for normal_duplicate_document_indexing_task
# ============================================================================
class TestNormalDuplicateDocumentIndexingTask:
"""Tests for normal_duplicate_document_indexing_task function."""
@@ -581,11 +199,6 @@ class TestNormalDuplicateDocumentIndexingTask:
)
# ============================================================================
# Tests for priority_duplicate_document_indexing_task
# ============================================================================
class TestPriorityDuplicateDocumentIndexingTask:
"""Tests for priority_duplicate_document_indexing_task function."""

View File

@@ -91,6 +91,7 @@ export default defineConfig(({ mode }) => {
: [
createCodeInspectorPlugin(),
createForceInspectorClientInjectionPlugin(),
react(),
vinext(),
],
resolve: {