mirror of
https://github.com/langgenius/dify.git
synced 2026-02-26 19:35:10 +00:00
Compare commits
1 Commits
main
...
inject-red
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a853d75ea |
88
.github/workflows/pyrefly-diff-comment.yml
vendored
88
.github/workflows/pyrefly-diff-comment.yml
vendored
@@ -1,88 +0,0 @@
|
||||
name: Comment with Pyrefly Diff
|
||||
|
||||
on:
|
||||
workflow_run:
|
||||
workflows:
|
||||
- Pyrefly Diff Check
|
||||
types:
|
||||
- completed
|
||||
|
||||
permissions: {}
|
||||
|
||||
jobs:
|
||||
comment:
|
||||
name: Comment PR with pyrefly diff
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
actions: read
|
||||
contents: read
|
||||
issues: write
|
||||
pull-requests: write
|
||||
if: ${{ github.event.workflow_run.conclusion == 'success' && github.event.workflow_run.pull_requests[0].head.repo.full_name != github.repository }}
|
||||
steps:
|
||||
- name: Download pyrefly diff artifact
|
||||
uses: actions/github-script@v8
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
const fs = require('fs');
|
||||
const artifacts = await github.rest.actions.listWorkflowRunArtifacts({
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
run_id: ${{ github.event.workflow_run.id }},
|
||||
});
|
||||
const match = artifacts.data.artifacts.find((artifact) =>
|
||||
artifact.name === 'pyrefly_diff'
|
||||
);
|
||||
if (!match) {
|
||||
throw new Error('pyrefly_diff artifact not found');
|
||||
}
|
||||
const download = await github.rest.actions.downloadArtifact({
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
artifact_id: match.id,
|
||||
archive_format: 'zip',
|
||||
});
|
||||
fs.writeFileSync('pyrefly_diff.zip', Buffer.from(download.data));
|
||||
|
||||
- name: Unzip artifact
|
||||
run: unzip -o pyrefly_diff.zip
|
||||
|
||||
- name: Post comment
|
||||
uses: actions/github-script@v8
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
const fs = require('fs');
|
||||
let diff = fs.readFileSync('pyrefly_diff.txt', { encoding: 'utf8' });
|
||||
let prNumber = null;
|
||||
try {
|
||||
prNumber = parseInt(fs.readFileSync('pr_number.txt', { encoding: 'utf8' }), 10);
|
||||
} catch (err) {
|
||||
// Fallback to workflow_run payload if artifact is missing or incomplete.
|
||||
const prs = context.payload.workflow_run.pull_requests || [];
|
||||
if (prs.length > 0 && prs[0].number) {
|
||||
prNumber = prs[0].number;
|
||||
}
|
||||
}
|
||||
if (!prNumber) {
|
||||
throw new Error('PR number not found in artifact or workflow_run payload');
|
||||
}
|
||||
|
||||
const MAX_CHARS = 65000;
|
||||
if (diff.length > MAX_CHARS) {
|
||||
diff = diff.slice(0, MAX_CHARS);
|
||||
diff = diff.slice(0, diff.lastIndexOf('\\n'));
|
||||
diff += '\\n\\n... (truncated) ...';
|
||||
}
|
||||
|
||||
const body = diff.trim()
|
||||
? `### Pyrefly Diff (base → PR)\\n\\`\\`\\`diff\\n${diff}\\n\\`\\`\\``
|
||||
: '### Pyrefly Diff\\nNo changes detected.';
|
||||
|
||||
await github.rest.issues.createComment({
|
||||
issue_number: prNumber,
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
body,
|
||||
});
|
||||
85
.github/workflows/pyrefly-diff.yml
vendored
85
.github/workflows/pyrefly-diff.yml
vendored
@@ -1,85 +0,0 @@
|
||||
name: Pyrefly Diff Check
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
paths:
|
||||
- 'api/**/*.py'
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
pyrefly-diff:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
issues: write
|
||||
pull-requests: write
|
||||
steps:
|
||||
- name: Checkout PR branch
|
||||
uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Setup Python & UV
|
||||
uses: astral-sh/setup-uv@v5
|
||||
with:
|
||||
enable-cache: true
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync --project api --dev
|
||||
|
||||
- name: Run pyrefly on PR branch
|
||||
run: |
|
||||
uv run --directory api pyrefly check > /tmp/pyrefly_pr.txt 2>&1 || true
|
||||
|
||||
- name: Checkout base branch
|
||||
run: git checkout ${{ github.base_ref }}
|
||||
|
||||
- name: Run pyrefly on base branch
|
||||
run: |
|
||||
uv run --directory api pyrefly check > /tmp/pyrefly_base.txt 2>&1 || true
|
||||
|
||||
- name: Compute diff
|
||||
run: |
|
||||
diff /tmp/pyrefly_base.txt /tmp/pyrefly_pr.txt > pyrefly_diff.txt || true
|
||||
|
||||
- name: Save PR number
|
||||
run: |
|
||||
echo ${{ github.event.pull_request.number }} > pr_number.txt
|
||||
|
||||
- name: Upload pyrefly diff
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: pyrefly_diff
|
||||
path: |
|
||||
pyrefly_diff.txt
|
||||
pr_number.txt
|
||||
|
||||
- name: Comment PR with pyrefly diff
|
||||
if: ${{ github.event.pull_request.head.repo.full_name == github.repository }}
|
||||
uses: actions/github-script@v8
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
const fs = require('fs');
|
||||
let diff = fs.readFileSync('pyrefly_diff.txt', { encoding: 'utf8' });
|
||||
const prNumber = context.payload.pull_request.number;
|
||||
|
||||
const MAX_CHARS = 65000;
|
||||
if (diff.length > MAX_CHARS) {
|
||||
diff = diff.slice(0, MAX_CHARS);
|
||||
diff = diff.slice(0, diff.lastIndexOf('\n'));
|
||||
diff += '\n\n... (truncated) ...';
|
||||
}
|
||||
|
||||
const body = diff.trim()
|
||||
? `### Pyrefly Diff (base → PR)\n\`\`\`diff\n${diff}\n\`\`\``
|
||||
: '### Pyrefly Diff\nNo changes detected.';
|
||||
|
||||
await github.rest.issues.createComment({
|
||||
issue_number: prNumber,
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
body,
|
||||
});
|
||||
@@ -56,8 +56,6 @@ ignore_imports =
|
||||
core.workflow.nodes.llm.llm_utils -> extensions.ext_database
|
||||
core.workflow.nodes.llm.node -> extensions.ext_database
|
||||
core.workflow.nodes.tool.tool_node -> extensions.ext_database
|
||||
core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis
|
||||
core.workflow.graph_engine.manager -> extensions.ext_redis
|
||||
# TODO(QuantumGhost): use DI to avoid depending on global DB.
|
||||
core.workflow.nodes.human_input.human_input_node -> extensions.ext_database
|
||||
|
||||
@@ -105,7 +103,6 @@ forbidden_modules =
|
||||
core.variables
|
||||
ignore_imports =
|
||||
core.workflow.nodes.loop.loop_node -> core.app.workflow.node_factory
|
||||
core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis
|
||||
core.workflow.workflow_entry -> core.app.workflow.layers.observability
|
||||
core.workflow.nodes.agent.agent_node -> core.model_manager
|
||||
core.workflow.nodes.agent.agent_node -> core.provider_manager
|
||||
@@ -243,7 +240,6 @@ ignore_imports =
|
||||
core.workflow.variable_loader -> core.variables
|
||||
core.workflow.variable_loader -> core.variables.consts
|
||||
core.workflow.workflow_type_encoder -> core.variables
|
||||
core.workflow.graph_engine.manager -> extensions.ext_redis
|
||||
core.workflow.nodes.agent.agent_node -> extensions.ext_database
|
||||
core.workflow.nodes.datasource.datasource_node -> extensions.ext_database
|
||||
core.workflow.nodes.knowledge_index.knowledge_index_node -> extensions.ext_database
|
||||
|
||||
@@ -33,6 +33,7 @@ from core.workflow.enums import NodeType
|
||||
from core.workflow.file.models import File
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from factories import file_factory, variable_factory
|
||||
from fields.member_fields import simple_account_fields
|
||||
from fields.workflow_fields import workflow_fields, workflow_pagination_fields
|
||||
@@ -740,7 +741,7 @@ class WorkflowTaskStopApi(Resource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ from core.errors.error import (
|
||||
from core.model_runtime.errors.invoke import InvokeError
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from fields.app_fields import (
|
||||
app_detail_fields_with_site,
|
||||
deleted_tool_fields,
|
||||
@@ -225,7 +226,7 @@ class TrialAppWorkflowTaskStopApi(TrialAppResource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ from core.errors.error import (
|
||||
)
|
||||
from core.model_runtime.errors.invoke import InvokeError
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs import helper
|
||||
from libs.login import current_account_with_tenant
|
||||
from models.model import AppMode, InstalledApp
|
||||
@@ -100,6 +101,6 @@ class InstalledAppWorkflowTaskStopApi(InstalledAppResource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
@@ -31,6 +31,7 @@ from core.model_runtime.errors.invoke import InvokeError
|
||||
from core.workflow.enums import WorkflowExecutionStatus
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from fields.workflow_app_log_fields import build_workflow_app_log_pagination_model
|
||||
from libs import helper
|
||||
from libs.helper import OptionalTimestampField, TimestampField
|
||||
@@ -280,7 +281,7 @@ class WorkflowTaskStopApi(Resource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ from core.errors.error import (
|
||||
)
|
||||
from core.model_runtime.errors.invoke import InvokeError
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs import helper
|
||||
from models.model import App, AppMode, EndUser
|
||||
from services.app_generate_service import AppGenerateService
|
||||
@@ -121,6 +122,6 @@ class WorkflowTaskStopApi(WebApiResource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
@@ -7,12 +7,28 @@ Each instance uses a unique key for its command queue.
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import TYPE_CHECKING, Any, final
|
||||
from contextlib import AbstractContextManager
|
||||
from typing import Any, Protocol, final
|
||||
|
||||
from ..entities.commands import AbortCommand, CommandType, GraphEngineCommand, PauseCommand, UpdateVariablesCommand
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from extensions.ext_redis import RedisClientWrapper
|
||||
|
||||
class RedisPipelineProtocol(Protocol):
|
||||
"""Minimal Redis pipeline contract used by the command channel."""
|
||||
|
||||
def lrange(self, name: str, start: int, end: int) -> Any: ...
|
||||
def delete(self, *names: str) -> Any: ...
|
||||
def execute(self) -> list[Any]: ...
|
||||
def rpush(self, name: str, *values: str) -> Any: ...
|
||||
def expire(self, name: str, time: int) -> Any: ...
|
||||
def set(self, name: str, value: str, ex: int | None = None) -> Any: ...
|
||||
def get(self, name: str) -> Any: ...
|
||||
|
||||
|
||||
class RedisClientProtocol(Protocol):
|
||||
"""Redis client contract required by the command channel."""
|
||||
|
||||
def pipeline(self) -> AbstractContextManager[RedisPipelineProtocol]: ...
|
||||
|
||||
|
||||
@final
|
||||
@@ -26,7 +42,7 @@ class RedisChannel:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
redis_client: "RedisClientWrapper",
|
||||
redis_client: RedisClientProtocol,
|
||||
channel_key: str,
|
||||
command_ttl: int = 3600,
|
||||
) -> None:
|
||||
|
||||
@@ -3,13 +3,14 @@ GraphEngine Manager for sending control commands via Redis channel.
|
||||
|
||||
This module provides a simplified interface for controlling workflow executions
|
||||
using the new Redis command channel, without requiring user permission checks.
|
||||
Callers must provide a Redis client dependency from outside the workflow package.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from collections.abc import Sequence
|
||||
from typing import final
|
||||
|
||||
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
|
||||
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel, RedisClientProtocol
|
||||
from core.workflow.graph_engine.entities.commands import (
|
||||
AbortCommand,
|
||||
GraphEngineCommand,
|
||||
@@ -17,7 +18,6 @@ from core.workflow.graph_engine.entities.commands import (
|
||||
UpdateVariablesCommand,
|
||||
VariableUpdate,
|
||||
)
|
||||
from extensions.ext_redis import redis_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -31,8 +31,12 @@ class GraphEngineManager:
|
||||
by sending commands through Redis channels, without user validation.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def send_stop_command(task_id: str, reason: str | None = None) -> None:
|
||||
_redis_client: RedisClientProtocol
|
||||
|
||||
def __init__(self, redis_client: RedisClientProtocol) -> None:
|
||||
self._redis_client = redis_client
|
||||
|
||||
def send_stop_command(self, task_id: str, reason: str | None = None) -> None:
|
||||
"""
|
||||
Send a stop command to a running workflow.
|
||||
|
||||
@@ -41,34 +45,31 @@ class GraphEngineManager:
|
||||
reason: Optional reason for stopping (defaults to "User requested stop")
|
||||
"""
|
||||
abort_command = AbortCommand(reason=reason or "User requested stop")
|
||||
GraphEngineManager._send_command(task_id, abort_command)
|
||||
self._send_command(task_id, abort_command)
|
||||
|
||||
@staticmethod
|
||||
def send_pause_command(task_id: str, reason: str | None = None) -> None:
|
||||
def send_pause_command(self, task_id: str, reason: str | None = None) -> None:
|
||||
"""Send a pause command to a running workflow."""
|
||||
|
||||
pause_command = PauseCommand(reason=reason or "User requested pause")
|
||||
GraphEngineManager._send_command(task_id, pause_command)
|
||||
self._send_command(task_id, pause_command)
|
||||
|
||||
@staticmethod
|
||||
def send_update_variables_command(task_id: str, updates: Sequence[VariableUpdate]) -> None:
|
||||
def send_update_variables_command(self, task_id: str, updates: Sequence[VariableUpdate]) -> None:
|
||||
"""Send a command to update variables in a running workflow."""
|
||||
|
||||
if not updates:
|
||||
return
|
||||
|
||||
update_command = UpdateVariablesCommand(updates=updates)
|
||||
GraphEngineManager._send_command(task_id, update_command)
|
||||
self._send_command(task_id, update_command)
|
||||
|
||||
@staticmethod
|
||||
def _send_command(task_id: str, command: GraphEngineCommand) -> None:
|
||||
def _send_command(self, task_id: str, command: GraphEngineCommand) -> None:
|
||||
"""Send a command to the workflow-specific Redis channel."""
|
||||
|
||||
if not task_id:
|
||||
return
|
||||
|
||||
channel_key = f"workflow:{task_id}:commands"
|
||||
channel = RedisChannel(redis_client, channel_key)
|
||||
channel = RedisChannel(self._redis_client, channel_key)
|
||||
|
||||
try:
|
||||
channel.send_command(command)
|
||||
|
||||
@@ -111,6 +111,7 @@ class RedisClientWrapper:
|
||||
def zcard(self, name: str | bytes) -> Any: ...
|
||||
def getdel(self, name: str | bytes) -> Any: ...
|
||||
def pubsub(self) -> PubSub: ...
|
||||
def pipeline(self, transaction: bool = True, shard_hint: str | None = None) -> Any: ...
|
||||
|
||||
def __getattr__(self, item: str) -> Any:
|
||||
if self._client is None:
|
||||
|
||||
@@ -83,5 +83,5 @@ class AwsS3Storage(BaseStorage):
|
||||
except:
|
||||
return False
|
||||
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
self.client.delete_object(Bucket=self.bucket_name, Key=filename)
|
||||
|
||||
@@ -75,7 +75,7 @@ class AzureBlobStorage(BaseStorage):
|
||||
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
|
||||
return blob.exists()
|
||||
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
|
||||
|
||||
@@ -53,5 +53,5 @@ class BaiduObsStorage(BaseStorage):
|
||||
return False
|
||||
return True
|
||||
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
self.client.delete_object(bucket_name=self.bucket_name, key=filename)
|
||||
|
||||
@@ -28,7 +28,7 @@ class BaseStorage(ABC):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
raise NotImplementedError
|
||||
|
||||
def scan(self, path, files=True, directories=False) -> list[str]:
|
||||
|
||||
@@ -61,6 +61,6 @@ class GoogleCloudStorage(BaseStorage):
|
||||
blob = bucket.blob(filename)
|
||||
return blob.exists()
|
||||
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
bucket.delete_blob(filename)
|
||||
|
||||
@@ -41,7 +41,7 @@ class HuaweiObsStorage(BaseStorage):
|
||||
return False
|
||||
return True
|
||||
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
self.client.deleteObject(bucketName=self.bucket_name, objectKey=filename)
|
||||
|
||||
def _get_meta(self, filename):
|
||||
|
||||
@@ -55,5 +55,5 @@ class OracleOCIStorage(BaseStorage):
|
||||
except:
|
||||
return False
|
||||
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
self.client.delete_object(Bucket=self.bucket_name, Key=filename)
|
||||
|
||||
@@ -51,7 +51,7 @@ class SupabaseStorage(BaseStorage):
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
self.client.storage.from_(self.bucket_name).remove([filename])
|
||||
|
||||
def bucket_exists(self):
|
||||
|
||||
@@ -47,5 +47,5 @@ class TencentCosStorage(BaseStorage):
|
||||
def exists(self, filename):
|
||||
return self.client.object_exists(Bucket=self.bucket_name, Key=filename)
|
||||
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
self.client.delete_object(Bucket=self.bucket_name, Key=filename)
|
||||
|
||||
@@ -60,7 +60,7 @@ class VolcengineTosStorage(BaseStorage):
|
||||
return False
|
||||
return True
|
||||
|
||||
def delete(self, filename: str):
|
||||
def delete(self, filename):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
self.client.delete_object(bucket=self.bucket_name, key=filename)
|
||||
|
||||
@@ -176,7 +176,6 @@ dev = [
|
||||
"sseclient-py>=1.8.0",
|
||||
"pytest-timeout>=2.4.0",
|
||||
"pytest-xdist>=3.8.0",
|
||||
"pyrefly>=0.54.0",
|
||||
]
|
||||
|
||||
############################################################
|
||||
|
||||
@@ -8,6 +8,7 @@ new GraphEngine command channel mechanism.
|
||||
from core.app.apps.base_app_queue_manager import AppQueueManager
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.model import AppMode
|
||||
|
||||
|
||||
@@ -42,4 +43,4 @@ class AppTaskService:
|
||||
# New mechanism: Send stop command via GraphEngine for workflow-based apps
|
||||
# This ensures proper workflow status recording in the persistence layer
|
||||
if app_mode in (AppMode.ADVANCED_CHAT, AppMode.WORKFLOW):
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
@@ -50,26 +50,8 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_factory.return_value = mock_instance
|
||||
yield mock_factory
|
||||
|
||||
@pytest.fixture
|
||||
def account_and_tenant(self, db_session_with_containers, mock_external_service_dependencies):
|
||||
"""Create an account with an owner tenant for testing.
|
||||
|
||||
Returns a tuple of (account, tenant) where tenant is guaranteed to be non-None.
|
||||
"""
|
||||
fake = Faker()
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
assert tenant is not None
|
||||
return account, tenant
|
||||
|
||||
def test_deal_dataset_vector_index_task_remove_action_success(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test successful removal of dataset vector index.
|
||||
@@ -81,7 +63,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
4. Completes without errors
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset
|
||||
dataset = Dataset(
|
||||
@@ -127,7 +118,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
assert mock_processor.clean.call_count >= 0 # For now, just check it doesn't fail
|
||||
|
||||
def test_deal_dataset_vector_index_task_add_action_success(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test successful addition of dataset vector index.
|
||||
@@ -141,7 +132,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
6. Updates document status to completed
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset
|
||||
dataset = Dataset(
|
||||
@@ -227,7 +227,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_called_once()
|
||||
|
||||
def test_deal_dataset_vector_index_task_update_action_success(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test successful update of dataset vector index.
|
||||
@@ -242,7 +242,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
7. Updates document status to completed
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset with parent-child index
|
||||
dataset = Dataset(
|
||||
@@ -329,7 +338,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_called_once()
|
||||
|
||||
def test_deal_dataset_vector_index_task_dataset_not_found_error(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test task behavior when dataset is not found.
|
||||
@@ -349,7 +358,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_not_called()
|
||||
|
||||
def test_deal_dataset_vector_index_task_add_action_no_documents(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test add action when no documents exist for the dataset.
|
||||
@@ -358,7 +367,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
a dataset exists but has no documents to process.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset without documents
|
||||
dataset = Dataset(
|
||||
@@ -381,7 +399,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_not_called()
|
||||
|
||||
def test_deal_dataset_vector_index_task_add_action_no_segments(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test add action when documents exist but have no segments.
|
||||
@@ -390,7 +408,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
documents exist but contain no segments to process.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset
|
||||
dataset = Dataset(
|
||||
@@ -437,7 +464,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_not_called()
|
||||
|
||||
def test_deal_dataset_vector_index_task_update_action_no_documents(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test update action when no documents exist for the dataset.
|
||||
@@ -446,7 +473,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
a dataset exists but has no documents to process during update.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset without documents
|
||||
dataset = Dataset(
|
||||
@@ -470,7 +506,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_not_called()
|
||||
|
||||
def test_deal_dataset_vector_index_task_add_action_with_exception_handling(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test add action with exception handling during processing.
|
||||
@@ -479,7 +515,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
during document processing and updates document status to error.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset
|
||||
dataset = Dataset(
|
||||
@@ -566,7 +611,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
assert "Test exception during indexing" in updated_document.error
|
||||
|
||||
def test_deal_dataset_vector_index_task_with_custom_index_type(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test task behavior with custom index type (QA_INDEX).
|
||||
@@ -575,7 +620,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
and initializes the appropriate index processor.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset with custom index type
|
||||
dataset = Dataset(
|
||||
@@ -642,7 +696,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_called_once()
|
||||
|
||||
def test_deal_dataset_vector_index_task_with_default_index_type(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test task behavior with default index type (PARAGRAPH_INDEX).
|
||||
@@ -651,7 +705,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
when dataset.doc_form is None.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset without doc_form (should use default)
|
||||
dataset = Dataset(
|
||||
@@ -718,7 +781,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_called_once()
|
||||
|
||||
def test_deal_dataset_vector_index_task_multiple_documents_processing(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test task processing with multiple documents and segments.
|
||||
@@ -727,7 +790,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
and their segments in sequence.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset
|
||||
dataset = Dataset(
|
||||
@@ -821,7 +893,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
assert mock_processor.load.call_count == 3
|
||||
|
||||
def test_deal_dataset_vector_index_task_document_status_transitions(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test document status transitions during task execution.
|
||||
@@ -830,7 +902,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
'completed' to 'indexing' and back to 'completed' during processing.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset
|
||||
dataset = Dataset(
|
||||
@@ -918,7 +999,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
assert updated_document.indexing_status == "completed"
|
||||
|
||||
def test_deal_dataset_vector_index_task_with_disabled_documents(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test task behavior with disabled documents.
|
||||
@@ -927,7 +1008,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
during processing.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset
|
||||
dataset = Dataset(
|
||||
@@ -1039,7 +1129,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_called_once()
|
||||
|
||||
def test_deal_dataset_vector_index_task_with_archived_documents(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test task behavior with archived documents.
|
||||
@@ -1048,7 +1138,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
during processing.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset
|
||||
dataset = Dataset(
|
||||
@@ -1160,7 +1259,7 @@ class TestDealDatasetVectorIndexTask:
|
||||
mock_processor.load.assert_called_once()
|
||||
|
||||
def test_deal_dataset_vector_index_task_with_incomplete_documents(
|
||||
self, db_session_with_containers, mock_index_processor_factory, account_and_tenant
|
||||
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test task behavior with documents that have incomplete indexing status.
|
||||
@@ -1169,7 +1268,16 @@ class TestDealDatasetVectorIndexTask:
|
||||
incomplete indexing status during processing.
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = account_and_tenant
|
||||
|
||||
# Create test data
|
||||
account = AccountService.create_account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
password=fake.password(length=12),
|
||||
)
|
||||
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
|
||||
tenant = account.current_tenant
|
||||
|
||||
# Create dataset
|
||||
dataset = Dataset(
|
||||
|
||||
@@ -596,7 +596,8 @@ class TestWorkflowTaskStopApiPost:
|
||||
|
||||
assert result == {"result": "success"}
|
||||
mock_queue_mgr.set_stop_flag_no_user_check.assert_called_once_with("task-1")
|
||||
mock_graph_mgr.send_stop_command.assert_called_once_with("task-1")
|
||||
mock_graph_mgr.assert_called_once()
|
||||
mock_graph_mgr.return_value.send_stop_command.assert_called_once_with("task-1")
|
||||
|
||||
def test_stop_workflow_task_wrong_app_mode(self, app):
|
||||
"""Test NotWorkflowAppError when app mode is not workflow."""
|
||||
|
||||
@@ -32,25 +32,26 @@ class TestRedisStopIntegration:
|
||||
mock_redis.pipeline.return_value.__enter__ = Mock(return_value=mock_pipeline)
|
||||
mock_redis.pipeline.return_value.__exit__ = Mock(return_value=None)
|
||||
|
||||
with patch("core.workflow.graph_engine.manager.redis_client", mock_redis):
|
||||
# Execute
|
||||
GraphEngineManager.send_stop_command(task_id, reason="Test stop")
|
||||
manager = GraphEngineManager(mock_redis)
|
||||
|
||||
# Verify
|
||||
mock_redis.pipeline.assert_called_once()
|
||||
# Execute
|
||||
manager.send_stop_command(task_id, reason="Test stop")
|
||||
|
||||
# Check that rpush was called with correct arguments
|
||||
calls = mock_pipeline.rpush.call_args_list
|
||||
assert len(calls) == 1
|
||||
# Verify
|
||||
mock_redis.pipeline.assert_called_once()
|
||||
|
||||
# Verify the channel key
|
||||
assert calls[0][0][0] == expected_channel_key
|
||||
# Check that rpush was called with correct arguments
|
||||
calls = mock_pipeline.rpush.call_args_list
|
||||
assert len(calls) == 1
|
||||
|
||||
# Verify the command data
|
||||
command_json = calls[0][0][1]
|
||||
command_data = json.loads(command_json)
|
||||
assert command_data["command_type"] == CommandType.ABORT
|
||||
assert command_data["reason"] == "Test stop"
|
||||
# Verify the channel key
|
||||
assert calls[0][0][0] == expected_channel_key
|
||||
|
||||
# Verify the command data
|
||||
command_json = calls[0][0][1]
|
||||
command_data = json.loads(command_json)
|
||||
assert command_data["command_type"] == CommandType.ABORT
|
||||
assert command_data["reason"] == "Test stop"
|
||||
|
||||
def test_graph_engine_manager_sends_pause_command(self):
|
||||
"""Test that GraphEngineManager correctly sends pause command through Redis."""
|
||||
@@ -62,18 +63,18 @@ class TestRedisStopIntegration:
|
||||
mock_redis.pipeline.return_value.__enter__ = Mock(return_value=mock_pipeline)
|
||||
mock_redis.pipeline.return_value.__exit__ = Mock(return_value=None)
|
||||
|
||||
with patch("core.workflow.graph_engine.manager.redis_client", mock_redis):
|
||||
GraphEngineManager.send_pause_command(task_id, reason="Awaiting resources")
|
||||
manager = GraphEngineManager(mock_redis)
|
||||
manager.send_pause_command(task_id, reason="Awaiting resources")
|
||||
|
||||
mock_redis.pipeline.assert_called_once()
|
||||
calls = mock_pipeline.rpush.call_args_list
|
||||
assert len(calls) == 1
|
||||
assert calls[0][0][0] == expected_channel_key
|
||||
mock_redis.pipeline.assert_called_once()
|
||||
calls = mock_pipeline.rpush.call_args_list
|
||||
assert len(calls) == 1
|
||||
assert calls[0][0][0] == expected_channel_key
|
||||
|
||||
command_json = calls[0][0][1]
|
||||
command_data = json.loads(command_json)
|
||||
assert command_data["command_type"] == CommandType.PAUSE.value
|
||||
assert command_data["reason"] == "Awaiting resources"
|
||||
command_json = calls[0][0][1]
|
||||
command_data = json.loads(command_json)
|
||||
assert command_data["command_type"] == CommandType.PAUSE.value
|
||||
assert command_data["reason"] == "Awaiting resources"
|
||||
|
||||
def test_graph_engine_manager_handles_redis_failure_gracefully(self):
|
||||
"""Test that GraphEngineManager handles Redis failures without raising exceptions."""
|
||||
@@ -82,13 +83,13 @@ class TestRedisStopIntegration:
|
||||
# Mock redis client to raise exception
|
||||
mock_redis = MagicMock()
|
||||
mock_redis.pipeline.side_effect = redis.ConnectionError("Redis connection failed")
|
||||
manager = GraphEngineManager(mock_redis)
|
||||
|
||||
with patch("core.workflow.graph_engine.manager.redis_client", mock_redis):
|
||||
# Should not raise exception
|
||||
try:
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
except Exception as e:
|
||||
pytest.fail(f"GraphEngineManager.send_stop_command raised {e} unexpectedly")
|
||||
# Should not raise exception
|
||||
try:
|
||||
manager.send_stop_command(task_id)
|
||||
except Exception as e:
|
||||
pytest.fail(f"GraphEngineManager.send_stop_command raised {e} unexpectedly")
|
||||
|
||||
def test_app_queue_manager_no_user_check(self):
|
||||
"""Test that AppQueueManager.set_stop_flag_no_user_check works without user validation."""
|
||||
@@ -251,13 +252,10 @@ class TestRedisStopIntegration:
|
||||
mock_redis.pipeline.return_value.__enter__ = Mock(return_value=mock_pipeline)
|
||||
mock_redis.pipeline.return_value.__exit__ = Mock(return_value=None)
|
||||
|
||||
with (
|
||||
patch("core.app.apps.base_app_queue_manager.redis_client", mock_redis),
|
||||
patch("core.workflow.graph_engine.manager.redis_client", mock_redis),
|
||||
):
|
||||
with patch("core.app.apps.base_app_queue_manager.redis_client", mock_redis):
|
||||
# Execute both stop mechanisms
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(mock_redis).send_stop_command(task_id)
|
||||
|
||||
# Verify legacy stop flag was set
|
||||
expected_stop_flag_key = f"generate_task_stopped:{task_id}"
|
||||
|
||||
@@ -44,9 +44,10 @@ class TestAppTaskService:
|
||||
# Assert
|
||||
mock_app_queue_manager.set_stop_flag.assert_called_once_with(task_id, invoke_from, user_id)
|
||||
if should_call_graph_engine:
|
||||
mock_graph_engine_manager.send_stop_command.assert_called_once_with(task_id)
|
||||
mock_graph_engine_manager.assert_called_once()
|
||||
mock_graph_engine_manager.return_value.send_stop_command.assert_called_once_with(task_id)
|
||||
else:
|
||||
mock_graph_engine_manager.send_stop_command.assert_not_called()
|
||||
mock_graph_engine_manager.assert_not_called()
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"invoke_from",
|
||||
@@ -76,7 +77,8 @@ class TestAppTaskService:
|
||||
|
||||
# Assert
|
||||
mock_app_queue_manager.set_stop_flag.assert_called_once_with(task_id, invoke_from, user_id)
|
||||
mock_graph_engine_manager.send_stop_command.assert_called_once_with(task_id)
|
||||
mock_graph_engine_manager.assert_called_once()
|
||||
mock_graph_engine_manager.return_value.send_stop_command.assert_called_once_with(task_id)
|
||||
|
||||
@patch("services.app_task_service.GraphEngineManager")
|
||||
@patch("services.app_task_service.AppQueueManager")
|
||||
@@ -96,7 +98,7 @@ class TestAppTaskService:
|
||||
app_mode = AppMode.ADVANCED_CHAT
|
||||
|
||||
# Simulate GraphEngine failure
|
||||
mock_graph_engine_manager.send_stop_command.side_effect = Exception("GraphEngine error")
|
||||
mock_graph_engine_manager.return_value.send_stop_command.side_effect = Exception("GraphEngine error")
|
||||
|
||||
# Act & Assert - should raise the exception since it's not caught
|
||||
with pytest.raises(Exception, match="GraphEngine error"):
|
||||
|
||||
@@ -143,8 +143,234 @@ def mock_upload_file():
|
||||
# ============================================================================
|
||||
# Test Basic Cleanup
|
||||
# ============================================================================
|
||||
# Note: Basic cleanup behavior is now covered by testcontainers-based
|
||||
# integration tests; no unit tests remain in this section.
|
||||
|
||||
|
||||
class TestBasicCleanup:
|
||||
"""Test cases for basic dataset cleanup functionality."""
|
||||
|
||||
def test_clean_dataset_task_empty_dataset(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test cleanup of an empty dataset with no documents or segments.
|
||||
|
||||
Scenario:
|
||||
- Dataset has no documents or segments
|
||||
- Should still clean vector database and delete related records
|
||||
|
||||
Expected behavior:
|
||||
- IndexProcessorFactory is called to clean vector database
|
||||
- No storage deletions occur
|
||||
- Related records (DatasetProcessRule, etc.) are deleted
|
||||
- Session is committed and closed
|
||||
"""
|
||||
# Arrange
|
||||
mock_db_session.session.scalars.return_value.all.return_value = []
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert
|
||||
mock_index_processor_factory["factory"].assert_called_once_with("paragraph_index")
|
||||
mock_index_processor_factory["processor"].clean.assert_called_once()
|
||||
mock_storage.delete.assert_not_called()
|
||||
mock_db_session.session.commit.assert_called_once()
|
||||
mock_db_session.session.close.assert_called_once()
|
||||
|
||||
def test_clean_dataset_task_with_documents_and_segments(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
mock_document,
|
||||
mock_segment,
|
||||
):
|
||||
"""
|
||||
Test cleanup of dataset with documents and segments.
|
||||
|
||||
Scenario:
|
||||
- Dataset has one document and one segment
|
||||
- No image files in segment content
|
||||
|
||||
Expected behavior:
|
||||
- Documents and segments are deleted
|
||||
- Vector database is cleaned
|
||||
- Session is committed
|
||||
"""
|
||||
# Arrange
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
[mock_document], # documents
|
||||
[mock_segment], # segments
|
||||
]
|
||||
mock_get_image_upload_file_ids.return_value = []
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert
|
||||
mock_db_session.session.delete.assert_any_call(mock_document)
|
||||
# Segments are deleted in batch; verify a DELETE on document_segments was issued
|
||||
execute_sqls = [" ".join(str(c[0][0]).split()) for c in mock_db_session.session.execute.call_args_list]
|
||||
assert any("DELETE FROM document_segments" in sql for sql in execute_sqls)
|
||||
mock_db_session.session.commit.assert_called_once()
|
||||
|
||||
def test_clean_dataset_task_deletes_related_records(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that all related records are deleted.
|
||||
|
||||
Expected behavior:
|
||||
- DatasetProcessRule records are deleted
|
||||
- DatasetQuery records are deleted
|
||||
- AppDatasetJoin records are deleted
|
||||
- DatasetMetadata records are deleted
|
||||
- DatasetMetadataBinding records are deleted
|
||||
"""
|
||||
# Arrange
|
||||
mock_query = mock_db_session.session.query.return_value
|
||||
mock_query.where.return_value = mock_query
|
||||
mock_query.delete.return_value = 1
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert - verify query.where.delete was called multiple times
|
||||
# for different models (DatasetProcessRule, DatasetQuery, etc.)
|
||||
assert mock_query.delete.call_count >= 5
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test Doc Form Validation
|
||||
# ============================================================================
|
||||
|
||||
|
||||
class TestDocFormValidation:
|
||||
"""Test cases for doc_form validation and default fallback."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"invalid_doc_form",
|
||||
[
|
||||
None,
|
||||
"",
|
||||
" ",
|
||||
"\t",
|
||||
"\n",
|
||||
" \t\n ",
|
||||
],
|
||||
)
|
||||
def test_clean_dataset_task_invalid_doc_form_uses_default(
|
||||
self,
|
||||
invalid_doc_form,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that invalid doc_form values use default paragraph index type.
|
||||
|
||||
Scenario:
|
||||
- doc_form is None, empty, or whitespace-only
|
||||
- Should use default IndexStructureType.PARAGRAPH_INDEX
|
||||
|
||||
Expected behavior:
|
||||
- Default index type is used for cleanup
|
||||
- No errors are raised
|
||||
- Cleanup proceeds normally
|
||||
"""
|
||||
# Arrange - import to verify the default value
|
||||
from core.rag.index_processor.constant.index_type import IndexStructureType
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form=invalid_doc_form,
|
||||
)
|
||||
|
||||
# Assert - IndexProcessorFactory should be called with default type
|
||||
mock_index_processor_factory["factory"].assert_called_once_with(IndexStructureType.PARAGRAPH_INDEX)
|
||||
mock_index_processor_factory["processor"].clean.assert_called_once()
|
||||
|
||||
def test_clean_dataset_task_valid_doc_form_used_directly(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that valid doc_form values are used directly.
|
||||
|
||||
Expected behavior:
|
||||
- Provided doc_form is passed to IndexProcessorFactory
|
||||
"""
|
||||
# Arrange
|
||||
valid_doc_form = "qa_index"
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form=valid_doc_form,
|
||||
)
|
||||
|
||||
# Assert
|
||||
mock_index_processor_factory["factory"].assert_called_once_with(valid_doc_form)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test Error Handling
|
||||
# ============================================================================
|
||||
@@ -153,6 +379,156 @@ def mock_upload_file():
|
||||
class TestErrorHandling:
|
||||
"""Test cases for error handling and recovery."""
|
||||
|
||||
def test_clean_dataset_task_vector_cleanup_failure_continues(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
mock_document,
|
||||
mock_segment,
|
||||
):
|
||||
"""
|
||||
Test that document cleanup continues even if vector cleanup fails.
|
||||
|
||||
Scenario:
|
||||
- IndexProcessor.clean() raises an exception
|
||||
- Document and segment deletion should still proceed
|
||||
|
||||
Expected behavior:
|
||||
- Exception is caught and logged
|
||||
- Documents and segments are still deleted
|
||||
- Session is committed
|
||||
"""
|
||||
# Arrange
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
[mock_document], # documents
|
||||
[mock_segment], # segments
|
||||
]
|
||||
mock_index_processor_factory["processor"].clean.side_effect = Exception("Vector database error")
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert - documents and segments should still be deleted
|
||||
mock_db_session.session.delete.assert_any_call(mock_document)
|
||||
# Segments are deleted in batch; verify a DELETE on document_segments was issued
|
||||
execute_sqls = [" ".join(str(c[0][0]).split()) for c in mock_db_session.session.execute.call_args_list]
|
||||
assert any("DELETE FROM document_segments" in sql for sql in execute_sqls)
|
||||
mock_db_session.session.commit.assert_called_once()
|
||||
|
||||
def test_clean_dataset_task_storage_delete_failure_continues(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that cleanup continues even if storage deletion fails.
|
||||
|
||||
Scenario:
|
||||
- Segment contains image file references
|
||||
- Storage.delete() raises an exception
|
||||
- Cleanup should continue
|
||||
|
||||
Expected behavior:
|
||||
- Exception is caught and logged
|
||||
- Image file record is still deleted from database
|
||||
- Other cleanup operations proceed
|
||||
"""
|
||||
# Arrange
|
||||
# Need at least one document for segment processing to occur (code is in else block)
|
||||
mock_document = MagicMock()
|
||||
mock_document.id = str(uuid.uuid4())
|
||||
mock_document.tenant_id = tenant_id
|
||||
mock_document.data_source_type = "website" # Non-upload type to avoid file deletion
|
||||
|
||||
mock_segment = MagicMock()
|
||||
mock_segment.id = str(uuid.uuid4())
|
||||
mock_segment.content = "Test content with image"
|
||||
|
||||
mock_upload_file = MagicMock()
|
||||
mock_upload_file.id = str(uuid.uuid4())
|
||||
mock_upload_file.key = "images/test-image.jpg"
|
||||
|
||||
image_file_id = mock_upload_file.id
|
||||
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
[mock_document], # documents - need at least one for segment processing
|
||||
[mock_segment], # segments
|
||||
]
|
||||
mock_get_image_upload_file_ids.return_value = [image_file_id]
|
||||
mock_db_session.session.query.return_value.where.return_value.all.return_value = [mock_upload_file]
|
||||
mock_storage.delete.side_effect = Exception("Storage service unavailable")
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert - storage delete was attempted for image file
|
||||
mock_storage.delete.assert_called_with(mock_upload_file.key)
|
||||
# Upload files are deleted in batch; verify a DELETE on upload_files was issued
|
||||
execute_sqls = [" ".join(str(c[0][0]).split()) for c in mock_db_session.session.execute.call_args_list]
|
||||
assert any("DELETE FROM upload_files" in sql for sql in execute_sqls)
|
||||
|
||||
def test_clean_dataset_task_database_error_rollback(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that database session is rolled back on error.
|
||||
|
||||
Scenario:
|
||||
- Database operation raises an exception
|
||||
- Session should be rolled back to prevent dirty state
|
||||
|
||||
Expected behavior:
|
||||
- Session.rollback() is called
|
||||
- Session.close() is called in finally block
|
||||
"""
|
||||
# Arrange
|
||||
mock_db_session.session.commit.side_effect = Exception("Database commit failed")
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert
|
||||
mock_db_session.session.rollback.assert_called_once()
|
||||
mock_db_session.session.close.assert_called_once()
|
||||
|
||||
def test_clean_dataset_task_rollback_failure_still_closes_session(
|
||||
self,
|
||||
dataset_id,
|
||||
@@ -378,6 +754,296 @@ class TestSegmentAttachmentCleanup:
|
||||
assert any("DELETE FROM segment_attachment_bindings" in sql for sql in execute_sqls)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test Upload File Cleanup
|
||||
# ============================================================================
|
||||
|
||||
|
||||
class TestUploadFileCleanup:
|
||||
"""Test cases for upload file cleanup."""
|
||||
|
||||
def test_clean_dataset_task_deletes_document_upload_files(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that document upload files are deleted.
|
||||
|
||||
Scenario:
|
||||
- Document has data_source_type = "upload_file"
|
||||
- data_source_info contains upload_file_id
|
||||
|
||||
Expected behavior:
|
||||
- Upload file is deleted from storage
|
||||
- Upload file record is deleted from database
|
||||
"""
|
||||
# Arrange
|
||||
mock_document = MagicMock()
|
||||
mock_document.id = str(uuid.uuid4())
|
||||
mock_document.tenant_id = tenant_id
|
||||
mock_document.data_source_type = "upload_file"
|
||||
mock_document.data_source_info = '{"upload_file_id": "test-file-id"}'
|
||||
mock_document.data_source_info_dict = {"upload_file_id": "test-file-id"}
|
||||
|
||||
mock_upload_file = MagicMock()
|
||||
mock_upload_file.id = "test-file-id"
|
||||
mock_upload_file.key = "uploads/test-file.txt"
|
||||
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
[mock_document], # documents
|
||||
[], # segments
|
||||
]
|
||||
mock_db_session.session.query.return_value.where.return_value.all.return_value = [mock_upload_file]
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert
|
||||
mock_storage.delete.assert_called_with(mock_upload_file.key)
|
||||
# Upload files are deleted in batch; verify a DELETE on upload_files was issued
|
||||
execute_sqls = [" ".join(str(c[0][0]).split()) for c in mock_db_session.session.execute.call_args_list]
|
||||
assert any("DELETE FROM upload_files" in sql for sql in execute_sqls)
|
||||
|
||||
def test_clean_dataset_task_handles_missing_upload_file(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that missing upload files are handled gracefully.
|
||||
|
||||
Scenario:
|
||||
- Document references an upload_file_id that doesn't exist
|
||||
|
||||
Expected behavior:
|
||||
- No error is raised
|
||||
- Cleanup continues normally
|
||||
"""
|
||||
# Arrange
|
||||
mock_document = MagicMock()
|
||||
mock_document.id = str(uuid.uuid4())
|
||||
mock_document.tenant_id = tenant_id
|
||||
mock_document.data_source_type = "upload_file"
|
||||
mock_document.data_source_info = '{"upload_file_id": "nonexistent-file"}'
|
||||
mock_document.data_source_info_dict = {"upload_file_id": "nonexistent-file"}
|
||||
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
[mock_document], # documents
|
||||
[], # segments
|
||||
]
|
||||
mock_db_session.session.query.return_value.where.return_value.all.return_value = []
|
||||
|
||||
# Act - should not raise exception
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert
|
||||
mock_storage.delete.assert_not_called()
|
||||
mock_db_session.session.commit.assert_called_once()
|
||||
|
||||
def test_clean_dataset_task_handles_non_upload_file_data_source(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that non-upload_file data sources are skipped.
|
||||
|
||||
Scenario:
|
||||
- Document has data_source_type = "website"
|
||||
|
||||
Expected behavior:
|
||||
- No file deletion is attempted
|
||||
"""
|
||||
# Arrange
|
||||
mock_document = MagicMock()
|
||||
mock_document.id = str(uuid.uuid4())
|
||||
mock_document.tenant_id = tenant_id
|
||||
mock_document.data_source_type = "website"
|
||||
mock_document.data_source_info = None
|
||||
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
[mock_document], # documents
|
||||
[], # segments
|
||||
]
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert - storage delete should not be called for document files
|
||||
# (only for image files in segments, which are empty here)
|
||||
mock_storage.delete.assert_not_called()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test Image File Cleanup
|
||||
# ============================================================================
|
||||
|
||||
|
||||
class TestImageFileCleanup:
|
||||
"""Test cases for image file cleanup in segments."""
|
||||
|
||||
def test_clean_dataset_task_deletes_image_files_in_segments(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that image files referenced in segment content are deleted.
|
||||
|
||||
Scenario:
|
||||
- Segment content contains image file references
|
||||
- get_image_upload_file_ids returns file IDs
|
||||
|
||||
Expected behavior:
|
||||
- Each image file is deleted from storage
|
||||
- Each image file record is deleted from database
|
||||
"""
|
||||
# Arrange
|
||||
# Need at least one document for segment processing to occur (code is in else block)
|
||||
mock_document = MagicMock()
|
||||
mock_document.id = str(uuid.uuid4())
|
||||
mock_document.tenant_id = tenant_id
|
||||
mock_document.data_source_type = "website" # Non-upload type
|
||||
|
||||
mock_segment = MagicMock()
|
||||
mock_segment.id = str(uuid.uuid4())
|
||||
mock_segment.content = '<img src="file://image-1"> <img src="file://image-2">'
|
||||
|
||||
image_file_ids = ["image-1", "image-2"]
|
||||
mock_get_image_upload_file_ids.return_value = image_file_ids
|
||||
|
||||
mock_image_files = []
|
||||
for file_id in image_file_ids:
|
||||
mock_file = MagicMock()
|
||||
mock_file.id = file_id
|
||||
mock_file.key = f"images/{file_id}.jpg"
|
||||
mock_image_files.append(mock_file)
|
||||
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
[mock_document], # documents - need at least one for segment processing
|
||||
[mock_segment], # segments
|
||||
]
|
||||
|
||||
# Setup a mock query chain that returns files in batch (align with .in_().all())
|
||||
mock_query = MagicMock()
|
||||
mock_where = MagicMock()
|
||||
mock_query.where.return_value = mock_where
|
||||
mock_where.all.return_value = mock_image_files
|
||||
mock_db_session.session.query.return_value = mock_query
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert - each expected image key was deleted at least once
|
||||
calls = [c.args[0] for c in mock_storage.delete.call_args_list]
|
||||
assert "images/image-1.jpg" in calls
|
||||
assert "images/image-2.jpg" in calls
|
||||
|
||||
def test_clean_dataset_task_handles_missing_image_file(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test that missing image files are handled gracefully.
|
||||
|
||||
Scenario:
|
||||
- Segment references image file ID that doesn't exist in database
|
||||
|
||||
Expected behavior:
|
||||
- No error is raised
|
||||
- Cleanup continues
|
||||
"""
|
||||
# Arrange
|
||||
# Need at least one document for segment processing to occur (code is in else block)
|
||||
mock_document = MagicMock()
|
||||
mock_document.id = str(uuid.uuid4())
|
||||
mock_document.tenant_id = tenant_id
|
||||
mock_document.data_source_type = "website" # Non-upload type
|
||||
|
||||
mock_segment = MagicMock()
|
||||
mock_segment.id = str(uuid.uuid4())
|
||||
mock_segment.content = '<img src="file://nonexistent-image">'
|
||||
|
||||
mock_get_image_upload_file_ids.return_value = ["nonexistent-image"]
|
||||
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
[mock_document], # documents - need at least one for segment processing
|
||||
[mock_segment], # segments
|
||||
]
|
||||
|
||||
# Image file not found
|
||||
mock_db_session.session.query.return_value.where.return_value.all.return_value = []
|
||||
|
||||
# Act - should not raise exception
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert
|
||||
mock_storage.delete.assert_not_called()
|
||||
mock_db_session.session.commit.assert_called_once()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test Edge Cases
|
||||
# ============================================================================
|
||||
@@ -386,6 +1052,114 @@ class TestSegmentAttachmentCleanup:
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and boundary conditions."""
|
||||
|
||||
def test_clean_dataset_task_multiple_documents_and_segments(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test cleanup of multiple documents and segments.
|
||||
|
||||
Scenario:
|
||||
- Dataset has 5 documents and 10 segments
|
||||
|
||||
Expected behavior:
|
||||
- All documents and segments are deleted
|
||||
"""
|
||||
# Arrange
|
||||
mock_documents = []
|
||||
for i in range(5):
|
||||
doc = MagicMock()
|
||||
doc.id = str(uuid.uuid4())
|
||||
doc.tenant_id = tenant_id
|
||||
doc.data_source_type = "website" # Non-upload type
|
||||
mock_documents.append(doc)
|
||||
|
||||
mock_segments = []
|
||||
for i in range(10):
|
||||
seg = MagicMock()
|
||||
seg.id = str(uuid.uuid4())
|
||||
seg.content = f"Segment content {i}"
|
||||
mock_segments.append(seg)
|
||||
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
mock_documents,
|
||||
mock_segments,
|
||||
]
|
||||
mock_get_image_upload_file_ids.return_value = []
|
||||
|
||||
# Act
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert - all documents and segments should be deleted (documents per-entity, segments in batch)
|
||||
delete_calls = mock_db_session.session.delete.call_args_list
|
||||
deleted_items = [call[0][0] for call in delete_calls]
|
||||
|
||||
for doc in mock_documents:
|
||||
assert doc in deleted_items
|
||||
# Verify a batch DELETE on document_segments occurred
|
||||
execute_sqls = [" ".join(str(c[0][0]).split()) for c in mock_db_session.session.execute.call_args_list]
|
||||
assert any("DELETE FROM document_segments" in sql for sql in execute_sqls)
|
||||
|
||||
def test_clean_dataset_task_document_with_empty_data_source_info(
|
||||
self,
|
||||
dataset_id,
|
||||
tenant_id,
|
||||
collection_binding_id,
|
||||
mock_db_session,
|
||||
mock_storage,
|
||||
mock_index_processor_factory,
|
||||
mock_get_image_upload_file_ids,
|
||||
):
|
||||
"""
|
||||
Test handling of document with empty data_source_info.
|
||||
|
||||
Scenario:
|
||||
- Document has data_source_type = "upload_file"
|
||||
- data_source_info is None or empty
|
||||
|
||||
Expected behavior:
|
||||
- No error is raised
|
||||
- File deletion is skipped
|
||||
"""
|
||||
# Arrange
|
||||
mock_document = MagicMock()
|
||||
mock_document.id = str(uuid.uuid4())
|
||||
mock_document.tenant_id = tenant_id
|
||||
mock_document.data_source_type = "upload_file"
|
||||
mock_document.data_source_info = None
|
||||
|
||||
mock_db_session.session.scalars.return_value.all.side_effect = [
|
||||
[mock_document], # documents
|
||||
[], # segments
|
||||
]
|
||||
|
||||
# Act - should not raise exception
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=collection_binding_id,
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Assert
|
||||
mock_storage.delete.assert_not_called()
|
||||
mock_db_session.session.commit.assert_called_once()
|
||||
|
||||
def test_clean_dataset_task_session_always_closed(
|
||||
self,
|
||||
dataset_id,
|
||||
|
||||
18
api/uv.lock
generated
18
api/uv.lock
generated
@@ -1471,7 +1471,6 @@ dev = [
|
||||
{ name = "lxml-stubs" },
|
||||
{ name = "mypy" },
|
||||
{ name = "pandas-stubs" },
|
||||
{ name = "pyrefly" },
|
||||
{ name = "pytest" },
|
||||
{ name = "pytest-benchmark" },
|
||||
{ name = "pytest-cov" },
|
||||
@@ -1672,7 +1671,6 @@ dev = [
|
||||
{ name = "lxml-stubs", specifier = "~=0.5.1" },
|
||||
{ name = "mypy", specifier = "~=1.17.1" },
|
||||
{ name = "pandas-stubs", specifier = "~=2.2.3" },
|
||||
{ name = "pyrefly", specifier = ">=0.54.0" },
|
||||
{ name = "pytest", specifier = "~=8.3.2" },
|
||||
{ name = "pytest-benchmark", specifier = "~=4.0.0" },
|
||||
{ name = "pytest-cov", specifier = "~=4.1.0" },
|
||||
@@ -5109,22 +5107,6 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/5a/dc/491b7661614ab97483abf2056be1deee4dc2490ecbf7bff9ab5cdbac86e1/pyreadline3-3.5.4-py3-none-any.whl", hash = "sha256:eaf8e6cc3c49bcccf145fc6067ba8643d1df34d604a1ec0eccbf7a18e6d3fae6", size = 83178, upload-time = "2024-09-19T02:40:08.598Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pyrefly"
|
||||
version = "0.54.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/81/44/c10b16a302fda90d0af1328f880b232761b510eab546616a7be2fdf35a57/pyrefly-0.54.0.tar.gz", hash = "sha256:c6663be64d492f0d2f2a411ada9f28a6792163d34133639378b7f3dd9a8dca94", size = 5098893, upload-time = "2026-02-23T15:44:35.111Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/5f/99/8fdcdb4e55f0227fdd9f6abce36b619bab1ecb0662b83b66adc8cba3c788/pyrefly-0.54.0-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:58a3f092b6dc25ef79b2dc6c69a40f36784ca157c312bfc0baea463926a9db6d", size = 12223973, upload-time = "2026-02-23T15:44:14.278Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/90/35/c2aaf87a76003ad27b286594d2e5178f811eaa15bfe3d98dba2b47d56dd1/pyrefly-0.54.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:615081414106dd95873bc39c3a4bed68754c6cc24a8177ac51d22f88f88d3eb3", size = 11785585, upload-time = "2026-02-23T15:44:17.468Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/c4/4a/ced02691ed67e5a897714979196f08ad279ec7ec7f63c45e00a75a7f3c0e/pyrefly-0.54.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0cbcaf20f5fe585079079a95205c1f3cd4542d17228cdf1df560288880623b70", size = 33381977, upload-time = "2026-02-23T15:44:19.736Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/0b/ce/72a117ed437c8f6950862181014b41e36f3c3997580e29b772b71e78d587/pyrefly-0.54.0-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:66d5da116c0d34acfbd66663addd3ca8aa78a636f6692a66e078126d3620a883", size = 35962821, upload-time = "2026-02-23T15:44:22.357Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/85/de/89013f5ae0a35d2b6b01274a92a35ee91431ea001050edf0a16748d39875/pyrefly-0.54.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ef3ac27f1a4baaf67aead64287d3163350844794aca6315ad1a9650b16ec26a", size = 38496689, upload-time = "2026-02-23T15:44:25.236Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/9f/9a/33b097c7bf498b924742dca32dd5d9c6a3fa6c2b52b63a58eb9e1980ca89/pyrefly-0.54.0-py3-none-win32.whl", hash = "sha256:7d607d72200a8afbd2db10bfefb40160a7a5d709d207161c21649cedd5cfc09a", size = 11295268, upload-time = "2026-02-23T15:44:27.551Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/d4/21/9263fd1144d2a3d7342b474f183f7785b3358a1565c864089b780110b933/pyrefly-0.54.0-py3-none-win_amd64.whl", hash = "sha256:fd416f04f89309385696f685bd5c9141011f18c8072f84d31ca20c748546e791", size = 12081810, upload-time = "2026-02-23T15:44:29.461Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/ea/5b/fad062a196c064cbc8564de5b2f4d3cb6315f852e3b31e8a1ce74c69a1ea/pyrefly-0.54.0-py3-none-win_arm64.whl", hash = "sha256:f06ab371356c7b1925e0bffe193b738797e71e5dbbff7fb5a13f90ee7521211d", size = 11564930, upload-time = "2026-02-23T15:44:33.053Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pytest"
|
||||
version = "8.3.5"
|
||||
|
||||
Reference in New Issue
Block a user