Compare commits

...

9 Commits

Author SHA1 Message Date
hjlarry
232b8eb248 fix CI 2026-03-09 15:41:54 +08:00
hjlarry
4c0d81029f fix CI 2026-03-09 15:29:21 +08:00
autofix-ci[bot]
bacf70c00a [autofix.ci] apply automated fixes 2026-03-09 06:46:39 +00:00
非法操作
942729ba48 Update api/extensions/otel/runtime.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-03-09 14:44:35 +08:00
非法操作
4f5af0b43c Update api/extensions/otel/runtime.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-03-09 14:44:27 +08:00
hjlarry
a28cb993b8 Merge remote-tracking branch 'myori/main' into p372 2026-03-09 14:43:03 +08:00
hjlarry
c6dd2ef25a add task label to the cleanup task 2026-03-09 14:30:04 +08:00
hjlarry
b0e8becd14 add clean message metrics 2026-03-09 13:45:08 +08:00
hjlarry
f90e0d781a feat: add metrics to clean workflow run task 2026-03-09 09:48:24 +08:00
7 changed files with 684 additions and 142 deletions

View File

@@ -937,6 +937,12 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
is_flag=True,
help="Preview cleanup results without deleting any workflow run data.",
)
@click.option(
"--task-label",
default="daily",
show_default=True,
help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.",
)
def clean_workflow_runs(
before_days: int,
batch_size: int,
@@ -945,10 +951,13 @@ def clean_workflow_runs(
start_from: datetime.datetime | None,
end_before: datetime.datetime | None,
dry_run: bool,
task_label: str,
):
"""
Clean workflow runs and related workflow data for free tenants.
"""
from extensions.otel.runtime import flush_telemetry
if (start_from is None) ^ (end_before is None):
raise click.UsageError("--start-from and --end-before must be provided together.")
@@ -968,13 +977,17 @@ def clean_workflow_runs(
start_time = datetime.datetime.now(datetime.UTC)
click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))
WorkflowRunCleanup(
days=before_days,
batch_size=batch_size,
start_from=start_from,
end_before=end_before,
dry_run=dry_run,
).run()
try:
WorkflowRunCleanup(
days=before_days,
batch_size=batch_size,
start_from=start_from,
end_before=end_before,
dry_run=dry_run,
task_label=task_label,
).run()
finally:
flush_telemetry()
end_time = datetime.datetime.now(datetime.UTC)
elapsed = end_time - start_time
@@ -2630,6 +2643,12 @@ def migrate_oss(
help="Graceful period in days after subscription expiration, will be ignored when billing is disabled.",
)
@click.option("--dry-run", is_flag=True, default=False, help="Show messages logs would be cleaned without deleting")
@click.option(
"--task-label",
default="daily",
show_default=True,
help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.",
)
def clean_expired_messages(
batch_size: int,
graceful_period: int,
@@ -2638,10 +2657,13 @@ def clean_expired_messages(
from_days_ago: int | None,
before_days: int | None,
dry_run: bool,
task_label: str,
):
"""
Clean expired messages and related data for tenants based on clean policy.
"""
from extensions.otel.runtime import flush_telemetry
click.echo(click.style("clean_messages: start clean messages.", fg="green"))
start_at = time.perf_counter()
@@ -2691,6 +2713,7 @@ def clean_expired_messages(
end_before=end_before,
batch_size=batch_size,
dry_run=dry_run,
task_label=task_label,
)
elif from_days_ago is None:
assert before_days is not None
@@ -2699,6 +2722,7 @@ def clean_expired_messages(
days=before_days,
batch_size=batch_size,
dry_run=dry_run,
task_label=task_label,
)
else:
assert before_days is not None
@@ -2710,6 +2734,7 @@ def clean_expired_messages(
end_before=now - datetime.timedelta(days=before_days),
batch_size=batch_size,
dry_run=dry_run,
task_label=task_label,
)
stats = service.run()
@@ -2735,6 +2760,8 @@ def clean_expired_messages(
)
)
raise
finally:
flush_telemetry()
click.echo(click.style("messages cleanup completed.", fg="green"))

View File

