Compare commits

...

3 Commits

Author   SHA1        Message                                      Date
hjlarry  b19451dbed  log expiration_date=-1                       2026-01-16 14:57:36 +08:00
hjlarry  c5eeb929cf  improve query workflowNodeExecution          2026-01-16 14:46:46 +08:00
hjlarry  65a9233559  Add SQL timing logs to workflow run cleanup  2026-01-16 13:58:45 +08:00
4 changed files with 165 additions and 118 deletions

View File

@@ -881,12 +881,25 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
is_flag=True,
help="Preview cleanup results without deleting any workflow run data.",
)
@click.option(
"--log-sql",
is_flag=True,
help="Log SQL statements and timings for cleanup queries.",
)
@click.option(
"--log-sql-min-ms",
default=0,
show_default=True,
help="Only log SQL statements slower than N milliseconds (0 logs all).",
)
def clean_workflow_runs(
days: int,
batch_size: int,
start_from: datetime.datetime | None,
end_before: datetime.datetime | None,
dry_run: bool,
log_sql: bool,
log_sql_min_ms: int,
):
"""
Clean workflow runs and related workflow data for free tenants.
@@ -903,6 +916,8 @@ def clean_workflow_runs(
start_from=start_from,
end_before=end_before,
dry_run=dry_run,
log_sql=log_sql,
log_sql_min_ms=log_sql_min_ms,
).run()
end_time = datetime.datetime.now(datetime.UTC)
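
For reference, a minimal self-contained sketch (not the Dify command itself; the `demo` command name is made up) of how the two new options parse: `--log-sql` is a boolean flag and `--log-sql-min-ms` is an integer threshold defaulting to 0.

```python
import click


@click.command()
@click.option("--log-sql", is_flag=True, help="Log SQL statements and timings.")
@click.option(
    "--log-sql-min-ms",
    default=0,
    show_default=True,
    help="Only log statements slower than N milliseconds (0 logs all).",
)
def demo(log_sql: bool, log_sql_min_ms: int):
    # click maps --log-sql -> log_sql (bool) and --log-sql-min-ms -> log_sql_min_ms (int)
    click.echo(f"log_sql={log_sql} log_sql_min_ms={log_sql_min_ms}")


if __name__ == "__main__":
    demo()
```

Assuming the real command is wired into the CLI like Dify's other maintenance commands, the new options would be passed the same way, e.g. `--log-sql --log-sql-min-ms 50`.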

View File

@@ -9,7 +9,7 @@ from collections.abc import Sequence
from datetime import datetime
from typing import TypedDict, cast
from sqlalchemy import asc, delete, desc, func, select, tuple_
from sqlalchemy import asc, delete, desc, func, select
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session, sessionmaker
@@ -328,39 +328,14 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
"""
Delete node executions (and offloads) for the given workflow runs using indexed columns.
Uses the composite index on (tenant_id, app_id, workflow_id, triggered_from, workflow_run_id)
by filtering on those columns with tuple IN.
Uses the workflow_run_id index to target executions by run id.
"""
if not runs:
return 0, 0
tuple_values = [
(
run["tenant_id"],
run["app_id"],
run["workflow_id"],
DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
run["triggered_from"]
),
run["run_id"],
)
for run in runs
]
node_execution_ids = session.scalars(
select(WorkflowNodeExecutionModel.id).where(
tuple_(
WorkflowNodeExecutionModel.tenant_id,
WorkflowNodeExecutionModel.app_id,
WorkflowNodeExecutionModel.workflow_id,
WorkflowNodeExecutionModel.triggered_from,
WorkflowNodeExecutionModel.workflow_run_id,
).in_(tuple_values)
)
).all()
if not node_execution_ids:
return 0, 0
run_ids = [run["run_id"] for run in runs]
run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
offloads_deleted = (
cast(
@@ -378,7 +353,7 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
cast(
CursorResult,
session.execute(
delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(node_execution_ids))
delete(WorkflowNodeExecutionModel).where(run_id_filter)
),
).rowcount
or 0
@@ -394,38 +369,18 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
if not runs:
return 0, 0
tuple_values = [
(
run["tenant_id"],
run["app_id"],
run["workflow_id"],
DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
run["triggered_from"]
),
run["run_id"],
)
for run in runs
]
tuple_filter = tuple_(
WorkflowNodeExecutionModel.tenant_id,
WorkflowNodeExecutionModel.app_id,
WorkflowNodeExecutionModel.workflow_id,
WorkflowNodeExecutionModel.triggered_from,
WorkflowNodeExecutionModel.workflow_run_id,
).in_(tuple_values)
run_ids = [run["run_id"] for run in runs]
run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
node_executions_count = (
session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(tuple_filter)) or 0
session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(run_id_filter)) or 0
)
node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
offloads_count = (
session.scalar(
select(func.count())
.select_from(WorkflowNodeExecutionOffload)
.join(
WorkflowNodeExecutionModel,
WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id,
)
.where(tuple_filter)
.where(WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids))
)
or 0
)
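
For context, a self-contained sketch of the query shape this commit moves to, using simplified stand-in models (`NodeExecution` and `NodeExecutionOffload` here are illustrative, not the Dify models): node executions are filtered directly on `workflow_run_id`, and offload rows are deleted through an IN-subquery over the matching execution ids, so the five-column `tuple_` IN filter is no longer needed.

```python
from sqlalchemy import ForeignKey, String, create_engine, delete, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class NodeExecution(Base):
    __tablename__ = "node_executions"
    id: Mapped[str] = mapped_column(String, primary_key=True)
    workflow_run_id: Mapped[str] = mapped_column(String, index=True)


class NodeExecutionOffload(Base):
    __tablename__ = "node_execution_offloads"
    id: Mapped[str] = mapped_column(String, primary_key=True)
    node_execution_id: Mapped[str] = mapped_column(ForeignKey("node_executions.id"))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Seed a couple of rows so the deletes below actually match something.
    session.add_all([
        NodeExecution(id="ne-1", workflow_run_id="run-1"),
        NodeExecution(id="ne-2", workflow_run_id="run-2"),
        NodeExecutionOffload(id="off-1", node_execution_id="ne-1"),
    ])
    session.commit()

    run_ids = ["run-1", "run-2"]
    run_id_filter = NodeExecution.workflow_run_id.in_(run_ids)
    node_execution_ids = select(NodeExecution.id).where(run_id_filter)

    # Offloads first (they reference executions via the IN-subquery),
    # then the executions themselves by the plain run-id filter.
    session.execute(
        delete(NodeExecutionOffload).where(
            NodeExecutionOffload.node_execution_id.in_(node_execution_ids)
        )
    )
    session.execute(delete(NodeExecution).where(run_id_filter))
    session.commit()
```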

View File

@@ -42,6 +42,7 @@ class BillingService:
params = {"tenant_id": tenant_id}
billing_info = cls._send_request("GET", "/subscription/info", params=params)
cls._log_expiration_date_if_missing(tenant_id, billing_info, "get_info")
return billing_info
@classmethod
@@ -281,6 +282,7 @@ class BillingService:
try:
subscription_plan = subscription_adapter.validate_python(plan)
results[tenant_id] = subscription_plan
cls._log_expiration_date_if_missing(tenant_id, subscription_plan, "get_plan_bulk")
except Exception:
logger.exception(
"get_plan_bulk: failed to validate subscription plan for tenant(%s)", tenant_id
@@ -296,6 +298,21 @@ class BillingService:
def _make_plan_cache_key(cls, tenant_id: str) -> str:
return f"{cls._PLAN_CACHE_KEY_PREFIX}{tenant_id}"
@staticmethod
def _log_expiration_date_if_missing(tenant_id: str, payload: dict, source: str) -> None:
expiration_date = None
if isinstance(payload, dict):
if "expiration_date" in payload:
expiration_date = payload.get("expiration_date")
elif isinstance(payload.get("subscription"), dict):
expiration_date = payload["subscription"].get("expiration_date")
if expiration_date == -1:
logger.warning(
"billing %s: tenant %s returned expiration_date=-1",
source,
tenant_id,
)
@classmethod
def get_plan_bulk_with_cache(cls, tenant_ids: Sequence[str]) -> dict[str, SubscriptionPlan]:
"""

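For illustration only (these payload shapes and field values are assumptions, not documented billing API responses), the helper looks for the field at the top level first and then nested under "subscription", warning when the value is -1:

```python
# Hypothetical payloads; everything except "expiration_date" and
# "subscription" is made up for illustration.
flat_payload = {"plan": "sandbox", "expiration_date": -1}
nested_payload = {"subscription": {"plan": "sandbox", "expiration_date": -1}}


def _expiration_date(payload: dict):
    # Mirrors the lookup in _log_expiration_date_if_missing: top-level key
    # first, then one nested under "subscription".
    if "expiration_date" in payload:
        return payload.get("expiration_date")
    if isinstance(payload.get("subscription"), dict):
        return payload["subscription"].get("expiration_date")
    return None


print(_expiration_date(flat_payload) == -1)    # True -> warning would fire
print(_expiration_date(nested_payload) == -1)  # True -> warning would fire
```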
View File

@@ -1,8 +1,11 @@
import datetime
import logging
import time
from collections.abc import Iterable, Sequence
from contextlib import contextmanager
import click
from sqlalchemy import event
from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
@@ -28,6 +31,8 @@ class WorkflowRunCleanup:
end_before: datetime.datetime | None = None,
workflow_run_repo: APIWorkflowRunRepository | None = None,
dry_run: bool = False,
log_sql: bool = False,
log_sql_min_ms: int = 0,
):
if (start_from is None) ^ (end_before is None):
raise ValueError("start_from and end_before must be both set or both omitted.")
@@ -45,6 +50,8 @@ class WorkflowRunCleanup:
self.batch_size = batch_size
self._cleanup_whitelist: set[str] | None = None
self.dry_run = dry_run
self.log_sql = log_sql
self.log_sql_min_ms = max(0, log_sql_min_ms)
self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD
self.workflow_run_repo: APIWorkflowRunRepository
if workflow_run_repo:
@@ -56,6 +63,38 @@ class WorkflowRunCleanup:
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
self.workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@contextmanager
def _sql_logger(self):
if not self.log_sql:
yield
return
def _before_cursor_execute(conn, cursor, statement, parameters, context, executemany) -> None:
context._dify_sql_start_time = time.monotonic()
context._dify_sql_statement = statement
context._dify_sql_parameters = parameters
def _after_cursor_execute(conn, cursor, statement, parameters, context, executemany) -> None:
start = getattr(context, "_dify_sql_start_time", None)
if start is None:
return
elapsed_ms = (time.monotonic() - start) * 1000
if elapsed_ms < self.log_sql_min_ms:
return
logged_statement = getattr(context, "_dify_sql_statement", statement)
logged_parameters = getattr(context, "_dify_sql_parameters", parameters)
click.echo(f"[sql] {elapsed_ms:.1f} ms {logged_statement}")
if logged_parameters:
click.echo(f"[sql] params: {logged_parameters}")
event.listen(db.engine, "before_cursor_execute", _before_cursor_execute)
event.listen(db.engine, "after_cursor_execute", _after_cursor_execute)
try:
yield
finally:
event.remove(db.engine, "before_cursor_execute", _before_cursor_execute)
event.remove(db.engine, "after_cursor_execute", _after_cursor_execute)
def run(self) -> None:
click.echo(
click.style(
@@ -74,74 +113,95 @@ class WorkflowRunCleanup:
batch_index = 0
last_seen: tuple[datetime.datetime, str] | None = None
while True:
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
start_from=self.window_start,
end_before=self.window_end,
last_seen=last_seen,
batch_size=self.batch_size,
)
if not run_rows:
break
with self._sql_logger():
while True:
batch_start = time.monotonic()
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
start_from=self.window_start,
end_before=self.window_end,
last_seen=last_seen,
batch_size=self.batch_size,
)
fetch_ms = (time.monotonic() - batch_start) * 1000
if not run_rows:
break
batch_index += 1
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
tenant_ids = {row.tenant_id for row in run_rows}
free_tenants = self._filter_free_tenants(tenant_ids)
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
paid_or_skipped = len(run_rows) - len(free_runs)
batch_index += 1
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
tenant_ids = {row.tenant_id for row in run_rows}
billing_start = time.monotonic()
free_tenants = self._filter_free_tenants(tenant_ids)
billing_ms = (time.monotonic() - billing_start) * 1000
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
paid_or_skipped = len(run_rows) - len(free_runs)
if not free_runs:
if self.log_sql:
click.echo(
click.style(
f"[batch #{batch_index}] fetch_ms={fetch_ms:.1f} billing_ms={billing_ms:.1f}",
fg="white",
)
)
if not free_runs:
skipped_message = (
f"[batch #{batch_index}] skipped (no sandbox runs in batch, "
f"{paid_or_skipped} paid/unknown)"
)
click.echo(
click.style(
skipped_message,
fg="yellow",
)
)
continue
total_runs_targeted += len(free_runs)
if self.dry_run:
count_start = time.monotonic()
batch_counts = self.workflow_run_repo.count_runs_with_related(
free_runs,
count_node_executions=self._count_node_executions,
count_trigger_logs=self._count_trigger_logs,
)
count_ms = (time.monotonic() - count_start) * 1000
if self.log_sql:
click.echo(click.style(f"[batch #{batch_index}] count_ms={count_ms:.1f}", fg="white"))
if related_totals is not None:
for key in related_totals:
related_totals[key] += batch_counts.get(key, 0)
sample_ids = ", ".join(run.id for run in free_runs[:5])
click.echo(
click.style(
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
fg="yellow",
)
)
continue
try:
counts = self.workflow_run_repo.delete_runs_with_related(
free_runs,
delete_node_executions=self._delete_node_executions,
delete_trigger_logs=self._delete_trigger_logs,
)
except Exception:
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
raise
total_runs_deleted += counts["runs"]
click.echo(
click.style(
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)",
fg="yellow",
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
f"skipped {paid_or_skipped} paid/unknown",
fg="green",
)
)
continue
total_runs_targeted += len(free_runs)
if self.dry_run:
batch_counts = self.workflow_run_repo.count_runs_with_related(
free_runs,
count_node_executions=self._count_node_executions,
count_trigger_logs=self._count_trigger_logs,
)
if related_totals is not None:
for key in related_totals:
related_totals[key] += batch_counts.get(key, 0)
sample_ids = ", ".join(run.id for run in free_runs[:5])
click.echo(
click.style(
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
fg="yellow",
)
)
continue
try:
counts = self.workflow_run_repo.delete_runs_with_related(
free_runs,
delete_node_executions=self._delete_node_executions,
delete_trigger_logs=self._delete_trigger_logs,
)
except Exception:
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
raise
total_runs_deleted += counts["runs"]
click.echo(
click.style(
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
f"skipped {paid_or_skipped} paid/unknown",
fg="green",
)
)
if self.dry_run:
if self.window_start: