mirror of
https://github.com/langgenius/dify.git
synced 2026-03-09 17:25:10 +00:00
Compare commits
9 Commits
feat/enter
...
deploy/cle
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
232b8eb248 | ||
|
|
4c0d81029f | ||
|
|
bacf70c00a | ||
|
|
942729ba48 | ||
|
|
4f5af0b43c | ||
|
|
a28cb993b8 | ||
|
|
c6dd2ef25a | ||
|
|
b0e8becd14 | ||
|
|
f90e0d781a |
@@ -133,7 +133,7 @@ Star Dify on GitHub and be instantly notified of new releases.
|
||||
|
||||
### Custom configurations
|
||||
|
||||
If you need to customize the configuration, please refer to the comments in our [.env.example](docker/.env.example) file and update the corresponding values in your `.env` file. Additionally, you might need to make adjustments to the `docker-compose.yaml` file itself, such as changing image versions, port mappings, or volume mounts, based on your specific deployment environment and requirements. After making any changes, please re-run `docker compose up -d`. You can find the full list of available environment variables [here](https://docs.dify.ai/getting-started/install-self-hosted/environments).
|
||||
If you need to customize the configuration, please refer to the comments in our [.env.example](docker/.env.example) file and update the corresponding values in your `.env` file. Additionally, you might need to make adjustments to the `docker-compose.yaml` file itself, such as changing image versions, port mappings, or volume mounts, based on your specific deployment environment and requirements. After making any changes, please re-run `docker-compose up -d`. You can find the full list of available environment variables [here](https://docs.dify.ai/getting-started/install-self-hosted/environments).
|
||||
|
||||
#### Customizing Suggested Questions
|
||||
|
||||
|
||||
@@ -937,6 +937,12 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
|
||||
is_flag=True,
|
||||
help="Preview cleanup results without deleting any workflow run data.",
|
||||
)
|
||||
@click.option(
|
||||
"--task-label",
|
||||
default="daily",
|
||||
show_default=True,
|
||||
help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.",
|
||||
)
|
||||
def clean_workflow_runs(
|
||||
before_days: int,
|
||||
batch_size: int,
|
||||
@@ -945,10 +951,13 @@ def clean_workflow_runs(
|
||||
start_from: datetime.datetime | None,
|
||||
end_before: datetime.datetime | None,
|
||||
dry_run: bool,
|
||||
task_label: str,
|
||||
):
|
||||
"""
|
||||
Clean workflow runs and related workflow data for free tenants.
|
||||
"""
|
||||
from extensions.otel.runtime import flush_telemetry
|
||||
|
||||
if (start_from is None) ^ (end_before is None):
|
||||
raise click.UsageError("--start-from and --end-before must be provided together.")
|
||||
|
||||
@@ -968,13 +977,17 @@ def clean_workflow_runs(
|
||||
start_time = datetime.datetime.now(datetime.UTC)
|
||||
click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))
|
||||
|
||||
WorkflowRunCleanup(
|
||||
days=before_days,
|
||||
batch_size=batch_size,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
dry_run=dry_run,
|
||||
).run()
|
||||
try:
|
||||
WorkflowRunCleanup(
|
||||
days=before_days,
|
||||
batch_size=batch_size,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
dry_run=dry_run,
|
||||
task_label=task_label,
|
||||
).run()
|
||||
finally:
|
||||
flush_telemetry()
|
||||
|
||||
end_time = datetime.datetime.now(datetime.UTC)
|
||||
elapsed = end_time - start_time
|
||||
@@ -2630,6 +2643,12 @@ def migrate_oss(
|
||||
help="Graceful period in days after subscription expiration, will be ignored when billing is disabled.",
|
||||
)
|
||||
@click.option("--dry-run", is_flag=True, default=False, help="Show messages logs would be cleaned without deleting")
|
||||
@click.option(
|
||||
"--task-label",
|
||||
default="daily",
|
||||
show_default=True,
|
||||
help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.",
|
||||
)
|
||||
def clean_expired_messages(
|
||||
batch_size: int,
|
||||
graceful_period: int,
|
||||
@@ -2638,10 +2657,13 @@ def clean_expired_messages(
|
||||
from_days_ago: int | None,
|
||||
before_days: int | None,
|
||||
dry_run: bool,
|
||||
task_label: str,
|
||||
):
|
||||
"""
|
||||
Clean expired messages and related data for tenants based on clean policy.
|
||||
"""
|
||||
from extensions.otel.runtime import flush_telemetry
|
||||
|
||||
click.echo(click.style("clean_messages: start clean messages.", fg="green"))
|
||||
|
||||
start_at = time.perf_counter()
|
||||
@@ -2691,6 +2713,7 @@ def clean_expired_messages(
|
||||
end_before=end_before,
|
||||
batch_size=batch_size,
|
||||
dry_run=dry_run,
|
||||
task_label=task_label,
|
||||
)
|
||||
elif from_days_ago is None:
|
||||
assert before_days is not None
|
||||
@@ -2699,6 +2722,7 @@ def clean_expired_messages(
|
||||
days=before_days,
|
||||
batch_size=batch_size,
|
||||
dry_run=dry_run,
|
||||
task_label=task_label,
|
||||
)
|
||||
else:
|
||||
assert before_days is not None
|
||||
@@ -2710,6 +2734,7 @@ def clean_expired_messages(
|
||||
end_before=now - datetime.timedelta(days=before_days),
|
||||
batch_size=batch_size,
|
||||
dry_run=dry_run,
|
||||
task_label=task_label,
|
||||
)
|
||||
stats = service.run()
|
||||
|
||||
@@ -2735,6 +2760,8 @@ def clean_expired_messages(
|
||||
)
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
flush_telemetry()
|
||||
|
||||
click.echo(click.style("messages cleanup completed.", fg="green"))
|
||||
|
||||
|
||||
@@ -18,7 +18,3 @@ class EnterpriseFeatureConfig(BaseSettings):
|
||||
description="Allow customization of the enterprise logo.",
|
||||
default=False,
|
||||
)
|
||||
|
||||
ENTERPRISE_REQUEST_TIMEOUT: int = Field(
|
||||
ge=1, description="Maximum timeout in seconds for enterprise requests", default=5
|
||||
)
|
||||
|
||||
@@ -158,7 +158,6 @@ class PluginEntity(PluginInstallation):
|
||||
name: str
|
||||
installation_id: str
|
||||
version: str
|
||||
plugin_unique_identifier: str
|
||||
|
||||
@model_validator(mode="after")
|
||||
def set_plugin_id(self):
|
||||
|
||||
@@ -5,7 +5,7 @@ from typing import Union
|
||||
|
||||
from celery.signals import worker_init
|
||||
from flask_login import user_loaded_from_request, user_logged_in
|
||||
from opentelemetry import trace
|
||||
from opentelemetry import metrics, trace
|
||||
from opentelemetry.propagate import set_global_textmap
|
||||
from opentelemetry.propagators.b3 import B3Format
|
||||
from opentelemetry.propagators.composite import CompositePropagator
|
||||
@@ -31,9 +31,29 @@ def setup_context_propagation() -> None:
|
||||
|
||||
|
||||
def shutdown_tracer() -> None:
|
||||
flush_telemetry()
|
||||
|
||||
|
||||
def flush_telemetry() -> None:
|
||||
"""
|
||||
Best-effort flush for telemetry providers.
|
||||
|
||||
This is mainly used by short-lived command processes (e.g. Kubernetes CronJob)
|
||||
so counters/histograms are exported before the process exits.
|
||||
"""
|
||||
provider = trace.get_tracer_provider()
|
||||
if hasattr(provider, "force_flush"):
|
||||
provider.force_flush()
|
||||
try:
|
||||
provider.force_flush()
|
||||
except Exception:
|
||||
logger.exception("otel: failed to flush trace provider")
|
||||
|
||||
metric_provider = metrics.get_meter_provider()
|
||||
if hasattr(metric_provider, "force_flush"):
|
||||
try:
|
||||
metric_provider.force_flush()
|
||||
except Exception:
|
||||
logger.exception("otel: failed to flush metric provider")
|
||||
|
||||
|
||||
def is_celery_worker():
|
||||
|
||||
@@ -3,7 +3,6 @@ import logging
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from configs import dify_config
|
||||
from services.enterprise.base import EnterprisePluginManagerRequest
|
||||
from services.errors.base import BaseServiceError
|
||||
|
||||
@@ -29,11 +28,6 @@ class CheckCredentialPolicyComplianceRequest(BaseModel):
|
||||
return data
|
||||
|
||||
|
||||
class PreUninstallPluginRequest(BaseModel):
|
||||
tenant_id: str
|
||||
plugin_unique_identifier: str
|
||||
|
||||
|
||||
class CredentialPolicyViolationError(BaseServiceError):
|
||||
pass
|
||||
|
||||
@@ -61,24 +55,3 @@ class PluginManagerService:
|
||||
body.dify_credential_id,
|
||||
ret.get("result", False),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def try_pre_uninstall_plugin(cls, body: PreUninstallPluginRequest):
|
||||
try:
|
||||
# the invocation must be synchronous.
|
||||
EnterprisePluginManagerRequest.send_request( # pyright: ignore[reportUnknownMemberType]
|
||||
"POST",
|
||||
"/pre-uninstall-plugin",
|
||||
json=body.model_dump(), # pyright: ignore[reportUnknownArgumentType, reportUnknownMemberType]
|
||||
raise_for_status=True,
|
||||
timeout=dify_config.ENTERPRISE_REQUEST_TIMEOUT,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"""
|
||||
failed to perform pre uninstall plugin hook. tenant_id: %s, plugin_unique_identifier: %s,
|
||||
this may cause plugin to be automatically garbage collected
|
||||
""",
|
||||
body.tenant_id,
|
||||
body.plugin_unique_identifier,
|
||||
)
|
||||
|
||||
@@ -32,10 +32,6 @@ from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.provider import Provider, ProviderCredential
|
||||
from models.provider_ids import GenericProviderID
|
||||
from services.enterprise.plugin_manager_service import (
|
||||
PluginManagerService,
|
||||
PreUninstallPluginRequest,
|
||||
)
|
||||
from services.errors.plugin import PluginInstallationForbiddenError
|
||||
from services.feature_service import FeatureService, PluginInstallationScope
|
||||
|
||||
@@ -523,13 +519,6 @@ class PluginService:
|
||||
if not plugin:
|
||||
return manager.uninstall(tenant_id, plugin_installation_id)
|
||||
|
||||
if dify_config.ENTERPRISE_ENABLED:
|
||||
PluginManagerService.try_pre_uninstall_plugin(
|
||||
PreUninstallPluginRequest(
|
||||
tenant_id=tenant_id,
|
||||
plugin_unique_identifier=plugin.plugin_unique_identifier,
|
||||
)
|
||||
)
|
||||
with Session(db.engine) as session, session.begin():
|
||||
plugin_id = plugin.plugin_id
|
||||
logger.info("Deleting credentials for plugin: %s", plugin_id)
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
from typing import cast
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import delete, select, tuple_
|
||||
from sqlalchemy.engine import CursorResult
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from configs import dify_config
|
||||
from extensions.ext_database import db
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models.model import (
|
||||
@@ -33,6 +33,128 @@ from services.retention.conversation.messages_clean_policy import (
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from opentelemetry.metrics import Counter, Histogram
|
||||
|
||||
|
||||
class MessagesCleanupMetrics:
|
||||
"""
|
||||
Records low-cardinality OpenTelemetry metrics for expired message cleanup jobs.
|
||||
|
||||
We keep labels stable (dry_run/window_mode/task_label/status) so these metrics remain
|
||||
dashboard-friendly for long-running CronJob executions.
|
||||
"""
|
||||
|
||||
_job_runs_total: "Counter | None"
|
||||
_batches_total: "Counter | None"
|
||||
_messages_scanned_total: "Counter | None"
|
||||
_messages_filtered_total: "Counter | None"
|
||||
_messages_deleted_total: "Counter | None"
|
||||
_job_duration_seconds: "Histogram | None"
|
||||
_batch_duration_seconds: "Histogram | None"
|
||||
_base_attributes: dict[str, str]
|
||||
|
||||
def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None:
|
||||
self._job_runs_total = None
|
||||
self._batches_total = None
|
||||
self._messages_scanned_total = None
|
||||
self._messages_filtered_total = None
|
||||
self._messages_deleted_total = None
|
||||
self._job_duration_seconds = None
|
||||
self._batch_duration_seconds = None
|
||||
self._base_attributes = {
|
||||
"job_name": "messages_cleanup",
|
||||
"dry_run": str(dry_run).lower(),
|
||||
"window_mode": "between" if has_window else "before_cutoff",
|
||||
"task_label": task_label,
|
||||
}
|
||||
self._init_instruments()
|
||||
|
||||
def _init_instruments(self) -> None:
|
||||
try:
|
||||
from opentelemetry.metrics import get_meter
|
||||
|
||||
meter = get_meter("messages_cleanup", version=dify_config.project.version)
|
||||
self._job_runs_total = meter.create_counter(
|
||||
"messages_cleanup_jobs_total",
|
||||
description="Total number of expired message cleanup jobs by status.",
|
||||
unit="{job}",
|
||||
)
|
||||
self._batches_total = meter.create_counter(
|
||||
"messages_cleanup_batches_total",
|
||||
description="Total number of message cleanup batches processed.",
|
||||
unit="{batch}",
|
||||
)
|
||||
self._messages_scanned_total = meter.create_counter(
|
||||
"messages_cleanup_scanned_messages_total",
|
||||
description="Total messages scanned by cleanup jobs.",
|
||||
unit="{message}",
|
||||
)
|
||||
self._messages_filtered_total = meter.create_counter(
|
||||
"messages_cleanup_filtered_messages_total",
|
||||
description="Total messages selected by cleanup policy.",
|
||||
unit="{message}",
|
||||
)
|
||||
self._messages_deleted_total = meter.create_counter(
|
||||
"messages_cleanup_deleted_messages_total",
|
||||
description="Total messages deleted by cleanup jobs.",
|
||||
unit="{message}",
|
||||
)
|
||||
self._job_duration_seconds = meter.create_histogram(
|
||||
"messages_cleanup_job_duration_seconds",
|
||||
description="Duration of expired message cleanup jobs in seconds.",
|
||||
unit="s",
|
||||
)
|
||||
self._batch_duration_seconds = meter.create_histogram(
|
||||
"messages_cleanup_batch_duration_seconds",
|
||||
description="Duration of expired message cleanup batch processing in seconds.",
|
||||
unit="s",
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("messages_cleanup_metrics: failed to initialize instruments")
|
||||
|
||||
def _attrs(self, **extra: str) -> dict[str, str]:
|
||||
return {**self._base_attributes, **extra}
|
||||
|
||||
@staticmethod
|
||||
def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None:
|
||||
if not counter or value <= 0:
|
||||
return
|
||||
try:
|
||||
counter.add(value, attributes)
|
||||
except Exception:
|
||||
logger.exception("messages_cleanup_metrics: failed to add counter value")
|
||||
|
||||
@staticmethod
|
||||
def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None:
|
||||
if not histogram:
|
||||
return
|
||||
try:
|
||||
histogram.record(value, attributes)
|
||||
except Exception:
|
||||
logger.exception("messages_cleanup_metrics: failed to record histogram value")
|
||||
|
||||
def record_batch(
|
||||
self,
|
||||
*,
|
||||
scanned_messages: int,
|
||||
filtered_messages: int,
|
||||
deleted_messages: int,
|
||||
batch_duration_seconds: float,
|
||||
) -> None:
|
||||
attributes = self._attrs()
|
||||
self._add(self._batches_total, 1, attributes)
|
||||
self._add(self._messages_scanned_total, scanned_messages, attributes)
|
||||
self._add(self._messages_filtered_total, filtered_messages, attributes)
|
||||
self._add(self._messages_deleted_total, deleted_messages, attributes)
|
||||
self._record(self._batch_duration_seconds, batch_duration_seconds, attributes)
|
||||
|
||||
def record_completion(self, *, status: str, job_duration_seconds: float) -> None:
|
||||
attributes = self._attrs(status=status)
|
||||
self._add(self._job_runs_total, 1, attributes)
|
||||
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
|
||||
|
||||
|
||||
class MessagesCleanService:
|
||||
"""
|
||||
Service for cleaning expired messages based on retention policies.
|
||||
@@ -48,6 +170,7 @@ class MessagesCleanService:
|
||||
start_from: datetime.datetime | None = None,
|
||||
batch_size: int = 1000,
|
||||
dry_run: bool = False,
|
||||
task_label: str = "daily",
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the service with cleanup parameters.
|
||||
@@ -58,12 +181,20 @@ class MessagesCleanService:
|
||||
start_from: Optional start time (inclusive) of the range
|
||||
batch_size: Number of messages to process per batch
|
||||
dry_run: Whether to perform a dry run (no actual deletion)
|
||||
task_label: Stable task label to distinguish multiple cleanup CronJobs
|
||||
"""
|
||||
self._policy = policy
|
||||
self._end_before = end_before
|
||||
self._start_from = start_from
|
||||
self._batch_size = batch_size
|
||||
self._dry_run = dry_run
|
||||
normalized_task_label = task_label.strip()
|
||||
self._task_label = normalized_task_label or "daily"
|
||||
self._metrics = MessagesCleanupMetrics(
|
||||
dry_run=dry_run,
|
||||
has_window=bool(start_from),
|
||||
task_label=self._task_label,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_time_range(
|
||||
@@ -73,6 +204,7 @@ class MessagesCleanService:
|
||||
end_before: datetime.datetime,
|
||||
batch_size: int = 1000,
|
||||
dry_run: bool = False,
|
||||
task_label: str = "daily",
|
||||
) -> "MessagesCleanService":
|
||||
"""
|
||||
Create a service instance for cleaning messages within a specific time range.
|
||||
@@ -85,6 +217,7 @@ class MessagesCleanService:
|
||||
end_before: End time (exclusive) of the range
|
||||
batch_size: Number of messages to process per batch
|
||||
dry_run: Whether to perform a dry run (no actual deletion)
|
||||
task_label: Stable task label to distinguish multiple cleanup CronJobs
|
||||
|
||||
Returns:
|
||||
MessagesCleanService instance
|
||||
@@ -112,6 +245,7 @@ class MessagesCleanService:
|
||||
start_from=start_from,
|
||||
batch_size=batch_size,
|
||||
dry_run=dry_run,
|
||||
task_label=task_label,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@@ -121,6 +255,7 @@ class MessagesCleanService:
|
||||
days: int = 30,
|
||||
batch_size: int = 1000,
|
||||
dry_run: bool = False,
|
||||
task_label: str = "daily",
|
||||
) -> "MessagesCleanService":
|
||||
"""
|
||||
Create a service instance for cleaning messages older than specified days.
|
||||
@@ -130,6 +265,7 @@ class MessagesCleanService:
|
||||
days: Number of days to look back from now
|
||||
batch_size: Number of messages to process per batch
|
||||
dry_run: Whether to perform a dry run (no actual deletion)
|
||||
task_label: Stable task label to distinguish multiple cleanup CronJobs
|
||||
|
||||
Returns:
|
||||
MessagesCleanService instance
|
||||
@@ -153,7 +289,14 @@ class MessagesCleanService:
|
||||
policy.__class__.__name__,
|
||||
)
|
||||
|
||||
return cls(policy=policy, end_before=end_before, start_from=None, batch_size=batch_size, dry_run=dry_run)
|
||||
return cls(
|
||||
policy=policy,
|
||||
end_before=end_before,
|
||||
start_from=None,
|
||||
batch_size=batch_size,
|
||||
dry_run=dry_run,
|
||||
task_label=task_label,
|
||||
)
|
||||
|
||||
def run(self) -> dict[str, int]:
|
||||
"""
|
||||
@@ -162,7 +305,18 @@ class MessagesCleanService:
|
||||
Returns:
|
||||
Dict with statistics: batches, filtered_messages, total_deleted
|
||||
"""
|
||||
return self._clean_messages_by_time_range()
|
||||
status = "success"
|
||||
run_start = time.monotonic()
|
||||
try:
|
||||
return self._clean_messages_by_time_range()
|
||||
except Exception:
|
||||
status = "failed"
|
||||
raise
|
||||
finally:
|
||||
self._metrics.record_completion(
|
||||
status=status,
|
||||
job_duration_seconds=time.monotonic() - run_start,
|
||||
)
|
||||
|
||||
def _clean_messages_by_time_range(self) -> dict[str, int]:
|
||||
"""
|
||||
@@ -197,11 +351,14 @@ class MessagesCleanService:
|
||||
self._end_before,
|
||||
)
|
||||
|
||||
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
|
||||
max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL
|
||||
|
||||
while True:
|
||||
stats["batches"] += 1
|
||||
batch_start = time.monotonic()
|
||||
batch_scanned_messages = 0
|
||||
batch_filtered_messages = 0
|
||||
batch_deleted_messages = 0
|
||||
|
||||
# Step 1: Fetch a batch of messages using cursor
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
@@ -240,9 +397,16 @@ class MessagesCleanService:
|
||||
|
||||
# Track total messages fetched across all batches
|
||||
stats["total_messages"] += len(messages)
|
||||
batch_scanned_messages = len(messages)
|
||||
|
||||
if not messages:
|
||||
logger.info("clean_messages (batch %s): no more messages to process", stats["batches"])
|
||||
self._metrics.record_batch(
|
||||
scanned_messages=batch_scanned_messages,
|
||||
filtered_messages=batch_filtered_messages,
|
||||
deleted_messages=batch_deleted_messages,
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
break
|
||||
|
||||
# Update cursor to the last message's (created_at, id)
|
||||
@@ -268,6 +432,12 @@ class MessagesCleanService:
|
||||
|
||||
if not apps:
|
||||
logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"])
|
||||
self._metrics.record_batch(
|
||||
scanned_messages=batch_scanned_messages,
|
||||
filtered_messages=batch_filtered_messages,
|
||||
deleted_messages=batch_deleted_messages,
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
continue
|
||||
|
||||
# Build app_id -> tenant_id mapping
|
||||
@@ -286,9 +456,16 @@ class MessagesCleanService:
|
||||
|
||||
if not message_ids_to_delete:
|
||||
logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"])
|
||||
self._metrics.record_batch(
|
||||
scanned_messages=batch_scanned_messages,
|
||||
filtered_messages=batch_filtered_messages,
|
||||
deleted_messages=batch_deleted_messages,
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
continue
|
||||
|
||||
stats["filtered_messages"] += len(message_ids_to_delete)
|
||||
batch_filtered_messages = len(message_ids_to_delete)
|
||||
|
||||
# Step 4: Batch delete messages and their relations
|
||||
if not self._dry_run:
|
||||
@@ -309,6 +486,7 @@ class MessagesCleanService:
|
||||
commit_ms = int((time.monotonic() - commit_start) * 1000)
|
||||
|
||||
stats["total_deleted"] += messages_deleted
|
||||
batch_deleted_messages = messages_deleted
|
||||
|
||||
logger.info(
|
||||
"clean_messages (batch %s): processed %s messages, deleted %s messages",
|
||||
@@ -343,6 +521,13 @@ class MessagesCleanService:
|
||||
for msg_id in sampled_ids:
|
||||
logger.info("clean_messages (batch %s, dry_run) sample: message_id=%s", stats["batches"], msg_id)
|
||||
|
||||
self._metrics.record_batch(
|
||||
scanned_messages=batch_scanned_messages,
|
||||
filtered_messages=batch_filtered_messages,
|
||||
deleted_messages=batch_deleted_messages,
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"clean_messages completed: total batches: %s, total messages: %s, filtered messages: %s, total deleted: %s",
|
||||
stats["batches"],
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from collections.abc import Iterable, Sequence
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import click
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
@@ -20,6 +20,156 @@ from services.billing_service import BillingService, SubscriptionPlan
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from opentelemetry.metrics import Counter, Histogram
|
||||
|
||||
|
||||
class WorkflowRunCleanupMetrics:
|
||||
"""
|
||||
Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs.
|
||||
|
||||
Metrics are emitted with stable labels only (dry_run/window_mode/task_label/status)
|
||||
to keep dashboard and alert cardinality predictable in production clusters.
|
||||
"""
|
||||
|
||||
_job_runs_total: "Counter | None"
|
||||
_batches_total: "Counter | None"
|
||||
_runs_scanned_total: "Counter | None"
|
||||
_runs_targeted_total: "Counter | None"
|
||||
_runs_deleted_total: "Counter | None"
|
||||
_runs_skipped_total: "Counter | None"
|
||||
_related_records_total: "Counter | None"
|
||||
_job_duration_seconds: "Histogram | None"
|
||||
_batch_duration_seconds: "Histogram | None"
|
||||
_base_attributes: dict[str, str]
|
||||
|
||||
def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None:
|
||||
self._job_runs_total = None
|
||||
self._batches_total = None
|
||||
self._runs_scanned_total = None
|
||||
self._runs_targeted_total = None
|
||||
self._runs_deleted_total = None
|
||||
self._runs_skipped_total = None
|
||||
self._related_records_total = None
|
||||
self._job_duration_seconds = None
|
||||
self._batch_duration_seconds = None
|
||||
self._base_attributes = {
|
||||
"job_name": "workflow_run_cleanup",
|
||||
"dry_run": str(dry_run).lower(),
|
||||
"window_mode": "between" if has_window else "before_cutoff",
|
||||
"task_label": task_label,
|
||||
}
|
||||
self._init_instruments()
|
||||
|
||||
def _init_instruments(self) -> None:
|
||||
try:
|
||||
from opentelemetry.metrics import get_meter
|
||||
|
||||
meter = get_meter("workflow_run_cleanup", version=dify_config.project.version)
|
||||
self._job_runs_total = meter.create_counter(
|
||||
"workflow_run_cleanup_jobs_total",
|
||||
description="Total number of workflow run cleanup jobs by status.",
|
||||
unit="{job}",
|
||||
)
|
||||
self._batches_total = meter.create_counter(
|
||||
"workflow_run_cleanup_batches_total",
|
||||
description="Total number of processed cleanup batches.",
|
||||
unit="{batch}",
|
||||
)
|
||||
self._runs_scanned_total = meter.create_counter(
|
||||
"workflow_run_cleanup_scanned_runs_total",
|
||||
description="Total workflow runs scanned by cleanup jobs.",
|
||||
unit="{run}",
|
||||
)
|
||||
self._runs_targeted_total = meter.create_counter(
|
||||
"workflow_run_cleanup_targeted_runs_total",
|
||||
description="Total workflow runs targeted by cleanup policy.",
|
||||
unit="{run}",
|
||||
)
|
||||
self._runs_deleted_total = meter.create_counter(
|
||||
"workflow_run_cleanup_deleted_runs_total",
|
||||
description="Total workflow runs deleted by cleanup jobs.",
|
||||
unit="{run}",
|
||||
)
|
||||
self._runs_skipped_total = meter.create_counter(
|
||||
"workflow_run_cleanup_skipped_runs_total",
|
||||
description="Total workflow runs skipped because tenant is paid/unknown.",
|
||||
unit="{run}",
|
||||
)
|
||||
self._related_records_total = meter.create_counter(
|
||||
"workflow_run_cleanup_related_records_total",
|
||||
description="Total related records processed by cleanup jobs.",
|
||||
unit="{record}",
|
||||
)
|
||||
self._job_duration_seconds = meter.create_histogram(
|
||||
"workflow_run_cleanup_job_duration_seconds",
|
||||
description="Duration of workflow run cleanup jobs in seconds.",
|
||||
unit="s",
|
||||
)
|
||||
self._batch_duration_seconds = meter.create_histogram(
|
||||
"workflow_run_cleanup_batch_duration_seconds",
|
||||
description="Duration of workflow run cleanup batch processing in seconds.",
|
||||
unit="s",
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("workflow_run_cleanup_metrics: failed to initialize instruments")
|
||||
|
||||
def _attrs(self, **extra: str) -> dict[str, str]:
|
||||
return {**self._base_attributes, **extra}
|
||||
|
||||
@staticmethod
|
||||
def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None:
|
||||
if not counter or value <= 0:
|
||||
return
|
||||
try:
|
||||
counter.add(value, attributes)
|
||||
except Exception:
|
||||
logger.exception("workflow_run_cleanup_metrics: failed to add counter value")
|
||||
|
||||
@staticmethod
|
||||
def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None:
|
||||
if not histogram:
|
||||
return
|
||||
try:
|
||||
histogram.record(value, attributes)
|
||||
except Exception:
|
||||
logger.exception("workflow_run_cleanup_metrics: failed to record histogram value")
|
||||
|
||||
def record_batch(
|
||||
self,
|
||||
*,
|
||||
batch_rows: int,
|
||||
targeted_runs: int,
|
||||
skipped_runs: int,
|
||||
deleted_runs: int,
|
||||
related_counts: dict[str, int] | None,
|
||||
related_action: str | None,
|
||||
batch_duration_seconds: float,
|
||||
) -> None:
|
||||
attributes = self._attrs()
|
||||
self._add(self._batches_total, 1, attributes)
|
||||
self._add(self._runs_scanned_total, batch_rows, attributes)
|
||||
self._add(self._runs_targeted_total, targeted_runs, attributes)
|
||||
self._add(self._runs_skipped_total, skipped_runs, attributes)
|
||||
self._add(self._runs_deleted_total, deleted_runs, attributes)
|
||||
self._record(self._batch_duration_seconds, batch_duration_seconds, attributes)
|
||||
|
||||
if not related_counts or not related_action:
|
||||
return
|
||||
|
||||
for record_type, count in related_counts.items():
|
||||
self._add(
|
||||
self._related_records_total,
|
||||
count,
|
||||
self._attrs(action=related_action, record_type=record_type),
|
||||
)
|
||||
|
||||
def record_completion(self, *, status: str, job_duration_seconds: float) -> None:
|
||||
attributes = self._attrs(status=status)
|
||||
self._add(self._job_runs_total, 1, attributes)
|
||||
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
|
||||
|
||||
|
||||
class WorkflowRunCleanup:
|
||||
def __init__(
|
||||
self,
|
||||
@@ -29,6 +179,7 @@ class WorkflowRunCleanup:
|
||||
end_before: datetime.datetime | None = None,
|
||||
workflow_run_repo: APIWorkflowRunRepository | None = None,
|
||||
dry_run: bool = False,
|
||||
task_label: str = "daily",
|
||||
):
|
||||
if (start_from is None) ^ (end_before is None):
|
||||
raise ValueError("start_from and end_before must be both set or both omitted.")
|
||||
@@ -46,6 +197,13 @@ class WorkflowRunCleanup:
|
||||
self.batch_size = batch_size
|
||||
self._cleanup_whitelist: set[str] | None = None
|
||||
self.dry_run = dry_run
|
||||
normalized_task_label = task_label.strip()
|
||||
self.task_label = normalized_task_label or "daily"
|
||||
self._metrics = WorkflowRunCleanupMetrics(
|
||||
dry_run=dry_run,
|
||||
has_window=bool(start_from),
|
||||
task_label=self.task_label,
|
||||
)
|
||||
self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD
|
||||
self.workflow_run_repo: APIWorkflowRunRepository
|
||||
if workflow_run_repo:
|
||||
@@ -74,153 +232,193 @@ class WorkflowRunCleanup:
|
||||
related_totals = self._empty_related_counts() if self.dry_run else None
|
||||
batch_index = 0
|
||||
last_seen: tuple[datetime.datetime, str] | None = None
|
||||
status = "success"
|
||||
run_start = time.monotonic()
|
||||
max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL
|
||||
|
||||
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
|
||||
try:
|
||||
while True:
|
||||
batch_start = time.monotonic()
|
||||
|
||||
while True:
|
||||
batch_start = time.monotonic()
|
||||
|
||||
fetch_start = time.monotonic()
|
||||
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
|
||||
start_from=self.window_start,
|
||||
end_before=self.window_end,
|
||||
last_seen=last_seen,
|
||||
batch_size=self.batch_size,
|
||||
)
|
||||
if not run_rows:
|
||||
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
|
||||
break
|
||||
|
||||
batch_index += 1
|
||||
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
|
||||
batch_index,
|
||||
len(run_rows),
|
||||
int((time.monotonic() - fetch_start) * 1000),
|
||||
)
|
||||
|
||||
tenant_ids = {row.tenant_id for row in run_rows}
|
||||
|
||||
filter_start = time.monotonic()
|
||||
free_tenants = self._filter_free_tenants(tenant_ids)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
|
||||
batch_index,
|
||||
len(free_tenants),
|
||||
len(tenant_ids),
|
||||
int((time.monotonic() - filter_start) * 1000),
|
||||
)
|
||||
|
||||
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
|
||||
paid_or_skipped = len(run_rows) - len(free_runs)
|
||||
|
||||
if not free_runs:
|
||||
skipped_message = (
|
||||
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
|
||||
fetch_start = time.monotonic()
|
||||
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
|
||||
start_from=self.window_start,
|
||||
end_before=self.window_end,
|
||||
last_seen=last_seen,
|
||||
batch_size=self.batch_size,
|
||||
)
|
||||
click.echo(
|
||||
click.style(
|
||||
skipped_message,
|
||||
fg="yellow",
|
||||
)
|
||||
)
|
||||
continue
|
||||
if not run_rows:
|
||||
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
|
||||
break
|
||||
|
||||
total_runs_targeted += len(free_runs)
|
||||
|
||||
if self.dry_run:
|
||||
count_start = time.monotonic()
|
||||
batch_counts = self.workflow_run_repo.count_runs_with_related(
|
||||
free_runs,
|
||||
count_node_executions=self._count_node_executions,
|
||||
count_trigger_logs=self._count_trigger_logs,
|
||||
)
|
||||
batch_index += 1
|
||||
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
|
||||
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
|
||||
batch_index,
|
||||
int((time.monotonic() - count_start) * 1000),
|
||||
len(run_rows),
|
||||
int((time.monotonic() - fetch_start) * 1000),
|
||||
)
|
||||
if related_totals is not None:
|
||||
for key in related_totals:
|
||||
related_totals[key] += batch_counts.get(key, 0)
|
||||
sample_ids = ", ".join(run.id for run in free_runs[:5])
|
||||
|
||||
tenant_ids = {row.tenant_id for row in run_rows}
|
||||
|
||||
filter_start = time.monotonic()
|
||||
free_tenants = self._filter_free_tenants(tenant_ids)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
|
||||
batch_index,
|
||||
len(free_tenants),
|
||||
len(tenant_ids),
|
||||
int((time.monotonic() - filter_start) * 1000),
|
||||
)
|
||||
|
||||
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
|
||||
paid_or_skipped = len(run_rows) - len(free_runs)
|
||||
|
||||
if not free_runs:
|
||||
skipped_message = (
|
||||
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
|
||||
)
|
||||
click.echo(
|
||||
click.style(
|
||||
skipped_message,
|
||||
fg="yellow",
|
||||
)
|
||||
)
|
||||
self._metrics.record_batch(
|
||||
batch_rows=len(run_rows),
|
||||
targeted_runs=0,
|
||||
skipped_runs=paid_or_skipped,
|
||||
deleted_runs=0,
|
||||
related_counts=None,
|
||||
related_action=None,
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
continue
|
||||
|
||||
total_runs_targeted += len(free_runs)
|
||||
|
||||
if self.dry_run:
|
||||
count_start = time.monotonic()
|
||||
batch_counts = self.workflow_run_repo.count_runs_with_related(
|
||||
free_runs,
|
||||
count_node_executions=self._count_node_executions,
|
||||
count_trigger_logs=self._count_trigger_logs,
|
||||
)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
|
||||
batch_index,
|
||||
int((time.monotonic() - count_start) * 1000),
|
||||
)
|
||||
if related_totals is not None:
|
||||
for key in related_totals:
|
||||
related_totals[key] += batch_counts.get(key, 0)
|
||||
sample_ids = ", ".join(run.id for run in free_runs[:5])
|
||||
click.echo(
|
||||
click.style(
|
||||
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
|
||||
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
|
||||
fg="yellow",
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
|
||||
batch_index,
|
||||
int((time.monotonic() - batch_start) * 1000),
|
||||
)
|
||||
self._metrics.record_batch(
|
||||
batch_rows=len(run_rows),
|
||||
targeted_runs=len(free_runs),
|
||||
skipped_runs=paid_or_skipped,
|
||||
deleted_runs=0,
|
||||
related_counts={key: batch_counts.get(key, 0) for key in self._empty_related_counts()},
|
||||
related_action="would_delete",
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
delete_start = time.monotonic()
|
||||
counts = self.workflow_run_repo.delete_runs_with_related(
|
||||
free_runs,
|
||||
delete_node_executions=self._delete_node_executions,
|
||||
delete_trigger_logs=self._delete_trigger_logs,
|
||||
)
|
||||
delete_ms = int((time.monotonic() - delete_start) * 1000)
|
||||
except Exception:
|
||||
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
|
||||
raise
|
||||
|
||||
total_runs_deleted += counts["runs"]
|
||||
click.echo(
|
||||
click.style(
|
||||
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
|
||||
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
|
||||
fg="yellow",
|
||||
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
|
||||
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
|
||||
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
|
||||
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
|
||||
f"skipped {paid_or_skipped} paid/unknown",
|
||||
fg="green",
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
|
||||
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
|
||||
batch_index,
|
||||
delete_ms,
|
||||
int((time.monotonic() - batch_start) * 1000),
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
delete_start = time.monotonic()
|
||||
counts = self.workflow_run_repo.delete_runs_with_related(
|
||||
free_runs,
|
||||
delete_node_executions=self._delete_node_executions,
|
||||
delete_trigger_logs=self._delete_trigger_logs,
|
||||
self._metrics.record_batch(
|
||||
batch_rows=len(run_rows),
|
||||
targeted_runs=len(free_runs),
|
||||
skipped_runs=paid_or_skipped,
|
||||
deleted_runs=counts["runs"],
|
||||
related_counts={key: counts.get(key, 0) for key in self._empty_related_counts()},
|
||||
related_action="deleted",
|
||||
batch_duration_seconds=time.monotonic() - batch_start,
|
||||
)
|
||||
delete_ms = int((time.monotonic() - delete_start) * 1000)
|
||||
except Exception:
|
||||
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
|
||||
raise
|
||||
|
||||
total_runs_deleted += counts["runs"]
|
||||
click.echo(
|
||||
click.style(
|
||||
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
|
||||
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
|
||||
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
|
||||
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
|
||||
f"skipped {paid_or_skipped} paid/unknown",
|
||||
fg="green",
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
|
||||
batch_index,
|
||||
delete_ms,
|
||||
int((time.monotonic() - batch_start) * 1000),
|
||||
)
|
||||
# Random sleep between batches to avoid overwhelming the database
|
||||
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
|
||||
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
|
||||
time.sleep(sleep_ms / 1000)
|
||||
|
||||
# Random sleep between batches to avoid overwhelming the database
|
||||
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
|
||||
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
|
||||
time.sleep(sleep_ms / 1000)
|
||||
|
||||
if self.dry_run:
|
||||
if self.window_start:
|
||||
summary_message = (
|
||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||
)
|
||||
if self.dry_run:
|
||||
if self.window_start:
|
||||
summary_message = (
|
||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||
)
|
||||
else:
|
||||
summary_message = (
|
||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||
f"before {self.window_end.isoformat()}"
|
||||
)
|
||||
if related_totals is not None:
|
||||
summary_message = (
|
||||
f"{summary_message}; related records: {self._format_related_counts(related_totals)}"
|
||||
)
|
||||
summary_color = "yellow"
|
||||
else:
|
||||
summary_message = (
|
||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||
f"before {self.window_end.isoformat()}"
|
||||
)
|
||||
if related_totals is not None:
|
||||
summary_message = f"{summary_message}; related records: {self._format_related_counts(related_totals)}"
|
||||
summary_color = "yellow"
|
||||
else:
|
||||
if self.window_start:
|
||||
summary_message = (
|
||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
|
||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||
)
|
||||
else:
|
||||
summary_message = (
|
||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs before {self.window_end.isoformat()}"
|
||||
)
|
||||
summary_color = "white"
|
||||
if self.window_start:
|
||||
summary_message = (
|
||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
|
||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||
)
|
||||
else:
|
||||
summary_message = (
|
||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
|
||||
f"before {self.window_end.isoformat()}"
|
||||
)
|
||||
summary_color = "white"
|
||||
|
||||
click.echo(click.style(summary_message, fg=summary_color))
|
||||
click.echo(click.style(summary_message, fg=summary_color))
|
||||
except Exception:
|
||||
status = "failed"
|
||||
raise
|
||||
finally:
|
||||
self._metrics.record_completion(
|
||||
status=status,
|
||||
job_duration_seconds=time.monotonic() - run_start,
|
||||
)
|
||||
|
||||
def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]:
|
||||
tenant_id_list = list(tenant_ids)
|
||||
|
||||
@@ -38,6 +38,7 @@ def test_absolute_mode_calls_from_time_range():
|
||||
from_days_ago=None,
|
||||
before_days=None,
|
||||
dry_run=True,
|
||||
task_label="daily",
|
||||
)
|
||||
|
||||
mock_from_time_range.assert_called_once_with(
|
||||
@@ -46,6 +47,7 @@ def test_absolute_mode_calls_from_time_range():
|
||||
end_before=end_before,
|
||||
batch_size=200,
|
||||
dry_run=True,
|
||||
task_label="daily",
|
||||
)
|
||||
mock_from_days.assert_not_called()
|
||||
|
||||
@@ -67,6 +69,7 @@ def test_relative_mode_before_days_only_calls_from_days():
|
||||
from_days_ago=None,
|
||||
before_days=30,
|
||||
dry_run=False,
|
||||
task_label="daily",
|
||||
)
|
||||
|
||||
mock_from_days.assert_called_once_with(
|
||||
@@ -74,6 +77,7 @@ def test_relative_mode_before_days_only_calls_from_days():
|
||||
days=30,
|
||||
batch_size=500,
|
||||
dry_run=False,
|
||||
task_label="daily",
|
||||
)
|
||||
mock_from_time_range.assert_not_called()
|
||||
|
||||
@@ -97,6 +101,7 @@ def test_relative_mode_with_from_days_ago_calls_from_time_range():
|
||||
from_days_ago=60,
|
||||
before_days=30,
|
||||
dry_run=False,
|
||||
task_label="daily",
|
||||
)
|
||||
|
||||
mock_from_time_range.assert_called_once_with(
|
||||
@@ -105,6 +110,7 @@ def test_relative_mode_with_from_days_ago_calls_from_time_range():
|
||||
end_before=fixed_now - datetime.timedelta(days=30),
|
||||
batch_size=1000,
|
||||
dry_run=False,
|
||||
task_label="daily",
|
||||
)
|
||||
mock_from_days.assert_not_called()
|
||||
|
||||
@@ -178,4 +184,5 @@ def test_invalid_inputs_raise_usage_error(kwargs: dict, message: str):
|
||||
from_days_ago=kwargs["from_days_ago"],
|
||||
before_days=kwargs["before_days"],
|
||||
dry_run=False,
|
||||
task_label="daily",
|
||||
)
|
||||
|
||||
@@ -269,7 +269,6 @@ class TestPluginLoading:
|
||||
id="task-123",
|
||||
created_at=datetime.datetime.now(),
|
||||
updated_at=datetime.datetime.now(),
|
||||
plugin_unique_identifier="test-org/test-plugin/1.0.0",
|
||||
status=PluginInstallTaskStatus.Running,
|
||||
total_plugins=3,
|
||||
completed_plugins=1,
|
||||
@@ -721,7 +720,6 @@ class TestPluginTaskManagement:
|
||||
id="task-1",
|
||||
created_at=datetime.datetime.now(),
|
||||
updated_at=datetime.datetime.now(),
|
||||
plugin_unique_identifier="test-org/test-plugin-a/1.0.0",
|
||||
status=PluginInstallTaskStatus.Running,
|
||||
total_plugins=2,
|
||||
completed_plugins=1,
|
||||
@@ -731,7 +729,6 @@ class TestPluginTaskManagement:
|
||||
id="task-2",
|
||||
created_at=datetime.datetime.now(),
|
||||
updated_at=datetime.datetime.now(),
|
||||
plugin_unique_identifier="test-org/test-plugin-b/1.0.0",
|
||||
status=PluginInstallTaskStatus.Success,
|
||||
total_plugins=1,
|
||||
completed_plugins=1,
|
||||
@@ -1259,7 +1256,6 @@ class TestPluginTaskStatusTransitions:
|
||||
id="pending-task",
|
||||
created_at=datetime.datetime.now(),
|
||||
updated_at=datetime.datetime.now(),
|
||||
plugin_unique_identifier="test-org/test-plugin/1.0.0",
|
||||
status=PluginInstallTaskStatus.Pending,
|
||||
total_plugins=3,
|
||||
completed_plugins=0, # No plugins completed yet
|
||||
@@ -1287,7 +1283,6 @@ class TestPluginTaskStatusTransitions:
|
||||
id="running-task",
|
||||
created_at=datetime.datetime.now(),
|
||||
updated_at=datetime.datetime.now(),
|
||||
plugin_unique_identifier="test-org/test-plugin/1.0.0",
|
||||
status=PluginInstallTaskStatus.Running,
|
||||
total_plugins=5,
|
||||
completed_plugins=2, # 2 out of 5 completed
|
||||
@@ -1316,7 +1311,6 @@ class TestPluginTaskStatusTransitions:
|
||||
id="success-task",
|
||||
created_at=datetime.datetime.now(),
|
||||
updated_at=datetime.datetime.now(),
|
||||
plugin_unique_identifier="test-org/test-plugin/1.0.0",
|
||||
status=PluginInstallTaskStatus.Success,
|
||||
total_plugins=4,
|
||||
completed_plugins=4, # All plugins completed
|
||||
@@ -1344,7 +1338,6 @@ class TestPluginTaskStatusTransitions:
|
||||
id="failed-task",
|
||||
created_at=datetime.datetime.now(),
|
||||
updated_at=datetime.datetime.now(),
|
||||
plugin_unique_identifier="test-org/test-plugin/1.0.0",
|
||||
status=PluginInstallTaskStatus.Failed,
|
||||
total_plugins=3,
|
||||
completed_plugins=1, # Only 1 completed before failure
|
||||
|
||||
@@ -1,93 +0,0 @@
|
||||
"""Unit tests for PluginManagerService.
|
||||
|
||||
This module covers the pre-uninstall plugin hook behavior:
|
||||
- Successful API call: no exception raised, correct request sent
|
||||
- API failure: soft-fail (logs and does not re-raise)
|
||||
"""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
from httpx import HTTPStatusError
|
||||
|
||||
from configs import dify_config
|
||||
from services.enterprise.plugin_manager_service import (
|
||||
PluginManagerService,
|
||||
PreUninstallPluginRequest,
|
||||
)
|
||||
|
||||
|
||||
class TestTryPreUninstallPlugin:
|
||||
def test_try_pre_uninstall_plugin_success(self):
|
||||
body = PreUninstallPluginRequest(
|
||||
tenant_id="tenant-123",
|
||||
plugin_unique_identifier="com.example.my_plugin",
|
||||
)
|
||||
|
||||
with patch(
|
||||
"services.enterprise.plugin_manager_service.EnterprisePluginManagerRequest.send_request"
|
||||
) as mock_send_request:
|
||||
mock_send_request.return_value = {}
|
||||
|
||||
PluginManagerService.try_pre_uninstall_plugin(body)
|
||||
|
||||
mock_send_request.assert_called_once_with(
|
||||
"POST",
|
||||
"/pre-uninstall-plugin",
|
||||
json={"tenant_id": "tenant-123", "plugin_unique_identifier": "com.example.my_plugin"},
|
||||
raise_for_status=True,
|
||||
timeout=dify_config.ENTERPRISE_REQUEST_TIMEOUT,
|
||||
)
|
||||
|
||||
def test_try_pre_uninstall_plugin_http_error_soft_fails(self):
|
||||
body = PreUninstallPluginRequest(
|
||||
tenant_id="tenant-456",
|
||||
plugin_unique_identifier="com.example.other_plugin",
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"services.enterprise.plugin_manager_service.EnterprisePluginManagerRequest.send_request"
|
||||
) as mock_send_request,
|
||||
patch("services.enterprise.plugin_manager_service.logger") as mock_logger,
|
||||
):
|
||||
mock_send_request.side_effect = HTTPStatusError(
|
||||
"502 Bad Gateway",
|
||||
request=None,
|
||||
response=None,
|
||||
)
|
||||
|
||||
PluginManagerService.try_pre_uninstall_plugin(body)
|
||||
|
||||
mock_send_request.assert_called_once_with(
|
||||
"POST",
|
||||
"/pre-uninstall-plugin",
|
||||
json={"tenant_id": "tenant-456", "plugin_unique_identifier": "com.example.other_plugin"},
|
||||
raise_for_status=True,
|
||||
timeout=dify_config.ENTERPRISE_REQUEST_TIMEOUT,
|
||||
)
|
||||
mock_logger.exception.assert_called_once()
|
||||
|
||||
def test_try_pre_uninstall_plugin_generic_exception_soft_fails(self):
|
||||
body = PreUninstallPluginRequest(
|
||||
tenant_id="tenant-789",
|
||||
plugin_unique_identifier="com.example.failing_plugin",
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"services.enterprise.plugin_manager_service.EnterprisePluginManagerRequest.send_request"
|
||||
) as mock_send_request,
|
||||
patch("services.enterprise.plugin_manager_service.logger") as mock_logger,
|
||||
):
|
||||
mock_send_request.side_effect = ConnectionError("network unreachable")
|
||||
|
||||
PluginManagerService.try_pre_uninstall_plugin(body)
|
||||
|
||||
mock_send_request.assert_called_once_with(
|
||||
"POST",
|
||||
"/pre-uninstall-plugin",
|
||||
json={"tenant_id": "tenant-789", "plugin_unique_identifier": "com.example.failing_plugin"},
|
||||
raise_for_status=True,
|
||||
timeout=dify_config.ENTERPRISE_REQUEST_TIMEOUT,
|
||||
)
|
||||
mock_logger.exception.assert_called_once()
|
||||
@@ -265,6 +265,61 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
cleanup.run()
|
||||
|
||||
|
||||
def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FakeRepo(
|
||||
batches=[[FakeRun("run-free", "t_free", cutoff)]],
|
||||
delete_result={
|
||||
"runs": 0,
|
||||
"node_executions": 2,
|
||||
"offloads": 1,
|
||||
"app_logs": 3,
|
||||
"trigger_logs": 4,
|
||||
"pauses": 5,
|
||||
"pause_reasons": 6,
|
||||
},
|
||||
)
|
||||
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
|
||||
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
|
||||
|
||||
batch_calls: list[dict[str, object]] = []
|
||||
completion_calls: list[dict[str, object]] = []
|
||||
monkeypatch.setattr(cleanup._metrics, "record_batch", lambda **kwargs: batch_calls.append(kwargs))
|
||||
monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs))
|
||||
|
||||
cleanup.run()
|
||||
|
||||
assert len(batch_calls) == 1
|
||||
assert batch_calls[0]["batch_rows"] == 1
|
||||
assert batch_calls[0]["targeted_runs"] == 1
|
||||
assert batch_calls[0]["deleted_runs"] == 1
|
||||
assert batch_calls[0]["related_action"] == "deleted"
|
||||
assert len(completion_calls) == 1
|
||||
assert completion_calls[0]["status"] == "success"
|
||||
|
||||
|
||||
def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
class FailingRepo(FakeRepo):
|
||||
def delete_runs_with_related(
|
||||
self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None
|
||||
) -> dict[str, int]:
|
||||
raise RuntimeError("delete failed")
|
||||
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FailingRepo(batches=[[FakeRun("run-free", "t_free", cutoff)]])
|
||||
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
|
||||
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
|
||||
|
||||
completion_calls: list[dict[str, object]] = []
|
||||
monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs))
|
||||
|
||||
with pytest.raises(RuntimeError, match="delete failed"):
|
||||
cleanup.run()
|
||||
|
||||
assert len(completion_calls) == 1
|
||||
assert completion_calls[0]["status"] == "failed"
|
||||
|
||||
|
||||
def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
cutoff = datetime.datetime.now()
|
||||
repo = FakeRepo(
|
||||
|
||||
@@ -619,3 +619,53 @@ class TestMessagesCleanServiceFromDays:
|
||||
assert service._end_before == expected_end_before
|
||||
assert service._batch_size == 1000 # default
|
||||
assert service._dry_run is False # default
|
||||
|
||||
|
||||
class TestMessagesCleanServiceRun:
|
||||
"""Unit tests for MessagesCleanService.run instrumentation behavior."""
|
||||
|
||||
def test_run_records_completion_metrics_on_success(self):
|
||||
# Arrange
|
||||
service = MessagesCleanService(
|
||||
policy=BillingDisabledPolicy(),
|
||||
start_from=datetime.datetime(2024, 1, 1),
|
||||
end_before=datetime.datetime(2024, 1, 2),
|
||||
batch_size=100,
|
||||
dry_run=False,
|
||||
)
|
||||
expected_stats = {
|
||||
"batches": 1,
|
||||
"total_messages": 10,
|
||||
"filtered_messages": 5,
|
||||
"total_deleted": 5,
|
||||
}
|
||||
service._clean_messages_by_time_range = MagicMock(return_value=expected_stats) # type: ignore[method-assign]
|
||||
completion_calls: list[dict[str, object]] = []
|
||||
service._metrics.record_completion = lambda **kwargs: completion_calls.append(kwargs) # type: ignore[method-assign]
|
||||
|
||||
# Act
|
||||
result = service.run()
|
||||
|
||||
# Assert
|
||||
assert result == expected_stats
|
||||
assert len(completion_calls) == 1
|
||||
assert completion_calls[0]["status"] == "success"
|
||||
|
||||
def test_run_records_completion_metrics_on_failure(self):
|
||||
# Arrange
|
||||
service = MessagesCleanService(
|
||||
policy=BillingDisabledPolicy(),
|
||||
start_from=datetime.datetime(2024, 1, 1),
|
||||
end_before=datetime.datetime(2024, 1, 2),
|
||||
batch_size=100,
|
||||
dry_run=False,
|
||||
)
|
||||
service._clean_messages_by_time_range = MagicMock(side_effect=RuntimeError("clean failed")) # type: ignore[method-assign]
|
||||
completion_calls: list[dict[str, object]] = []
|
||||
service._metrics.record_completion = lambda **kwargs: completion_calls.append(kwargs) # type: ignore[method-assign]
|
||||
|
||||
# Act & Assert
|
||||
with pytest.raises(RuntimeError, match="clean failed"):
|
||||
service.run()
|
||||
assert len(completion_calls) == 1
|
||||
assert completion_calls[0]["status"] == "failed"
|
||||
|
||||
Reference in New Issue
Block a user