@@ -5,7 +5,7 @@ from typing import Union
from celery.signals import worker_init
from flask_login import user_loaded_from_request, user_logged_in
from opentelemetry import trace
from opentelemetry import metrics, trace
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3Format
from opentelemetry.propagators.composite import CompositePropagator
@@ -31,9 +31,29 @@ def setup_context_propagation() -> None:
def shutdown_tracer() -> None:
flush_telemetry()
def flush_telemetry() -> None:
"""
Best-effort flush for telemetry providers.
This is mainly used by short-lived command processes (e.g. Kubernetes CronJob)
so counters/histograms are exported before the process exits.
"""
provider = trace.get_tracer_provider()
if hasattr(provider, "force_flush"):
provider.force_flush()
try:
provider.force_flush()
except Exception:
logger.exception("otel: failed to flush trace provider")
metric_provider = metrics.get_meter_provider()
if hasattr(metric_provider, "force_flush"):
try:
metric_provider.force_flush()
except Exception:
logger.exception("otel: failed to flush metric provider")
def is_celery_worker():

View File

@@ -1,16 +1,16 @@
import datetime
import logging
import os
import random
import time
from collections.abc import Sequence
from typing import cast
from typing import TYPE_CHECKING, cast
import sqlalchemy as sa
from sqlalchemy import delete, select, tuple_
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session
from configs import dify_config
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.model import (
@@ -33,6 +33,128 @@ from services.retention.conversation.messages_clean_policy import (
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from opentelemetry.metrics import Counter, Histogram
class MessagesCleanupMetrics:
"""
Records low-cardinality OpenTelemetry metrics for expired message cleanup jobs.
We keep labels stable (dry_run/window_mode/task_label/status) so these metrics remain
dashboard-friendly for long-running CronJob executions.
"""
_job_runs_total: "Counter | None"
_batches_total: "Counter | None"
_messages_scanned_total: "Counter | None"
_messages_filtered_total: "Counter | None"
_messages_deleted_total: "Counter | None"
_job_duration_seconds: "Histogram | None"
_batch_duration_seconds: "Histogram | None"
_base_attributes: dict[str, str]
def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None:
self._job_runs_total = None
self._batches_total = None
self._messages_scanned_total = None
self._messages_filtered_total = None
self._messages_deleted_total = None
self._job_duration_seconds = None
self._batch_duration_seconds = None
self._base_attributes = {
"job_name": "messages_cleanup",
"dry_run": str(dry_run).lower(),
"window_mode": "between" if has_window else "before_cutoff",
"task_label": task_label,
}
self._init_instruments()
def _init_instruments(self) -> None:
try:
from opentelemetry.metrics import get_meter
meter = get_meter("messages_cleanup", version=dify_config.project.version)
self._job_runs_total = meter.create_counter(
"messages_cleanup_jobs_total",
description="Total number of expired message cleanup jobs by status.",
unit="{job}",
)
self._batches_total = meter.create_counter(
"messages_cleanup_batches_total",
description="Total number of message cleanup batches processed.",
unit="{batch}",
)
self._messages_scanned_total = meter.create_counter(
"messages_cleanup_scanned_messages_total",
description="Total messages scanned by cleanup jobs.",
unit="{message}",
)
self._messages_filtered_total = meter.create_counter(
"messages_cleanup_filtered_messages_total",
description="Total messages selected by cleanup policy.",
unit="{message}",
)
self._messages_deleted_total = meter.create_counter(
"messages_cleanup_deleted_messages_total",
description="Total messages deleted by cleanup jobs.",
unit="{message}",
)
self._job_duration_seconds = meter.create_histogram(
"messages_cleanup_job_duration_seconds",
description="Duration of expired message cleanup jobs in seconds.",
unit="s",
)
self._batch_duration_seconds = meter.create_histogram(
"messages_cleanup_batch_duration_seconds",
description="Duration of expired message cleanup batch processing in seconds.",
unit="s",
)
except Exception:
logger.exception("messages_cleanup_metrics: failed to initialize instruments")
def _attrs(self, **extra: str) -> dict[str, str]:
return {**self._base_attributes, **extra}
@staticmethod
def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None:
if not counter or value <= 0:
return
try:
counter.add(value, attributes)
except Exception:
logger.exception("messages_cleanup_metrics: failed to add counter value")
@staticmethod
def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None:
if not histogram:
return
try:
histogram.record(value, attributes)
except Exception:
logger.exception("messages_cleanup_metrics: failed to record histogram value")
def record_batch(
self,
*,
scanned_messages: int,
filtered_messages: int,
deleted_messages: int,
batch_duration_seconds: float,
) -> None:
attributes = self._attrs()
self._add(self._batches_total, 1, attributes)
self._add(self._messages_scanned_total, scanned_messages, attributes)
self._add(self._messages_filtered_total, filtered_messages, attributes)
self._add(self._messages_deleted_total, deleted_messages, attributes)
self._record(self._batch_duration_seconds, batch_duration_seconds, attributes)
def record_completion(self, *, status: str, job_duration_seconds: float) -> None:
attributes = self._attrs(status=status)
self._add(self._job_runs_total, 1, attributes)
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
class MessagesCleanService:
"""
Service for cleaning expired messages based on retention policies.
@@ -48,6 +170,7 @@ class MessagesCleanService:
start_from: datetime.datetime | None = None,
batch_size: int = 1000,
dry_run: bool = False,
task_label: str = "daily",
) -> None:
"""
Initialize the service with cleanup parameters.
@@ -58,12 +181,20 @@ class MessagesCleanService:
start_from: Optional start time (inclusive) of the range
batch_size: Number of messages to process per batch
dry_run: Whether to perform a dry run (no actual deletion)
task_label: Stable task label to distinguish multiple cleanup CronJobs
"""
self._policy = policy
self._end_before = end_before
self._start_from = start_from
self._batch_size = batch_size
self._dry_run = dry_run
normalized_task_label = task_label.strip()
self._task_label = normalized_task_label or "daily"
self._metrics = MessagesCleanupMetrics(
dry_run=dry_run,
has_window=bool(start_from),
task_label=self._task_label,
)
@classmethod
def from_time_range(
@@ -73,6 +204,7 @@ class MessagesCleanService:
end_before: datetime.datetime,
batch_size: int = 1000,
dry_run: bool = False,
task_label: str = "daily",
) -> "MessagesCleanService":
"""
Create a service instance for cleaning messages within a specific time range.
@@ -85,6 +217,7 @@ class MessagesCleanService:
end_before: End time (exclusive) of the range
batch_size: Number of messages to process per batch
dry_run: Whether to perform a dry run (no actual deletion)
task_label: Stable task label to distinguish multiple cleanup CronJobs
Returns:
MessagesCleanService instance
@@ -112,6 +245,7 @@ class MessagesCleanService:
start_from=start_from,
batch_size=batch_size,
dry_run=dry_run,
task_label=task_label,
)
@classmethod
@@ -121,6 +255,7 @@ class MessagesCleanService:
days: int = 30,
batch_size: int = 1000,
dry_run: bool = False,
task_label: str = "daily",
) -> "MessagesCleanService":
"""
Create a service instance for cleaning messages older than specified days.
@@ -130,6 +265,7 @@ class MessagesCleanService:
days: Number of days to look back from now
batch_size: Number of messages to process per batch
dry_run: Whether to perform a dry run (no actual deletion)
task_label: Stable task label to distinguish multiple cleanup CronJobs
Returns:
MessagesCleanService instance
@@ -153,7 +289,14 @@ class MessagesCleanService:
policy.__class__.__name__,
)
return cls(policy=policy, end_before=end_before, start_from=None, batch_size=batch_size, dry_run=dry_run)
return cls(
policy=policy,
end_before=end_before,
start_from=None,
batch_size=batch_size,
dry_run=dry_run,
task_label=task_label,
)
def run(self) -> dict[str, int]:
"""
@@ -162,7 +305,18 @@ class MessagesCleanService:
Returns:
Dict with statistics: batches, filtered_messages, total_deleted
"""
return self._clean_messages_by_time_range()
status = "success"
run_start = time.monotonic()
try:
return self._clean_messages_by_time_range()
except Exception:
status = "failed"
raise
finally:
self._metrics.record_completion(
status=status,
job_duration_seconds=time.monotonic() - run_start,
)
def _clean_messages_by_time_range(self) -> dict[str, int]:
"""
@@ -197,11 +351,14 @@ class MessagesCleanService:
self._end_before,
)
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL
while True:
stats["batches"] += 1
batch_start = time.monotonic()
batch_scanned_messages = 0
batch_filtered_messages = 0
batch_deleted_messages = 0
# Step 1: Fetch a batch of messages using cursor
with Session(db.engine, expire_on_commit=False) as session:
@@ -240,9 +397,16 @@ class MessagesCleanService:
# Track total messages fetched across all batches
stats["total_messages"] += len(messages)
batch_scanned_messages = len(messages)
if not messages:
logger.info("clean_messages (batch %s): no more messages to process", stats["batches"])
self._metrics.record_batch(
scanned_messages=batch_scanned_messages,
filtered_messages=batch_filtered_messages,
deleted_messages=batch_deleted_messages,
batch_duration_seconds=time.monotonic() - batch_start,
)
break
# Update cursor to the last message's (created_at, id)
@@ -268,6 +432,12 @@ class MessagesCleanService:
if not apps:
logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"])
self._metrics.record_batch(
scanned_messages=batch_scanned_messages,
filtered_messages=batch_filtered_messages,
deleted_messages=batch_deleted_messages,
batch_duration_seconds=time.monotonic() - batch_start,
)
continue
# Build app_id -> tenant_id mapping
@@ -286,9 +456,16 @@ class MessagesCleanService:
if not message_ids_to_delete:
logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"])
self._metrics.record_batch(
scanned_messages=batch_scanned_messages,
filtered_messages=batch_filtered_messages,
deleted_messages=batch_deleted_messages,
batch_duration_seconds=time.monotonic() - batch_start,
)
continue
stats["filtered_messages"] += len(message_ids_to_delete)
batch_filtered_messages = len(message_ids_to_delete)
# Step 4: Batch delete messages and their relations
if not self._dry_run:
@@ -309,6 +486,7 @@ class MessagesCleanService:
commit_ms = int((time.monotonic() - commit_start) * 1000)
stats["total_deleted"] += messages_deleted
batch_deleted_messages = messages_deleted
logger.info(
"clean_messages (batch %s): processed %s messages, deleted %s messages",
@@ -343,6 +521,13 @@ class MessagesCleanService:
for msg_id in sampled_ids:
logger.info("clean_messages (batch %s, dry_run) sample: message_id=%s", stats["batches"], msg_id)
self._metrics.record_batch(
scanned_messages=batch_scanned_messages,
filtered_messages=batch_filtered_messages,
deleted_messages=batch_deleted_messages,
batch_duration_seconds=time.monotonic() - batch_start,
)
logger.info(
"clean_messages completed: total batches: %s, total messages: %s, filtered messages: %s, total deleted: %s",
stats["batches"],

View File

@@ -1,9 +1,9 @@
import datetime
import logging
import os
import random
import time
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING
import click
from sqlalchemy.orm import Session, sessionmaker
@@ -20,6 +20,156 @@ from services.billing_service import BillingService, SubscriptionPlan
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from opentelemetry.metrics import Counter, Histogram
class WorkflowRunCleanupMetrics:
"""
Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs.
Metrics are emitted with stable labels only (dry_run/window_mode/task_label/status)
to keep dashboard and alert cardinality predictable in production clusters.
"""
_job_runs_total: "Counter | None"
_batches_total: "Counter | None"
_runs_scanned_total: "Counter | None"
_runs_targeted_total: "Counter | None"
_runs_deleted_total: "Counter | None"
_runs_skipped_total: "Counter | None"
_related_records_total: "Counter | None"
_job_duration_seconds: "Histogram | None"
_batch_duration_seconds: "Histogram | None"
_base_attributes: dict[str, str]
def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None:
self._job_runs_total = None
self._batches_total = None
self._runs_scanned_total = None
self._runs_targeted_total = None
self._runs_deleted_total = None
self._runs_skipped_total = None
self._related_records_total = None
self._job_duration_seconds = None
self._batch_duration_seconds = None
self._base_attributes = {
"job_name": "workflow_run_cleanup",
"dry_run": str(dry_run).lower(),
"window_mode": "between" if has_window else "before_cutoff",
"task_label": task_label,
}
self._init_instruments()
def _init_instruments(self) -> None:
try:
from opentelemetry.metrics import get_meter
meter = get_meter("workflow_run_cleanup", version=dify_config.project.version)
self._job_runs_total = meter.create_counter(
"workflow_run_cleanup_jobs_total",
description="Total number of workflow run cleanup jobs by status.",
unit="{job}",
)
self._batches_total = meter.create_counter(
"workflow_run_cleanup_batches_total",
description="Total number of processed cleanup batches.",
unit="{batch}",
)
self._runs_scanned_total = meter.create_counter(
"workflow_run_cleanup_scanned_runs_total",
description="Total workflow runs scanned by cleanup jobs.",
unit="{run}",
)
self._runs_targeted_total = meter.create_counter(
"workflow_run_cleanup_targeted_runs_total",
description="Total workflow runs targeted by cleanup policy.",
unit="{run}",
)
self._runs_deleted_total = meter.create_counter(
"workflow_run_cleanup_deleted_runs_total",
description="Total workflow runs deleted by cleanup jobs.",
unit="{run}",
)
self._runs_skipped_total = meter.create_counter(
"workflow_run_cleanup_skipped_runs_total",
description="Total workflow runs skipped because tenant is paid/unknown.",
unit="{run}",
)
self._related_records_total = meter.create_counter(
"workflow_run_cleanup_related_records_total",
description="Total related records processed by cleanup jobs.",
unit="{record}",
)
self._job_duration_seconds = meter.create_histogram(
"workflow_run_cleanup_job_duration_seconds",
description="Duration of workflow run cleanup jobs in seconds.",
unit="s",
)
self._batch_duration_seconds = meter.create_histogram(
"workflow_run_cleanup_batch_duration_seconds",
description="Duration of workflow run cleanup batch processing in seconds.",
unit="s",
)
except Exception:
logger.exception("workflow_run_cleanup_metrics: failed to initialize instruments")
def _attrs(self, **extra: str) -> dict[str, str]:
return {**self._base_attributes, **extra}
@staticmethod
def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None:
if not counter or value <= 0:
return
try:
counter.add(value, attributes)
except Exception:
logger.exception("workflow_run_cleanup_metrics: failed to add counter value")
@staticmethod
def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None:
if not histogram:
return
try:
histogram.record(value, attributes)
except Exception:
logger.exception("workflow_run_cleanup_metrics: failed to record histogram value")
def record_batch(
self,
*,
batch_rows: int,
targeted_runs: int,
skipped_runs: int,
deleted_runs: int,
related_counts: dict[str, int] | None,
related_action: str | None,
batch_duration_seconds: float,
) -> None:
attributes = self._attrs()
self._add(self._batches_total, 1, attributes)
self._add(self._runs_scanned_total, batch_rows, attributes)
self._add(self._runs_targeted_total, targeted_runs, attributes)
self._add(self._runs_skipped_total, skipped_runs, attributes)
self._add(self._runs_deleted_total, deleted_runs, attributes)
self._record(self._batch_duration_seconds, batch_duration_seconds, attributes)
if not related_counts or not related_action:
return
for record_type, count in related_counts.items():
self._add(
self._related_records_total,
count,
self._attrs(action=related_action, record_type=record_type),
)
def record_completion(self, *, status: str, job_duration_seconds: float) -> None:
attributes = self._attrs(status=status)
self._add(self._job_runs_total, 1, attributes)
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
class WorkflowRunCleanup:
def __init__(
self,
@@ -29,6 +179,7 @@ class WorkflowRunCleanup:
end_before: datetime.datetime | None = None,
workflow_run_repo: APIWorkflowRunRepository | None = None,
dry_run: bool = False,
task_label: str = "daily",
):
if (start_from is None) ^ (end_before is None):
raise ValueError("start_from and end_before must be both set or both omitted.")
@@ -46,6 +197,13 @@ class WorkflowRunCleanup:
self.batch_size = batch_size
self._cleanup_whitelist: set[str] | None = None
self.dry_run = dry_run
normalized_task_label = task_label.strip()
self.task_label = normalized_task_label or "daily"
self._metrics = WorkflowRunCleanupMetrics(
dry_run=dry_run,
has_window=bool(start_from),
task_label=self.task_label,
)
self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD
self.workflow_run_repo: APIWorkflowRunRepository
if workflow_run_repo:
@@ -74,153 +232,193 @@ class WorkflowRunCleanup:
related_totals = self._empty_related_counts() if self.dry_run else None
batch_index = 0
last_seen: tuple[datetime.datetime, str] | None = None
status = "success"
run_start = time.monotonic()
max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
try:
while True:
batch_start = time.monotonic()
while True:
batch_start = time.monotonic()
fetch_start = time.monotonic()
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
start_from=self.window_start,
end_before=self.window_end,
last_seen=last_seen,
batch_size=self.batch_size,
)
if not run_rows:
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
break
batch_index += 1
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
logger.info(
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
batch_index,
len(run_rows),
int((time.monotonic() - fetch_start) * 1000),
)
tenant_ids = {row.tenant_id for row in run_rows}
filter_start = time.monotonic()
free_tenants = self._filter_free_tenants(tenant_ids)
logger.info(
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
batch_index,
len(free_tenants),
len(tenant_ids),
int((time.monotonic() - filter_start) * 1000),
)
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
paid_or_skipped = len(run_rows) - len(free_runs)
if not free_runs:
skipped_message = (
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
fetch_start = time.monotonic()
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
start_from=self.window_start,
end_before=self.window_end,
last_seen=last_seen,
batch_size=self.batch_size,
)
click.echo(
click.style(
skipped_message,
fg="yellow",
)
)
continue
if not run_rows:
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
break
total_runs_targeted += len(free_runs)
if self.dry_run:
count_start = time.monotonic()
batch_counts = self.workflow_run_repo.count_runs_with_related(
free_runs,
count_node_executions=self._count_node_executions,
count_trigger_logs=self._count_trigger_logs,
)
batch_index += 1
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
logger.info(
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
batch_index,
int((time.monotonic() - count_start) * 1000),
len(run_rows),
int((time.monotonic() - fetch_start) * 1000),
)
if related_totals is not None:
for key in related_totals:
related_totals[key] += batch_counts.get(key, 0)
sample_ids = ", ".join(run.id for run in free_runs[:5])
tenant_ids = {row.tenant_id for row in run_rows}
filter_start = time.monotonic()
free_tenants = self._filter_free_tenants(tenant_ids)
logger.info(
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
batch_index,
len(free_tenants),
len(tenant_ids),
int((time.monotonic() - filter_start) * 1000),
)
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
paid_or_skipped = len(run_rows) - len(free_runs)
if not free_runs:
skipped_message = (
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
)
click.echo(
click.style(
skipped_message,
fg="yellow",
)
)
self._metrics.record_batch(
batch_rows=len(run_rows),
targeted_runs=0,
skipped_runs=paid_or_skipped,
deleted_runs=0,
related_counts=None,
related_action=None,
batch_duration_seconds=time.monotonic() - batch_start,
)
continue
total_runs_targeted += len(free_runs)
if self.dry_run:
count_start = time.monotonic()
batch_counts = self.workflow_run_repo.count_runs_with_related(
free_runs,
count_node_executions=self._count_node_executions,
count_trigger_logs=self._count_trigger_logs,
)
logger.info(
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
batch_index,
int((time.monotonic() - count_start) * 1000),
)
if related_totals is not None:
for key in related_totals:
related_totals[key] += batch_counts.get(key, 0)
sample_ids = ", ".join(run.id for run in free_runs[:5])
click.echo(
click.style(
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
fg="yellow",
)
)
logger.info(
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
batch_index,
int((time.monotonic() - batch_start) * 1000),
)
self._metrics.record_batch(
batch_rows=len(run_rows),
targeted_runs=len(free_runs),
skipped_runs=paid_or_skipped,
deleted_runs=0,
related_counts={key: batch_counts.get(key, 0) for key in self._empty_related_counts()},
related_action="would_delete",
batch_duration_seconds=time.monotonic() - batch_start,
)
continue
try:
delete_start = time.monotonic()
counts = self.workflow_run_repo.delete_runs_with_related(
free_runs,
delete_node_executions=self._delete_node_executions,
delete_trigger_logs=self._delete_trigger_logs,
)
delete_ms = int((time.monotonic() - delete_start) * 1000)
except Exception:
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
raise
total_runs_deleted += counts["runs"]
click.echo(
click.style(
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
fg="yellow",
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
f"skipped {paid_or_skipped} paid/unknown",
fg="green",
)
)
logger.info(
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
batch_index,
delete_ms,
int((time.monotonic() - batch_start) * 1000),
)
continue
try:
delete_start = time.monotonic()
counts = self.workflow_run_repo.delete_runs_with_related(
free_runs,
delete_node_executions=self._delete_node_executions,
delete_trigger_logs=self._delete_trigger_logs,
self._metrics.record_batch(
batch_rows=len(run_rows),
targeted_runs=len(free_runs),
skipped_runs=paid_or_skipped,
deleted_runs=counts["runs"],
related_counts={key: counts.get(key, 0) for key in self._empty_related_counts()},
related_action="deleted",
batch_duration_seconds=time.monotonic() - batch_start,
)
delete_ms = int((time.monotonic() - delete_start) * 1000)
except Exception:
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
raise
total_runs_deleted += counts["runs"]
click.echo(
click.style(
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
f"skipped {paid_or_skipped} paid/unknown",
fg="green",
)
)
logger.info(
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
batch_index,
delete_ms,
int((time.monotonic() - batch_start) * 1000),
)
# Random sleep between batches to avoid overwhelming the database
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
time.sleep(sleep_ms / 1000)
# Random sleep between batches to avoid overwhelming the database
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
time.sleep(sleep_ms / 1000)
if self.dry_run:
if self.window_start:
summary_message = (
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
)
if self.dry_run:
if self.window_start:
summary_message = (
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
)
else:
summary_message = (
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
f"before {self.window_end.isoformat()}"
)
if related_totals is not None:
summary_message = (
f"{summary_message}; related records: {self._format_related_counts(related_totals)}"
)
summary_color = "yellow"
else:
summary_message = (
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
f"before {self.window_end.isoformat()}"
)
if related_totals is not None:
summary_message = f"{summary_message}; related records: {self._format_related_counts(related_totals)}"
summary_color = "yellow"
else:
if self.window_start:
summary_message = (
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
)
else:
summary_message = (
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs before {self.window_end.isoformat()}"
)
summary_color = "white"
if self.window_start:
summary_message = (
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
)
else:
summary_message = (
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
f"before {self.window_end.isoformat()}"
)
summary_color = "white"
click.echo(click.style(summary_message, fg=summary_color))
click.echo(click.style(summary_message, fg=summary_color))
except Exception:
status = "failed"
raise
finally:
self._metrics.record_completion(
status=status,
job_duration_seconds=time.monotonic() - run_start,
)
def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]:
tenant_id_list = list(tenant_ids)

View File

@@ -38,6 +38,7 @@ def test_absolute_mode_calls_from_time_range():
from_days_ago=None,
before_days=None,
dry_run=True,
task_label="daily",
)
mock_from_time_range.assert_called_once_with(
@@ -46,6 +47,7 @@ def test_absolute_mode_calls_from_time_range():
end_before=end_before,
batch_size=200,
dry_run=True,
task_label="daily",
)
mock_from_days.assert_not_called()
@@ -67,6 +69,7 @@ def test_relative_mode_before_days_only_calls_from_days():
from_days_ago=None,
before_days=30,
dry_run=False,
task_label="daily",
)
mock_from_days.assert_called_once_with(
@@ -74,6 +77,7 @@ def test_relative_mode_before_days_only_calls_from_days():
days=30,
batch_size=500,
dry_run=False,
task_label="daily",
)
mock_from_time_range.assert_not_called()
@@ -97,6 +101,7 @@ def test_relative_mode_with_from_days_ago_calls_from_time_range():
from_days_ago=60,
before_days=30,
dry_run=False,
task_label="daily",
)
mock_from_time_range.assert_called_once_with(
@@ -105,6 +110,7 @@ def test_relative_mode_with_from_days_ago_calls_from_time_range():
end_before=fixed_now - datetime.timedelta(days=30),
batch_size=1000,
dry_run=False,
task_label="daily",
)
mock_from_days.assert_not_called()
@@ -178,4 +184,5 @@ def test_invalid_inputs_raise_usage_error(kwargs: dict, message: str):
from_days_ago=kwargs["from_days_ago"],
before_days=kwargs["before_days"],
dry_run=False,
task_label="daily",
)

