mirror of
https://github.com/langgenius/dify.git
synced 2026-01-15 03:09:55 +00:00
Compare commits
8 Commits
refactor/p
...
refactor-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb13f5d19b | ||
|
|
10c93c8be8 | ||
|
|
a58f24fbba | ||
|
|
375fd40834 | ||
|
|
058da44e5a | ||
|
|
49dc540b9e | ||
|
|
6e6922c4ac | ||
|
|
c385283356 |
@@ -3,6 +3,7 @@ import datetime
|
||||
import json
|
||||
import logging
|
||||
import secrets
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import click
|
||||
@@ -46,6 +47,8 @@ from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpi
|
||||
from services.plugin.data_migration import PluginDataMigration
|
||||
from services.plugin.plugin_migration import PluginMigration
|
||||
from services.plugin.plugin_service import PluginService
|
||||
from services.retention.conversation.messages_clean_policy import create_message_clean_policy
|
||||
from services.retention.conversation.messages_clean_service import MessagesCleanService
|
||||
from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
|
||||
from tasks.remove_app_and_related_data_task import delete_draft_variables_batch
|
||||
|
||||
@@ -2172,3 +2175,79 @@ def migrate_oss(
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
click.echo(click.style(f"Failed to update DB storage_type: {str(e)}", fg="red"))
|
||||
|
||||
|
||||
@click.command("clean-expired-messages", help="Clean expired messages.")
|
||||
@click.option(
|
||||
"--start-from",
|
||||
type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]),
|
||||
required=True,
|
||||
help="Lower bound (inclusive) for created_at.",
|
||||
)
|
||||
@click.option(
|
||||
"--end-before",
|
||||
type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]),
|
||||
required=True,
|
||||
help="Upper bound (exclusive) for created_at.",
|
||||
)
|
||||
@click.option("--batch-size", default=1000, show_default=True, help="Batch size for selecting messages.")
|
||||
@click.option(
|
||||
"--graceful-period",
|
||||
default=21,
|
||||
show_default=True,
|
||||
help="Graceful period in days after subscription expiration, will be ignored when billing is disabled.",
|
||||
)
|
||||
@click.option("--dry-run", is_flag=True, default=False, help="Show messages logs would be cleaned without deleting")
|
||||
def clean_expired_messages(
|
||||
batch_size: int,
|
||||
graceful_period: int,
|
||||
start_from: datetime.datetime,
|
||||
end_before: datetime.datetime,
|
||||
dry_run: bool,
|
||||
):
|
||||
"""
|
||||
Clean expired messages and related data for tenants based on clean policy.
|
||||
"""
|
||||
click.echo(click.style("clean_messages: start clean messages.", fg="green"))
|
||||
|
||||
start_at = time.perf_counter()
|
||||
|
||||
try:
|
||||
# Create policy based on billing configuration
|
||||
# NOTE: graceful_period will be ignored when billing is disabled.
|
||||
policy = create_message_clean_policy(graceful_period_days=graceful_period)
|
||||
|
||||
# Create and run the cleanup service
|
||||
service = MessagesCleanService.from_time_range(
|
||||
policy=policy,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
batch_size=batch_size,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
stats = service.run()
|
||||
|
||||
end_at = time.perf_counter()
|
||||
click.echo(
|
||||
click.style(
|
||||
f"clean_messages: completed successfully\n"
|
||||
f" - Latency: {end_at - start_at:.2f}s\n"
|
||||
f" - Batches processed: {stats['batches']}\n"
|
||||
f" - Total messages scanned: {stats['total_messages']}\n"
|
||||
f" - Messages filtered: {stats['filtered_messages']}\n"
|
||||
f" - Messages deleted: {stats['total_deleted']}",
|
||||
fg="green",
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
end_at = time.perf_counter()
|
||||
logger.exception("clean_messages failed")
|
||||
click.echo(
|
||||
click.style(
|
||||
f"clean_messages: failed after {end_at - start_at:.2f}s - {str(e)}",
|
||||
fg="red",
|
||||
)
|
||||
)
|
||||
raise
|
||||
|
||||
click.echo(click.style("messages cleanup completed.", fg="green"))
|
||||
|
||||
@@ -4,6 +4,7 @@ from dify_app import DifyApp
|
||||
def init_app(app: DifyApp):
|
||||
from commands import (
|
||||
add_qdrant_index,
|
||||
clean_expired_messages,
|
||||
clean_workflow_runs,
|
||||
cleanup_orphaned_draft_variables,
|
||||
clear_free_plan_tenant_expired_logs,
|
||||
@@ -58,6 +59,7 @@ def init_app(app: DifyApp):
|
||||
transform_datasource_credentials,
|
||||
install_rag_pipeline_plugins,
|
||||
clean_workflow_runs,
|
||||
clean_expired_messages,
|
||||
]
|
||||
for cmd in cmds_to_register:
|
||||
app.cli.add_command(cmd)
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
"""feat: add created_at id index to messages
|
||||
|
||||
Revision ID: 3334862ee907
|
||||
Revises: 905527cc8fd3
|
||||
Create Date: 2026-01-12 17:29:44.846544
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '3334862ee907'
|
||||
down_revision = '905527cc8fd3'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('messages', schema=None) as batch_op:
|
||||
batch_op.create_index('message_created_at_id_idx', ['created_at', 'id'], unique=False)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('messages', schema=None) as batch_op:
|
||||
batch_op.drop_index('message_created_at_id_idx')
|
||||
|
||||
# ### end Alembic commands ###
|
||||
@@ -968,6 +968,7 @@ class Message(Base):
|
||||
Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"),
|
||||
Index("message_created_at_idx", "created_at"),
|
||||
Index("message_app_mode_idx", "app_mode"),
|
||||
Index("message_created_at_id_idx", "created_at", "id"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4()))
|
||||
|
||||
@@ -1,90 +1,62 @@
|
||||
import datetime
|
||||
import logging
|
||||
import time
|
||||
|
||||
import click
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
import app
|
||||
from configs import dify_config
|
||||
from enums.cloud_plan import CloudPlan
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.model import (
|
||||
App,
|
||||
Message,
|
||||
MessageAgentThought,
|
||||
MessageAnnotation,
|
||||
MessageChain,
|
||||
MessageFeedback,
|
||||
MessageFile,
|
||||
)
|
||||
from models.web import SavedMessage
|
||||
from services.feature_service import FeatureService
|
||||
from services.retention.conversation.messages_clean_policy import create_message_clean_policy
|
||||
from services.retention.conversation.messages_clean_service import MessagesCleanService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@app.celery.task(queue="dataset")
|
||||
@app.celery.task(queue="retention")
|
||||
def clean_messages():
|
||||
click.echo(click.style("Start clean messages.", fg="green"))
|
||||
start_at = time.perf_counter()
|
||||
plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
|
||||
days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
# Main query with join and filter
|
||||
messages = (
|
||||
db.session.query(Message)
|
||||
.where(Message.created_at < plan_sandbox_clean_message_day)
|
||||
.order_by(Message.created_at.desc())
|
||||
.limit(100)
|
||||
.all()
|
||||
)
|
||||
"""
|
||||
Clean expired messages based on clean policy.
|
||||
|
||||
except SQLAlchemyError:
|
||||
raise
|
||||
if not messages:
|
||||
break
|
||||
for message in messages:
|
||||
app = db.session.query(App).filter_by(id=message.app_id).first()
|
||||
if not app:
|
||||
logger.warning(
|
||||
"Expected App record to exist, but none was found, app_id=%s, message_id=%s",
|
||||
message.app_id,
|
||||
message.id,
|
||||
)
|
||||
continue
|
||||
features_cache_key = f"features:{app.tenant_id}"
|
||||
plan_cache = redis_client.get(features_cache_key)
|
||||
if plan_cache is None:
|
||||
features = FeatureService.get_features(app.tenant_id)
|
||||
redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
|
||||
plan = features.billing.subscription.plan
|
||||
else:
|
||||
plan = plan_cache.decode()
|
||||
if plan == CloudPlan.SANDBOX:
|
||||
# clean related message
|
||||
db.session.query(MessageFeedback).where(MessageFeedback.message_id == message.id).delete(
|
||||
synchronize_session=False
|
||||
)
|
||||
db.session.query(MessageAnnotation).where(MessageAnnotation.message_id == message.id).delete(
|
||||
synchronize_session=False
|
||||
)
|
||||
db.session.query(MessageChain).where(MessageChain.message_id == message.id).delete(
|
||||
synchronize_session=False
|
||||
)
|
||||
db.session.query(MessageAgentThought).where(MessageAgentThought.message_id == message.id).delete(
|
||||
synchronize_session=False
|
||||
)
|
||||
db.session.query(MessageFile).where(MessageFile.message_id == message.id).delete(
|
||||
synchronize_session=False
|
||||
)
|
||||
db.session.query(SavedMessage).where(SavedMessage.message_id == message.id).delete(
|
||||
synchronize_session=False
|
||||
)
|
||||
db.session.query(Message).where(Message.id == message.id).delete()
|
||||
db.session.commit()
|
||||
end_at = time.perf_counter()
|
||||
click.echo(click.style(f"Cleaned messages from db success latency: {end_at - start_at}", fg="green"))
|
||||
This task uses MessagesCleanService to efficiently clean messages in batches.
|
||||
The behavior depends on BILLING_ENABLED configuration:
|
||||
- BILLING_ENABLED=True: only delete messages from sandbox tenants (with whitelist/grace period)
|
||||
- BILLING_ENABLED=False: delete all messages within the time range
|
||||
"""
|
||||
click.echo(click.style("clean_messages: start clean messages.", fg="green"))
|
||||
start_at = time.perf_counter()
|
||||
|
||||
try:
|
||||
# Create policy based on billing configuration
|
||||
policy = create_message_clean_policy(
|
||||
graceful_period_days=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD,
|
||||
)
|
||||
|
||||
# Create and run the cleanup service
|
||||
service = MessagesCleanService.from_days(
|
||||
policy=policy,
|
||||
days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS,
|
||||
batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE,
|
||||
)
|
||||
stats = service.run()
|
||||
|
||||
end_at = time.perf_counter()
|
||||
click.echo(
|
||||
click.style(
|
||||
f"clean_messages: completed successfully\n"
|
||||
f" - Latency: {end_at - start_at:.2f}s\n"
|
||||
f" - Batches processed: {stats['batches']}\n"
|
||||
f" - Total messages scanned: {stats['total_messages']}\n"
|
||||
f" - Messages filtered: {stats['filtered_messages']}\n"
|
||||
f" - Messages deleted: {stats['total_deleted']}",
|
||||
fg="green",
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
end_at = time.perf_counter()
|
||||
logger.exception("clean_messages failed")
|
||||
click.echo(
|
||||
click.style(
|
||||
f"clean_messages: failed after {end_at - start_at:.2f}s - {str(e)}",
|
||||
fg="red",
|
||||
)
|
||||
)
|
||||
raise
|
||||
|
||||
216
api/services/retention/conversation/messages_clean_policy.py
Normal file
216
api/services/retention/conversation/messages_clean_policy.py
Normal file
@@ -0,0 +1,216 @@
|
||||
import datetime
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Callable, Sequence
|
||||
from dataclasses import dataclass
|
||||
|
||||
from configs import dify_config
|
||||
from enums.cloud_plan import CloudPlan
|
||||
from services.billing_service import BillingService, SubscriptionPlan
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SimpleMessage:
|
||||
id: str
|
||||
app_id: str
|
||||
created_at: datetime.datetime
|
||||
|
||||
|
||||
class MessagesCleanPolicy(ABC):
|
||||
"""
|
||||
Abstract base class for message cleanup policies.
|
||||
|
||||
A policy determines which messages from a batch should be deleted.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def filter_message_ids(
|
||||
self,
|
||||
messages: Sequence[SimpleMessage],
|
||||
app_to_tenant: dict[str, str],
|
||||
) -> Sequence[str]:
|
||||
"""
|
||||
Filter messages and return IDs of messages that should be deleted.
|
||||
|
||||
Args:
|
||||
messages: Batch of messages to evaluate
|
||||
app_to_tenant: Mapping from app_id to tenant_id
|
||||
|
||||
Returns:
|
||||
List of message IDs that should be deleted
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
class BillingDisabledPolicy(MessagesCleanPolicy):
|
||||
"""
|
||||
Policy for community or enterpriseedition (billing disabled).
|
||||
|
||||
No special filter logic, just return all message ids.
|
||||
"""
|
||||
|
||||
def filter_message_ids(
|
||||
self,
|
||||
messages: Sequence[SimpleMessage],
|
||||
app_to_tenant: dict[str, str],
|
||||
) -> Sequence[str]:
|
||||
return [msg.id for msg in messages]
|
||||
|
||||
|
||||
class BillingSandboxPolicy(MessagesCleanPolicy):
|
||||
"""
|
||||
Policy for sandbox plan tenants in cloud edition (billing enabled).
|
||||
|
||||
Filters messages based on sandbox plan expiration rules:
|
||||
- Skip tenants in the whitelist
|
||||
- Only delete messages from sandbox plan tenants
|
||||
- Respect grace period after subscription expiration
|
||||
- Safe default: if tenant mapping or plan is missing, do NOT delete
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
plan_provider: Callable[[Sequence[str]], dict[str, SubscriptionPlan]],
|
||||
graceful_period_days: int = 21,
|
||||
tenant_whitelist: Sequence[str] | None = None,
|
||||
current_timestamp: int | None = None,
|
||||
) -> None:
|
||||
self._graceful_period_days = graceful_period_days
|
||||
self._tenant_whitelist: Sequence[str] = tenant_whitelist or []
|
||||
self._plan_provider = plan_provider
|
||||
self._current_timestamp = current_timestamp
|
||||
|
||||
def filter_message_ids(
|
||||
self,
|
||||
messages: Sequence[SimpleMessage],
|
||||
app_to_tenant: dict[str, str],
|
||||
) -> Sequence[str]:
|
||||
"""
|
||||
Filter messages based on sandbox plan expiration rules.
|
||||
|
||||
Args:
|
||||
messages: Batch of messages to evaluate
|
||||
app_to_tenant: Mapping from app_id to tenant_id
|
||||
|
||||
Returns:
|
||||
List of message IDs that should be deleted
|
||||
"""
|
||||
if not messages or not app_to_tenant:
|
||||
return []
|
||||
|
||||
# Get unique tenant_ids and fetch subscription plans
|
||||
tenant_ids = list(set(app_to_tenant.values()))
|
||||
tenant_plans = self._plan_provider(tenant_ids)
|
||||
|
||||
if not tenant_plans:
|
||||
return []
|
||||
|
||||
# Apply sandbox deletion rules
|
||||
return self._filter_expired_sandbox_messages(
|
||||
messages=messages,
|
||||
app_to_tenant=app_to_tenant,
|
||||
tenant_plans=tenant_plans,
|
||||
)
|
||||
|
||||
def _filter_expired_sandbox_messages(
|
||||
self,
|
||||
messages: Sequence[SimpleMessage],
|
||||
app_to_tenant: dict[str, str],
|
||||
tenant_plans: dict[str, SubscriptionPlan],
|
||||
) -> list[str]:
|
||||
"""
|
||||
Filter messages that should be deleted based on sandbox plan expiration.
|
||||
|
||||
A message should be deleted if:
|
||||
1. It belongs to a sandbox tenant AND
|
||||
2. Either:
|
||||
a) The tenant has no previous subscription (expiration_date == -1), OR
|
||||
b) The subscription expired more than graceful_period_days ago
|
||||
|
||||
Args:
|
||||
messages: List of message objects with id and app_id attributes
|
||||
app_to_tenant: Mapping from app_id to tenant_id
|
||||
tenant_plans: Mapping from tenant_id to subscription plan info
|
||||
|
||||
Returns:
|
||||
List of message IDs that should be deleted
|
||||
"""
|
||||
current_timestamp = self._current_timestamp
|
||||
if current_timestamp is None:
|
||||
current_timestamp = int(datetime.datetime.now(datetime.UTC).timestamp())
|
||||
|
||||
sandbox_message_ids: list[str] = []
|
||||
graceful_period_seconds = self._graceful_period_days * 24 * 60 * 60
|
||||
|
||||
for msg in messages:
|
||||
# Get tenant_id for this message's app
|
||||
tenant_id = app_to_tenant.get(msg.app_id)
|
||||
if not tenant_id:
|
||||
continue
|
||||
|
||||
# Skip tenant messages in whitelist
|
||||
if tenant_id in self._tenant_whitelist:
|
||||
continue
|
||||
|
||||
# Get subscription plan for this tenant
|
||||
tenant_plan = tenant_plans.get(tenant_id)
|
||||
if not tenant_plan:
|
||||
continue
|
||||
|
||||
plan = str(tenant_plan["plan"])
|
||||
expiration_date = int(tenant_plan["expiration_date"])
|
||||
|
||||
# Only process sandbox plans
|
||||
if plan != CloudPlan.SANDBOX:
|
||||
continue
|
||||
|
||||
# Case 1: No previous subscription (-1 means never had a paid subscription)
|
||||
if expiration_date == -1:
|
||||
sandbox_message_ids.append(msg.id)
|
||||
continue
|
||||
|
||||
# Case 2: Subscription expired beyond grace period
|
||||
if current_timestamp - expiration_date > graceful_period_seconds:
|
||||
sandbox_message_ids.append(msg.id)
|
||||
|
||||
return sandbox_message_ids
|
||||
|
||||
|
||||
def create_message_clean_policy(
|
||||
graceful_period_days: int = 21,
|
||||
current_timestamp: int | None = None,
|
||||
) -> MessagesCleanPolicy:
|
||||
"""
|
||||
Factory function to create the appropriate message clean policy.
|
||||
|
||||
Determines which policy to use based on BILLING_ENABLED configuration:
|
||||
- If BILLING_ENABLED is True: returns BillingSandboxPolicy
|
||||
- If BILLING_ENABLED is False: returns BillingDisabledPolicy
|
||||
|
||||
Args:
|
||||
graceful_period_days: Grace period in days after subscription expiration (default: 21)
|
||||
current_timestamp: Current Unix timestamp for testing (default: None, uses current time)
|
||||
"""
|
||||
if not dify_config.BILLING_ENABLED:
|
||||
logger.info("create_message_clean_policy: billing disabled, using BillingDisabledPolicy")
|
||||
return BillingDisabledPolicy()
|
||||
|
||||
# Billing enabled - fetch whitelist from BillingService
|
||||
tenant_whitelist = BillingService.get_expired_subscription_cleanup_whitelist()
|
||||
plan_provider = BillingService.get_plan_bulk_with_cache
|
||||
|
||||
logger.info(
|
||||
"create_message_clean_policy: billing enabled, using BillingSandboxPolicy "
|
||||
"(graceful_period_days=%s, whitelist=%s)",
|
||||
graceful_period_days,
|
||||
tenant_whitelist,
|
||||
)
|
||||
|
||||
return BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=graceful_period_days,
|
||||
tenant_whitelist=tenant_whitelist,
|
||||
current_timestamp=current_timestamp,
|
||||
)
|
||||
334
api/services/retention/conversation/messages_clean_service.py
Normal file
334
api/services/retention/conversation/messages_clean_service.py
Normal file
@@ -0,0 +1,334 @@
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
from collections.abc import Sequence
|
||||
from typing import cast
|
||||
|
||||
from sqlalchemy import delete, select
|
||||
from sqlalchemy.engine import CursorResult
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from extensions.ext_database import db
|
||||
from models.model import (
|
||||
App,
|
||||
AppAnnotationHitHistory,
|
||||
DatasetRetrieverResource,
|
||||
Message,
|
||||
MessageAgentThought,
|
||||
MessageAnnotation,
|
||||
MessageChain,
|
||||
MessageFeedback,
|
||||
MessageFile,
|
||||
)
|
||||
from models.web import SavedMessage
|
||||
from services.retention.conversation.messages_clean_policy import (
|
||||
MessagesCleanPolicy,
|
||||
SimpleMessage,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MessagesCleanService:
|
||||
"""
|
||||
Service for cleaning expired messages based on retention policies.
|
||||
|
||||
Compatible with non cloud edition (billing disabled): all messages in the time range will be deleted.
|
||||
If billing is enabled: only sandbox plan tenant messages are deleted (with whitelist and grace period support).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
policy: MessagesCleanPolicy,
|
||||
end_before: datetime.datetime,
|
||||
start_from: datetime.datetime | None = None,
|
||||
batch_size: int = 1000,
|
||||
dry_run: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the service with cleanup parameters.
|
||||
|
||||
Args:
|
||||
policy: The policy that determines which messages to delete
|
||||
end_before: End time (exclusive) of the range
|
||||
start_from: Optional start time (inclusive) of the range
|
||||
batch_size: Number of messages to process per batch
|
||||
dry_run: Whether to perform a dry run (no actual deletion)
|
||||
"""
|
||||
self._policy = policy
|
||||
self._end_before = end_before
|
||||
self._start_from = start_from
|
||||
self._batch_size = batch_size
|
||||
self._dry_run = dry_run
|
||||
|
||||
@classmethod
|
||||
def from_time_range(
|
||||
cls,
|
||||
policy: MessagesCleanPolicy,
|
||||
start_from: datetime.datetime,
|
||||
end_before: datetime.datetime,
|
||||
batch_size: int = 1000,
|
||||
dry_run: bool = False,
|
||||
) -> "MessagesCleanService":
|
||||
"""
|
||||
Create a service instance for cleaning messages within a specific time range.
|
||||
|
||||
Time range is [start_from, end_before).
|
||||
|
||||
Args:
|
||||
policy: The policy that determines which messages to delete
|
||||
start_from: Start time (inclusive) of the range
|
||||
end_before: End time (exclusive) of the range
|
||||
batch_size: Number of messages to process per batch
|
||||
dry_run: Whether to perform a dry run (no actual deletion)
|
||||
|
||||
Returns:
|
||||
MessagesCleanService instance
|
||||
|
||||
Raises:
|
||||
ValueError: If start_from >= end_before or invalid parameters
|
||||
"""
|
||||
if start_from >= end_before:
|
||||
raise ValueError(f"start_from ({start_from}) must be less than end_before ({end_before})")
|
||||
|
||||
if batch_size <= 0:
|
||||
raise ValueError(f"batch_size ({batch_size}) must be greater than 0")
|
||||
|
||||
logger.info(
|
||||
"clean_messages: start_from=%s, end_before=%s, batch_size=%s, policy=%s",
|
||||
start_from,
|
||||
end_before,
|
||||
batch_size,
|
||||
policy.__class__.__name__,
|
||||
)
|
||||
|
||||
return cls(
|
||||
policy=policy,
|
||||
end_before=end_before,
|
||||
start_from=start_from,
|
||||
batch_size=batch_size,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_days(
|
||||
cls,
|
||||
policy: MessagesCleanPolicy,
|
||||
days: int = 30,
|
||||
batch_size: int = 1000,
|
||||
dry_run: bool = False,
|
||||
) -> "MessagesCleanService":
|
||||
"""
|
||||
Create a service instance for cleaning messages older than specified days.
|
||||
|
||||
Args:
|
||||
policy: The policy that determines which messages to delete
|
||||
days: Number of days to look back from now
|
||||
batch_size: Number of messages to process per batch
|
||||
dry_run: Whether to perform a dry run (no actual deletion)
|
||||
|
||||
Returns:
|
||||
MessagesCleanService instance
|
||||
|
||||
Raises:
|
||||
ValueError: If invalid parameters
|
||||
"""
|
||||
if days < 0:
|
||||
raise ValueError(f"days ({days}) must be greater than or equal to 0")
|
||||
|
||||
if batch_size <= 0:
|
||||
raise ValueError(f"batch_size ({batch_size}) must be greater than 0")
|
||||
|
||||
end_before = datetime.datetime.now() - datetime.timedelta(days=days)
|
||||
|
||||
logger.info(
|
||||
"clean_messages: days=%s, end_before=%s, batch_size=%s, policy=%s",
|
||||
days,
|
||||
end_before,
|
||||
batch_size,
|
||||
policy.__class__.__name__,
|
||||
)
|
||||
|
||||
return cls(policy=policy, end_before=end_before, start_from=None, batch_size=batch_size, dry_run=dry_run)
|
||||
|
||||
def run(self) -> dict[str, int]:
|
||||
"""
|
||||
Execute the message cleanup operation.
|
||||
|
||||
Returns:
|
||||
Dict with statistics: batches, filtered_messages, total_deleted
|
||||
"""
|
||||
return self._clean_messages_by_time_range()
|
||||
|
||||
def _clean_messages_by_time_range(self) -> dict[str, int]:
|
||||
"""
|
||||
Clean messages within a time range using cursor-based pagination.
|
||||
|
||||
Time range is [start_from, end_before)
|
||||
|
||||
Steps:
|
||||
1. Iterate messages using cursor pagination (by created_at, id)
|
||||
2. Query app_id -> tenant_id mapping
|
||||
3. Delegate to policy to determine which messages to delete
|
||||
4. Batch delete messages and their relations
|
||||
|
||||
Returns:
|
||||
Dict with statistics: batches, filtered_messages, total_deleted
|
||||
"""
|
||||
stats = {
|
||||
"batches": 0,
|
||||
"total_messages": 0,
|
||||
"filtered_messages": 0,
|
||||
"total_deleted": 0,
|
||||
}
|
||||
|
||||
# Cursor-based pagination using (created_at, id) to avoid infinite loops
|
||||
# and ensure proper ordering with time-based filtering
|
||||
_cursor: tuple[datetime.datetime, str] | None = None
|
||||
|
||||
logger.info(
|
||||
"clean_messages: start cleaning messages (dry_run=%s), start_from=%s, end_before=%s",
|
||||
self._dry_run,
|
||||
self._start_from,
|
||||
self._end_before,
|
||||
)
|
||||
|
||||
while True:
|
||||
stats["batches"] += 1
|
||||
|
||||
# Step 1: Fetch a batch of messages using cursor
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
msg_stmt = (
|
||||
select(Message.id, Message.app_id, Message.created_at)
|
||||
.where(Message.created_at < self._end_before)
|
||||
.order_by(Message.created_at, Message.id)
|
||||
.limit(self._batch_size)
|
||||
)
|
||||
|
||||
if self._start_from:
|
||||
msg_stmt = msg_stmt.where(Message.created_at >= self._start_from)
|
||||
|
||||
# Apply cursor condition: (created_at, id) > (last_created_at, last_message_id)
|
||||
# This translates to:
|
||||
# created_at > last_created_at OR (created_at = last_created_at AND id > last_message_id)
|
||||
if _cursor:
|
||||
# Continuing from previous batch
|
||||
msg_stmt = msg_stmt.where(
|
||||
(Message.created_at > _cursor[0])
|
||||
| ((Message.created_at == _cursor[0]) & (Message.id > _cursor[1]))
|
||||
)
|
||||
|
||||
raw_messages = list(session.execute(msg_stmt).all())
|
||||
messages = [
|
||||
SimpleMessage(id=msg_id, app_id=app_id, created_at=msg_created_at)
|
||||
for msg_id, app_id, msg_created_at in raw_messages
|
||||
]
|
||||
|
||||
# Track total messages fetched across all batches
|
||||
stats["total_messages"] += len(messages)
|
||||
|
||||
if not messages:
|
||||
logger.info("clean_messages (batch %s): no more messages to process", stats["batches"])
|
||||
break
|
||||
|
||||
# Update cursor to the last message's (created_at, id)
|
||||
_cursor = (messages[-1].created_at, messages[-1].id)
|
||||
|
||||
# Step 2: Extract app_ids and query tenant_ids
|
||||
app_ids = list({msg.app_id for msg in messages})
|
||||
|
||||
if not app_ids:
|
||||
logger.info("clean_messages (batch %s): no app_ids found, skip", stats["batches"])
|
||||
continue
|
||||
|
||||
app_stmt = select(App.id, App.tenant_id).where(App.id.in_(app_ids))
|
||||
apps = list(session.execute(app_stmt).all())
|
||||
|
||||
if not apps:
|
||||
logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"])
|
||||
continue
|
||||
|
||||
# Build app_id -> tenant_id mapping
|
||||
app_to_tenant: dict[str, str] = {app.id: app.tenant_id for app in apps}
|
||||
|
||||
# Step 3: Delegate to policy to determine which messages to delete
|
||||
message_ids_to_delete = self._policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
if not message_ids_to_delete:
|
||||
logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"])
|
||||
continue
|
||||
|
||||
stats["filtered_messages"] += len(message_ids_to_delete)
|
||||
|
||||
# Step 4: Batch delete messages and their relations
|
||||
if not self._dry_run:
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
# Delete related records first
|
||||
self._batch_delete_message_relations(session, message_ids_to_delete)
|
||||
|
||||
# Delete messages
|
||||
delete_stmt = delete(Message).where(Message.id.in_(message_ids_to_delete))
|
||||
delete_result = cast(CursorResult, session.execute(delete_stmt))
|
||||
messages_deleted = delete_result.rowcount
|
||||
session.commit()
|
||||
|
||||
stats["total_deleted"] += messages_deleted
|
||||
|
||||
logger.info(
|
||||
"clean_messages (batch %s): processed %s messages, deleted %s messages",
|
||||
stats["batches"],
|
||||
len(messages),
|
||||
messages_deleted,
|
||||
)
|
||||
else:
|
||||
# Log random sample of message IDs that would be deleted (up to 10)
|
||||
sample_size = min(10, len(message_ids_to_delete))
|
||||
sampled_ids = random.sample(list(message_ids_to_delete), sample_size)
|
||||
|
||||
logger.info(
|
||||
"clean_messages (batch %s, dry_run): would delete %s messages, sampling %s ids:",
|
||||
stats["batches"],
|
||||
len(message_ids_to_delete),
|
||||
sample_size,
|
||||
)
|
||||
for msg_id in sampled_ids:
|
||||
logger.info("clean_messages (batch %s, dry_run) sample: message_id=%s", stats["batches"], msg_id)
|
||||
|
||||
logger.info(
|
||||
"clean_messages completed: total batches: %s, total messages: %s, filtered messages: %s, total deleted: %s",
|
||||
stats["batches"],
|
||||
stats["total_messages"],
|
||||
stats["filtered_messages"],
|
||||
stats["total_deleted"],
|
||||
)
|
||||
|
||||
return stats
|
||||
|
||||
@staticmethod
|
||||
def _batch_delete_message_relations(session: Session, message_ids: Sequence[str]) -> None:
|
||||
"""
|
||||
Batch delete all related records for given message IDs.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
message_ids: List of message IDs to delete relations for
|
||||
"""
|
||||
if not message_ids:
|
||||
return
|
||||
|
||||
# Delete all related records in batch
|
||||
session.execute(delete(MessageFeedback).where(MessageFeedback.message_id.in_(message_ids)))
|
||||
|
||||
session.execute(delete(MessageAnnotation).where(MessageAnnotation.message_id.in_(message_ids)))
|
||||
|
||||
session.execute(delete(MessageChain).where(MessageChain.message_id.in_(message_ids)))
|
||||
|
||||
session.execute(delete(MessageAgentThought).where(MessageAgentThought.message_id.in_(message_ids)))
|
||||
|
||||
session.execute(delete(MessageFile).where(MessageFile.message_id.in_(message_ids)))
|
||||
|
||||
session.execute(delete(SavedMessage).where(SavedMessage.message_id.in_(message_ids)))
|
||||
|
||||
session.execute(delete(AppAnnotationHitHistory).where(AppAnnotationHitHistory.message_id.in_(message_ids)))
|
||||
|
||||
session.execute(delete(DatasetRetrieverResource).where(DatasetRetrieverResource.message_id.in_(message_ids)))
|
||||
File diff suppressed because it is too large
Load Diff
627
api/tests/unit_tests/services/test_messages_clean_service.py
Normal file
627
api/tests/unit_tests/services/test_messages_clean_service.py
Normal file
@@ -0,0 +1,627 @@
|
||||
import datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from enums.cloud_plan import CloudPlan
|
||||
from services.retention.conversation.messages_clean_policy import (
|
||||
BillingDisabledPolicy,
|
||||
BillingSandboxPolicy,
|
||||
SimpleMessage,
|
||||
create_message_clean_policy,
|
||||
)
|
||||
from services.retention.conversation.messages_clean_service import MessagesCleanService
|
||||
|
||||
|
||||
def make_simple_message(msg_id: str, app_id: str) -> SimpleMessage:
|
||||
"""Helper to create a SimpleMessage with a fixed created_at timestamp."""
|
||||
return SimpleMessage(id=msg_id, app_id=app_id, created_at=datetime.datetime(2024, 1, 1))
|
||||
|
||||
|
||||
def make_plan_provider(tenant_plans: dict) -> MagicMock:
|
||||
"""Helper to create a mock plan_provider that returns the given tenant_plans."""
|
||||
provider = MagicMock()
|
||||
provider.return_value = tenant_plans
|
||||
return provider
|
||||
|
||||
|
||||
class TestBillingSandboxPolicyFilterMessageIds:
|
||||
"""Unit tests for BillingSandboxPolicy.filter_message_ids method."""
|
||||
|
||||
# Fixed timestamp for deterministic tests
|
||||
CURRENT_TIMESTAMP = 1000000
|
||||
GRACEFUL_PERIOD_DAYS = 8
|
||||
GRACEFUL_PERIOD_SECONDS = GRACEFUL_PERIOD_DAYS * 24 * 60 * 60
|
||||
|
||||
def test_missing_tenant_mapping_excluded(self):
|
||||
"""Test that messages with missing app-to-tenant mapping are excluded."""
|
||||
# Arrange
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"),
|
||||
make_simple_message("msg2", "app2"),
|
||||
]
|
||||
app_to_tenant = {} # No mapping
|
||||
tenant_plans = {"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1}}
|
||||
plan_provider = make_plan_provider(tenant_plans)
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
current_timestamp=self.CURRENT_TIMESTAMP,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert
|
||||
assert list(result) == []
|
||||
|
||||
def test_missing_tenant_plan_excluded(self):
|
||||
"""Test that messages with missing tenant plan are excluded (safe default)."""
|
||||
# Arrange
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"),
|
||||
make_simple_message("msg2", "app2"),
|
||||
]
|
||||
app_to_tenant = {"app1": "tenant1", "app2": "tenant2"}
|
||||
tenant_plans = {} # No plans
|
||||
plan_provider = make_plan_provider(tenant_plans)
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
current_timestamp=self.CURRENT_TIMESTAMP,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert
|
||||
assert list(result) == []
|
||||
|
||||
def test_non_sandbox_plan_excluded(self):
|
||||
"""Test that messages from non-sandbox plans (PROFESSIONAL/TEAM) are excluded."""
|
||||
# Arrange
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"),
|
||||
make_simple_message("msg2", "app2"),
|
||||
make_simple_message("msg3", "app3"),
|
||||
]
|
||||
app_to_tenant = {"app1": "tenant1", "app2": "tenant2", "app3": "tenant3"}
|
||||
tenant_plans = {
|
||||
"tenant1": {"plan": CloudPlan.PROFESSIONAL, "expiration_date": -1},
|
||||
"tenant2": {"plan": CloudPlan.TEAM, "expiration_date": -1},
|
||||
"tenant3": {"plan": CloudPlan.SANDBOX, "expiration_date": -1}, # Only this one
|
||||
}
|
||||
plan_provider = make_plan_provider(tenant_plans)
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
current_timestamp=self.CURRENT_TIMESTAMP,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - only msg3 (sandbox tenant) should be included
|
||||
assert set(result) == {"msg3"}
|
||||
|
||||
def test_whitelist_skip(self):
|
||||
"""Test that whitelisted tenants are excluded even if sandbox + expired."""
|
||||
# Arrange
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"), # Whitelisted - excluded
|
||||
make_simple_message("msg2", "app2"), # Not whitelisted - included
|
||||
make_simple_message("msg3", "app3"), # Whitelisted - excluded
|
||||
]
|
||||
app_to_tenant = {"app1": "tenant1", "app2": "tenant2", "app3": "tenant3"}
|
||||
tenant_plans = {
|
||||
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
|
||||
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
|
||||
"tenant3": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
|
||||
}
|
||||
plan_provider = make_plan_provider(tenant_plans)
|
||||
tenant_whitelist = ["tenant1", "tenant3"]
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
tenant_whitelist=tenant_whitelist,
|
||||
current_timestamp=self.CURRENT_TIMESTAMP,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - only msg2 should be included
|
||||
assert set(result) == {"msg2"}
|
||||
|
||||
def test_no_previous_subscription_included(self):
|
||||
"""Test that messages with expiration_date=-1 (no previous subscription) are included."""
|
||||
# Arrange
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"),
|
||||
make_simple_message("msg2", "app2"),
|
||||
]
|
||||
app_to_tenant = {"app1": "tenant1", "app2": "tenant2"}
|
||||
tenant_plans = {
|
||||
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
|
||||
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
|
||||
}
|
||||
plan_provider = make_plan_provider(tenant_plans)
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
current_timestamp=self.CURRENT_TIMESTAMP,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - all messages should be included
|
||||
assert set(result) == {"msg1", "msg2"}
|
||||
|
||||
def test_within_grace_period_excluded(self):
|
||||
"""Test that messages within grace period are excluded."""
|
||||
# Arrange
|
||||
now = self.CURRENT_TIMESTAMP
|
||||
expired_1_day_ago = now - (1 * 24 * 60 * 60)
|
||||
expired_5_days_ago = now - (5 * 24 * 60 * 60)
|
||||
expired_7_days_ago = now - (7 * 24 * 60 * 60)
|
||||
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"),
|
||||
make_simple_message("msg2", "app2"),
|
||||
make_simple_message("msg3", "app3"),
|
||||
]
|
||||
app_to_tenant = {"app1": "tenant1", "app2": "tenant2", "app3": "tenant3"}
|
||||
tenant_plans = {
|
||||
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_1_day_ago},
|
||||
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_5_days_ago},
|
||||
"tenant3": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_7_days_ago},
|
||||
}
|
||||
plan_provider = make_plan_provider(tenant_plans)
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS, # 8 days
|
||||
current_timestamp=now,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - all within 8-day grace period, none should be included
|
||||
assert list(result) == []
|
||||
|
||||
def test_exactly_at_boundary_excluded(self):
|
||||
"""Test that messages exactly at grace period boundary are excluded (code uses >)."""
|
||||
# Arrange
|
||||
now = self.CURRENT_TIMESTAMP
|
||||
expired_exactly_8_days_ago = now - self.GRACEFUL_PERIOD_SECONDS # Exactly at boundary
|
||||
|
||||
messages = [make_simple_message("msg1", "app1")]
|
||||
app_to_tenant = {"app1": "tenant1"}
|
||||
tenant_plans = {
|
||||
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_exactly_8_days_ago},
|
||||
}
|
||||
plan_provider = make_plan_provider(tenant_plans)
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
current_timestamp=now,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - exactly at boundary (==) should be excluded (code uses >)
|
||||
assert list(result) == []
|
||||
|
||||
def test_beyond_grace_period_included(self):
|
||||
"""Test that messages beyond grace period are included."""
|
||||
# Arrange
|
||||
now = self.CURRENT_TIMESTAMP
|
||||
expired_9_days_ago = now - (9 * 24 * 60 * 60) # Just beyond 8-day grace
|
||||
expired_30_days_ago = now - (30 * 24 * 60 * 60) # Well beyond
|
||||
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"),
|
||||
make_simple_message("msg2", "app2"),
|
||||
]
|
||||
app_to_tenant = {"app1": "tenant1", "app2": "tenant2"}
|
||||
tenant_plans = {
|
||||
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_9_days_ago},
|
||||
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_30_days_ago},
|
||||
}
|
||||
plan_provider = make_plan_provider(tenant_plans)
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
current_timestamp=now,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - both beyond grace period, should be included
|
||||
assert set(result) == {"msg1", "msg2"}
|
||||
|
||||
def test_empty_messages_returns_empty(self):
|
||||
"""Test that empty messages returns empty list."""
|
||||
# Arrange
|
||||
messages: list[SimpleMessage] = []
|
||||
app_to_tenant = {"app1": "tenant1"}
|
||||
plan_provider = make_plan_provider({"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1}})
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
current_timestamp=self.CURRENT_TIMESTAMP,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert
|
||||
assert list(result) == []
|
||||
|
||||
def test_plan_provider_called_with_correct_tenant_ids(self):
|
||||
"""Test that plan_provider is called with correct tenant_ids."""
|
||||
# Arrange
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"),
|
||||
make_simple_message("msg2", "app2"),
|
||||
make_simple_message("msg3", "app3"),
|
||||
]
|
||||
app_to_tenant = {"app1": "tenant1", "app2": "tenant2", "app3": "tenant1"} # tenant1 appears twice
|
||||
plan_provider = make_plan_provider({})
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
current_timestamp=self.CURRENT_TIMESTAMP,
|
||||
)
|
||||
|
||||
# Act
|
||||
policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - plan_provider should be called once with unique tenant_ids
|
||||
plan_provider.assert_called_once()
|
||||
called_tenant_ids = set(plan_provider.call_args[0][0])
|
||||
assert called_tenant_ids == {"tenant1", "tenant2"}
|
||||
|
||||
def test_complex_mixed_scenario(self):
|
||||
"""Test complex scenario with mixed plans, expirations, whitelist, and missing mappings."""
|
||||
# Arrange
|
||||
now = self.CURRENT_TIMESTAMP
|
||||
sandbox_expired_old = now - (15 * 24 * 60 * 60) # Beyond grace
|
||||
sandbox_expired_recent = now - (3 * 24 * 60 * 60) # Within grace
|
||||
future_expiration = now + (30 * 24 * 60 * 60)
|
||||
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"), # Sandbox, no subscription - included
|
||||
make_simple_message("msg2", "app2"), # Sandbox, expired old - included
|
||||
make_simple_message("msg3", "app3"), # Sandbox, within grace - excluded
|
||||
make_simple_message("msg4", "app4"), # Team plan, active - excluded
|
||||
make_simple_message("msg5", "app5"), # No tenant mapping - excluded
|
||||
make_simple_message("msg6", "app6"), # No plan info - excluded
|
||||
make_simple_message("msg7", "app7"), # Sandbox, expired old, whitelisted - excluded
|
||||
]
|
||||
app_to_tenant = {
|
||||
"app1": "tenant1",
|
||||
"app2": "tenant2",
|
||||
"app3": "tenant3",
|
||||
"app4": "tenant4",
|
||||
"app6": "tenant6", # Has mapping but no plan
|
||||
"app7": "tenant7",
|
||||
# app5 has no mapping
|
||||
}
|
||||
tenant_plans = {
|
||||
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
|
||||
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": sandbox_expired_old},
|
||||
"tenant3": {"plan": CloudPlan.SANDBOX, "expiration_date": sandbox_expired_recent},
|
||||
"tenant4": {"plan": CloudPlan.TEAM, "expiration_date": future_expiration},
|
||||
"tenant7": {"plan": CloudPlan.SANDBOX, "expiration_date": sandbox_expired_old},
|
||||
# tenant6 has no plan
|
||||
}
|
||||
plan_provider = make_plan_provider(tenant_plans)
|
||||
tenant_whitelist = ["tenant7"]
|
||||
|
||||
policy = BillingSandboxPolicy(
|
||||
plan_provider=plan_provider,
|
||||
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
|
||||
tenant_whitelist=tenant_whitelist,
|
||||
current_timestamp=now,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - only msg1 and msg2 should be included
|
||||
assert set(result) == {"msg1", "msg2"}
|
||||
|
||||
|
||||
class TestBillingDisabledPolicyFilterMessageIds:
|
||||
"""Unit tests for BillingDisabledPolicy.filter_message_ids method."""
|
||||
|
||||
def test_returns_all_message_ids(self):
|
||||
"""Test that all message IDs are returned (order-preserving)."""
|
||||
# Arrange
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"),
|
||||
make_simple_message("msg2", "app2"),
|
||||
make_simple_message("msg3", "app3"),
|
||||
]
|
||||
app_to_tenant = {"app1": "tenant1", "app2": "tenant2"}
|
||||
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - all message IDs returned in order
|
||||
assert list(result) == ["msg1", "msg2", "msg3"]
|
||||
|
||||
def test_ignores_app_to_tenant(self):
|
||||
"""Test that app_to_tenant mapping is ignored."""
|
||||
# Arrange
|
||||
messages = [
|
||||
make_simple_message("msg1", "app1"),
|
||||
make_simple_message("msg2", "app2"),
|
||||
]
|
||||
app_to_tenant: dict[str, str] = {} # Empty - should be ignored
|
||||
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert - all message IDs still returned
|
||||
assert list(result) == ["msg1", "msg2"]
|
||||
|
||||
def test_empty_messages_returns_empty(self):
|
||||
"""Test that empty messages returns empty list."""
|
||||
# Arrange
|
||||
messages: list[SimpleMessage] = []
|
||||
app_to_tenant = {"app1": "tenant1"}
|
||||
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Act
|
||||
result = policy.filter_message_ids(messages, app_to_tenant)
|
||||
|
||||
# Assert
|
||||
assert list(result) == []
|
||||
|
||||
|
||||
class TestCreateMessageCleanPolicy:
|
||||
"""Unit tests for create_message_clean_policy factory function."""
|
||||
|
||||
@patch("services.retention.conversation.messages_clean_policy.dify_config")
|
||||
def test_billing_disabled_returns_billing_disabled_policy(self, mock_config):
|
||||
"""Test that BILLING_ENABLED=False returns BillingDisabledPolicy."""
|
||||
# Arrange
|
||||
mock_config.BILLING_ENABLED = False
|
||||
|
||||
# Act
|
||||
policy = create_message_clean_policy(graceful_period_days=21)
|
||||
|
||||
# Assert
|
||||
assert isinstance(policy, BillingDisabledPolicy)
|
||||
|
||||
@patch("services.retention.conversation.messages_clean_policy.BillingService")
|
||||
@patch("services.retention.conversation.messages_clean_policy.dify_config")
|
||||
def test_billing_enabled_policy_has_correct_internals(self, mock_config, mock_billing_service):
|
||||
"""Test that BillingSandboxPolicy is created with correct internal values."""
|
||||
# Arrange
|
||||
mock_config.BILLING_ENABLED = True
|
||||
whitelist = ["tenant1", "tenant2"]
|
||||
mock_billing_service.get_expired_subscription_cleanup_whitelist.return_value = whitelist
|
||||
mock_plan_provider = MagicMock()
|
||||
mock_billing_service.get_plan_bulk_with_cache = mock_plan_provider
|
||||
|
||||
# Act
|
||||
policy = create_message_clean_policy(graceful_period_days=14, current_timestamp=1234567)
|
||||
|
||||
# Assert
|
||||
mock_billing_service.get_expired_subscription_cleanup_whitelist.assert_called_once()
|
||||
assert isinstance(policy, BillingSandboxPolicy)
|
||||
assert policy._graceful_period_days == 14
|
||||
assert list(policy._tenant_whitelist) == whitelist
|
||||
assert policy._plan_provider == mock_plan_provider
|
||||
assert policy._current_timestamp == 1234567
|
||||
|
||||
|
||||
class TestMessagesCleanServiceFromTimeRange:
|
||||
"""Unit tests for MessagesCleanService.from_time_range factory method."""
|
||||
|
||||
def test_start_from_end_before_raises_value_error(self):
|
||||
"""Test that start_from == end_before raises ValueError."""
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Arrange
|
||||
same_time = datetime.datetime(2024, 1, 1, 12, 0, 0)
|
||||
|
||||
# Act & Assert
|
||||
with pytest.raises(ValueError, match="start_from .* must be less than end_before"):
|
||||
MessagesCleanService.from_time_range(
|
||||
policy=policy,
|
||||
start_from=same_time,
|
||||
end_before=same_time,
|
||||
)
|
||||
|
||||
# Arrange
|
||||
start_from = datetime.datetime(2024, 12, 31)
|
||||
end_before = datetime.datetime(2024, 1, 1)
|
||||
|
||||
# Act & Assert
|
||||
with pytest.raises(ValueError, match="start_from .* must be less than end_before"):
|
||||
MessagesCleanService.from_time_range(
|
||||
policy=policy,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
)
|
||||
|
||||
def test_batch_size_raises_value_error(self):
|
||||
"""Test that batch_size=0 raises ValueError."""
|
||||
# Arrange
|
||||
start_from = datetime.datetime(2024, 1, 1)
|
||||
end_before = datetime.datetime(2024, 2, 1)
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Act & Assert
|
||||
with pytest.raises(ValueError, match="batch_size .* must be greater than 0"):
|
||||
MessagesCleanService.from_time_range(
|
||||
policy=policy,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
batch_size=0,
|
||||
)
|
||||
|
||||
start_from = datetime.datetime(2024, 1, 1)
|
||||
end_before = datetime.datetime(2024, 2, 1)
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Act & Assert
|
||||
with pytest.raises(ValueError, match="batch_size .* must be greater than 0"):
|
||||
MessagesCleanService.from_time_range(
|
||||
policy=policy,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
batch_size=-100,
|
||||
)
|
||||
|
||||
def test_valid_params_creates_instance(self):
|
||||
"""Test that valid parameters create a correctly configured instance."""
|
||||
# Arrange
|
||||
start_from = datetime.datetime(2024, 1, 1, 0, 0, 0)
|
||||
end_before = datetime.datetime(2024, 12, 31, 23, 59, 59)
|
||||
policy = BillingDisabledPolicy()
|
||||
batch_size = 500
|
||||
dry_run = True
|
||||
|
||||
# Act
|
||||
service = MessagesCleanService.from_time_range(
|
||||
policy=policy,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
batch_size=batch_size,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert isinstance(service, MessagesCleanService)
|
||||
assert service._policy is policy
|
||||
assert service._start_from == start_from
|
||||
assert service._end_before == end_before
|
||||
assert service._batch_size == batch_size
|
||||
assert service._dry_run == dry_run
|
||||
|
||||
def test_default_params(self):
|
||||
"""Test that default parameters are applied correctly."""
|
||||
# Arrange
|
||||
start_from = datetime.datetime(2024, 1, 1)
|
||||
end_before = datetime.datetime(2024, 2, 1)
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Act
|
||||
service = MessagesCleanService.from_time_range(
|
||||
policy=policy,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert service._batch_size == 1000 # default
|
||||
assert service._dry_run is False # default
|
||||
|
||||
|
||||
class TestMessagesCleanServiceFromDays:
|
||||
"""Unit tests for MessagesCleanService.from_days factory method."""
|
||||
|
||||
def test_days_raises_value_error(self):
|
||||
"""Test that days < 0 raises ValueError."""
|
||||
# Arrange
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Act & Assert
|
||||
with pytest.raises(ValueError, match="days .* must be greater than or equal to 0"):
|
||||
MessagesCleanService.from_days(policy=policy, days=-1)
|
||||
|
||||
# Act
|
||||
with patch("services.retention.conversation.messages_clean_service.datetime") as mock_datetime:
|
||||
fixed_now = datetime.datetime(2024, 6, 15, 14, 0, 0)
|
||||
mock_datetime.datetime.now.return_value = fixed_now
|
||||
mock_datetime.timedelta = datetime.timedelta
|
||||
|
||||
service = MessagesCleanService.from_days(policy=policy, days=0)
|
||||
|
||||
# Assert
|
||||
assert service._end_before == fixed_now
|
||||
|
||||
def test_batch_size_raises_value_error(self):
|
||||
"""Test that batch_size=0 raises ValueError."""
|
||||
# Arrange
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Act & Assert
|
||||
with pytest.raises(ValueError, match="batch_size .* must be greater than 0"):
|
||||
MessagesCleanService.from_days(policy=policy, days=30, batch_size=0)
|
||||
|
||||
# Act & Assert
|
||||
with pytest.raises(ValueError, match="batch_size .* must be greater than 0"):
|
||||
MessagesCleanService.from_days(policy=policy, days=30, batch_size=-500)
|
||||
|
||||
def test_valid_params_creates_instance(self):
|
||||
"""Test that valid parameters create a correctly configured instance."""
|
||||
# Arrange
|
||||
policy = BillingDisabledPolicy()
|
||||
days = 90
|
||||
batch_size = 500
|
||||
dry_run = True
|
||||
|
||||
# Act
|
||||
with patch("services.retention.conversation.messages_clean_service.datetime") as mock_datetime:
|
||||
fixed_now = datetime.datetime(2024, 6, 15, 10, 30, 0)
|
||||
mock_datetime.datetime.now.return_value = fixed_now
|
||||
mock_datetime.timedelta = datetime.timedelta
|
||||
|
||||
service = MessagesCleanService.from_days(
|
||||
policy=policy,
|
||||
days=days,
|
||||
batch_size=batch_size,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
|
||||
# Assert
|
||||
expected_end_before = fixed_now - datetime.timedelta(days=days)
|
||||
assert isinstance(service, MessagesCleanService)
|
||||
assert service._policy is policy
|
||||
assert service._start_from is None
|
||||
assert service._end_before == expected_end_before
|
||||
assert service._batch_size == batch_size
|
||||
assert service._dry_run == dry_run
|
||||
|
||||
def test_default_params(self):
|
||||
"""Test that default parameters are applied correctly."""
|
||||
# Arrange
|
||||
policy = BillingDisabledPolicy()
|
||||
|
||||
# Act
|
||||
with patch("services.retention.conversation.messages_clean_service.datetime") as mock_datetime:
|
||||
fixed_now = datetime.datetime(2024, 6, 15, 10, 30, 0)
|
||||
mock_datetime.datetime.now.return_value = fixed_now
|
||||
mock_datetime.timedelta = datetime.timedelta
|
||||
|
||||
service = MessagesCleanService.from_days(policy=policy)
|
||||
|
||||
# Assert
|
||||
expected_end_before = fixed_now - datetime.timedelta(days=30) # default days=30
|
||||
assert service._end_before == expected_end_before
|
||||
assert service._batch_size == 1000 # default
|
||||
assert service._dry_run is False # default
|
||||
Reference in New Issue
Block a user