View File

@@ -265,6 +265,61 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup.run()
def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None:
cutoff = datetime.datetime.now()
repo = FakeRepo(
batches=[[FakeRun("run-free", "t_free", cutoff)]],
delete_result={
"runs": 0,
"node_executions": 2,
"offloads": 1,
"app_logs": 3,
"trigger_logs": 4,
"pauses": 5,
"pause_reasons": 6,
},
)
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
batch_calls: list[dict[str, object]] = []
completion_calls: list[dict[str, object]] = []
monkeypatch.setattr(cleanup._metrics, "record_batch", lambda **kwargs: batch_calls.append(kwargs))
monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs))
cleanup.run()
assert len(batch_calls) == 1
assert batch_calls[0]["batch_rows"] == 1
assert batch_calls[0]["targeted_runs"] == 1
assert batch_calls[0]["deleted_runs"] == 1
assert batch_calls[0]["related_action"] == "deleted"
assert len(completion_calls) == 1
assert completion_calls[0]["status"] == "success"
def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None:
class FailingRepo(FakeRepo):
def delete_runs_with_related(
self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None
) -> dict[str, int]:
raise RuntimeError("delete failed")
cutoff = datetime.datetime.now()
repo = FailingRepo(batches=[[FakeRun("run-free", "t_free", cutoff)]])
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
completion_calls: list[dict[str, object]] = []
monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs))
with pytest.raises(RuntimeError, match="delete failed"):
cleanup.run()
assert len(completion_calls) == 1
assert completion_calls[0]["status"] == "failed"
def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
cutoff = datetime.datetime.now()
repo = FakeRepo(

View File

@@ -619,3 +619,53 @@ class TestMessagesCleanServiceFromDays:
assert service._end_before == expected_end_before
assert service._batch_size == 1000 # default
assert service._dry_run is False # default
class TestMessagesCleanServiceRun:
"""Unit tests for MessagesCleanService.run instrumentation behavior."""
def test_run_records_completion_metrics_on_success(self):
# Arrange
service = MessagesCleanService(
policy=BillingDisabledPolicy(),
start_from=datetime.datetime(2024, 1, 1),
end_before=datetime.datetime(2024, 1, 2),
batch_size=100,
dry_run=False,
)
expected_stats = {
"batches": 1,
"total_messages": 10,
"filtered_messages": 5,
"total_deleted": 5,
}
service._clean_messages_by_time_range = MagicMock(return_value=expected_stats) # type: ignore[method-assign]
completion_calls: list[dict[str, object]] = []
service._metrics.record_completion = lambda **kwargs: completion_calls.append(kwargs) # type: ignore[method-assign]
# Act
result = service.run()
# Assert
assert result == expected_stats
assert len(completion_calls) == 1
assert completion_calls[0]["status"] == "success"
def test_run_records_completion_metrics_on_failure(self):
# Arrange
service = MessagesCleanService(
policy=BillingDisabledPolicy(),
start_from=datetime.datetime(2024, 1, 1),
end_before=datetime.datetime(2024, 1, 2),
batch_size=100,
dry_run=False,
)
service._clean_messages_by_time_range = MagicMock(side_effect=RuntimeError("clean failed")) # type: ignore[method-assign]
completion_calls: list[dict[str, object]] = []
service._metrics.record_completion = lambda **kwargs: completion_calls.append(kwargs) # type: ignore[method-assign]
# Act & Assert
with pytest.raises(RuntimeError, match="clean failed"):
service.run()
assert len(completion_calls) == 1
assert completion_calls[0]["status"] == "failed"