Mirror of https://github.com/langgenius/dify.git
Synced 2026-01-18 04:49:57 +00:00

Compare commits: 3 commits, deploy/dev...build/clea

| SHA1 |
|---|
| b19451dbed |
| c5eeb929cf |
| 65a9233559 |

@@ -881,12 +881,25 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
     is_flag=True,
     help="Preview cleanup results without deleting any workflow run data.",
 )
+@click.option(
+    "--log-sql",
+    is_flag=True,
+    help="Log SQL statements and timings for cleanup queries.",
+)
+@click.option(
+    "--log-sql-min-ms",
+    default=0,
+    show_default=True,
+    help="Only log SQL statements slower than N milliseconds (0 logs all).",
+)
 def clean_workflow_runs(
     days: int,
     batch_size: int,
     start_from: datetime.datetime | None,
     end_before: datetime.datetime | None,
     dry_run: bool,
+    log_sql: bool,
+    log_sql_min_ms: int,
 ):
     """
     Clean workflow runs and related workflow data for free tenants.
@@ -903,6 +916,8 @@ def clean_workflow_runs(
         start_from=start_from,
         end_before=end_before,
         dry_run=dry_run,
+        log_sql=log_sql,
+        log_sql_min_ms=log_sql_min_ms,
     ).run()
 
     end_time = datetime.datetime.now(datetime.UTC)
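As a quick reference for the new options, a minimal sketch that drives the command through click's test runner. The import path and the pre-existing `--days` option are assumptions inferred from the signature above; only `--dry-run`, `--log-sql`, and `--log-sql-min-ms` come directly from this diff.

```python
# Minimal sketch, assuming clean_workflow_runs is exposed as a click command.
from click.testing import CliRunner

from commands import clean_workflow_runs  # hypothetical import path

runner = CliRunner()
result = runner.invoke(
    clean_workflow_runs,
    [
        "--days", "30",            # assumed retention option from the signature
        "--dry-run",               # preview only; nothing is deleted
        "--log-sql",               # echo SQL statements and timings
        "--log-sql-min-ms", "50",  # only log statements slower than 50 ms
    ],
)
print(result.output)
```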
@@ -9,7 +9,7 @@ from collections.abc import Sequence
 from datetime import datetime
 from typing import TypedDict, cast
 
-from sqlalchemy import asc, delete, desc, func, select, tuple_
+from sqlalchemy import asc, delete, desc, func, select
 from sqlalchemy.engine import CursorResult
 from sqlalchemy.orm import Session, sessionmaker
@@ -328,39 +328,14 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
         """
         Delete node executions (and offloads) for the given workflow runs using indexed columns.
 
-        Uses the composite index on (tenant_id, app_id, workflow_id, triggered_from, workflow_run_id)
-        by filtering on those columns with tuple IN.
+        Uses the workflow_run_id index to target executions by run id.
         """
         if not runs:
             return 0, 0
 
-        tuple_values = [
-            (
-                run["tenant_id"],
-                run["app_id"],
-                run["workflow_id"],
-                DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
-                    run["triggered_from"]
-                ),
-                run["run_id"],
-            )
-            for run in runs
-        ]
-
-        node_execution_ids = session.scalars(
-            select(WorkflowNodeExecutionModel.id).where(
-                tuple_(
-                    WorkflowNodeExecutionModel.tenant_id,
-                    WorkflowNodeExecutionModel.app_id,
-                    WorkflowNodeExecutionModel.workflow_id,
-                    WorkflowNodeExecutionModel.triggered_from,
-                    WorkflowNodeExecutionModel.workflow_run_id,
-                ).in_(tuple_values)
-            )
-        ).all()
-
-        if not node_execution_ids:
-            return 0, 0
+        run_ids = [run["run_id"] for run in runs]
+        run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
+        node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
 
         offloads_deleted = (
             cast(
@@ -378,7 +353,7 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
             cast(
                 CursorResult,
                 session.execute(
-                    delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(node_execution_ids))
+                    delete(WorkflowNodeExecutionModel).where(run_id_filter)
                 ),
             ).rowcount
             or 0
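One pattern this hunk preserves is `cast(CursorResult, ...).rowcount or 0`: `Session.execute()` is typed as returning a generic `Result`, so the cast exists only to expose `rowcount` to the type checker. A self-contained sketch of counting deleted rows this way, with an illustrative table and engine:

```python
# Sketch: read the affected-row count from a bulk DELETE in SQLAlchemy 2.0.
# The table and engine are illustrative; the cast mirrors the typing workaround above.
from typing import cast

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, delete, insert
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session

metadata = MetaData()
items = Table("items", metadata, Column("id", Integer, primary_key=True), Column("tag", String))

engine = create_engine("sqlite://")
metadata.create_all(engine)

with Session(engine) as session:
    session.execute(insert(items), [{"id": 1, "tag": "x"}, {"id": 2, "tag": "x"}, {"id": 3, "tag": "y"}])
    deleted = cast(CursorResult, session.execute(delete(items).where(items.c.tag == "x"))).rowcount or 0
    session.commit()
    print(deleted)  # 2
```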
@@ -394,38 +369,18 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
         if not runs:
             return 0, 0
 
-        tuple_values = [
-            (
-                run["tenant_id"],
-                run["app_id"],
-                run["workflow_id"],
-                DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
-                    run["triggered_from"]
-                ),
-                run["run_id"],
-            )
-            for run in runs
-        ]
-        tuple_filter = tuple_(
-            WorkflowNodeExecutionModel.tenant_id,
-            WorkflowNodeExecutionModel.app_id,
-            WorkflowNodeExecutionModel.workflow_id,
-            WorkflowNodeExecutionModel.triggered_from,
-            WorkflowNodeExecutionModel.workflow_run_id,
-        ).in_(tuple_values)
+        run_ids = [run["run_id"] for run in runs]
+        run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
 
         node_executions_count = (
-            session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(tuple_filter)) or 0
+            session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(run_id_filter)) or 0
         )
+        node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
         offloads_count = (
             session.scalar(
                 select(func.count())
                 .select_from(WorkflowNodeExecutionOffload)
-                .join(
-                    WorkflowNodeExecutionModel,
-                    WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id,
-                )
-                .where(tuple_filter)
+                .where(WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids))
             )
             or 0
         )
 
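The refactor above trades a composite `tuple_(...).in_(...)` predicate, which relied on the five-column composite index, for a plain `workflow_run_id.in_(...)` that only needs the single-column index, plus an `IN (subquery)` on the offload side instead of a join. A self-contained sketch of the two filter shapes, with a toy model standing in for `WorkflowNodeExecutionModel`:

```python
# Sketch: composite tuple IN (old shape) vs. single-column IN (new shape).
# The model is illustrative; only the predicate shapes are the point.
from sqlalchemy import String, select, tuple_
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class NodeExecution(Base):
    __tablename__ = "node_executions"
    id: Mapped[str] = mapped_column(String, primary_key=True)
    tenant_id: Mapped[str] = mapped_column(String)
    app_id: Mapped[str] = mapped_column(String)
    workflow_run_id: Mapped[str] = mapped_column(String)


# Old shape: (tenant_id, app_id, workflow_run_id) IN ((...), ...), which a
# matching composite index can serve.
old_filter = tuple_(
    NodeExecution.tenant_id, NodeExecution.app_id, NodeExecution.workflow_run_id
).in_([("t1", "a1", "r1"), ("t1", "a1", "r2")])

# New shape: workflow_run_id IN (...), served by the single-column index alone.
new_filter = NodeExecution.workflow_run_id.in_(["r1", "r2"])

print(select(NodeExecution.id).where(old_filter))
print(select(NodeExecution.id).where(new_filter))
```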
@@ -42,6 +42,7 @@ class BillingService:
         params = {"tenant_id": tenant_id}
 
         billing_info = cls._send_request("GET", "/subscription/info", params=params)
+        cls._log_expiration_date_if_missing(tenant_id, billing_info, "get_info")
         return billing_info
 
     @classmethod
@@ -281,6 +282,7 @@ class BillingService:
                 try:
                     subscription_plan = subscription_adapter.validate_python(plan)
                     results[tenant_id] = subscription_plan
+                    cls._log_expiration_date_if_missing(tenant_id, subscription_plan, "get_plan_bulk")
                 except Exception:
                     logger.exception(
                         "get_plan_bulk: failed to validate subscription plan for tenant(%s)", tenant_id
@@ -296,6 +298,21 @@ class BillingService:
     def _make_plan_cache_key(cls, tenant_id: str) -> str:
         return f"{cls._PLAN_CACHE_KEY_PREFIX}{tenant_id}"
 
+    @staticmethod
+    def _log_expiration_date_if_missing(tenant_id: str, payload: dict, source: str) -> None:
+        expiration_date = None
+        if isinstance(payload, dict):
+            if "expiration_date" in payload:
+                expiration_date = payload.get("expiration_date")
+            elif isinstance(payload.get("subscription"), dict):
+                expiration_date = payload["subscription"].get("expiration_date")
+        if expiration_date == -1:
+            logger.warning(
+                "billing %s: tenant %s returned expiration_date=-1",
+                source,
+                tenant_id,
+            )
+
     @classmethod
     def get_plan_bulk_with_cache(cls, tenant_ids: Sequence[str]) -> dict[str, SubscriptionPlan]:
         """
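To make the helper's behavior concrete: it warns only when the payload carries the sentinel `expiration_date == -1`, whether at the top level or nested under `subscription`, and ignores anything that is not a dict. A standalone sketch reproducing that logic outside the class:

```python
# Standalone reproduction of the _log_expiration_date_if_missing logic so it
# runs without the BillingService class.
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)


def log_expiration_date_if_missing(tenant_id: str, payload: dict, source: str) -> None:
    expiration_date = None
    if isinstance(payload, dict):
        if "expiration_date" in payload:
            expiration_date = payload.get("expiration_date")
        elif isinstance(payload.get("subscription"), dict):
            expiration_date = payload["subscription"].get("expiration_date")
    if expiration_date == -1:
        logger.warning("billing %s: tenant %s returned expiration_date=-1", source, tenant_id)


log_expiration_date_if_missing("t-1", {"expiration_date": -1}, "get_info")                    # warns
log_expiration_date_if_missing("t-2", {"subscription": {"expiration_date": -1}}, "get_info")  # warns
log_expiration_date_if_missing("t-3", {"expiration_date": 1767139200}, "get_info")            # silent
```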
@@ -1,8 +1,11 @@
 import datetime
 import logging
+import time
 from collections.abc import Iterable, Sequence
+from contextlib import contextmanager
 
 import click
+from sqlalchemy import event
 from sqlalchemy.orm import Session, sessionmaker
 
 from configs import dify_config
@@ -28,6 +31,8 @@ class WorkflowRunCleanup:
         end_before: datetime.datetime | None = None,
         workflow_run_repo: APIWorkflowRunRepository | None = None,
         dry_run: bool = False,
+        log_sql: bool = False,
+        log_sql_min_ms: int = 0,
     ):
         if (start_from is None) ^ (end_before is None):
             raise ValueError("start_from and end_before must be both set or both omitted.")
@@ -45,6 +50,8 @@ class WorkflowRunCleanup:
         self.batch_size = batch_size
         self._cleanup_whitelist: set[str] | None = None
         self.dry_run = dry_run
+        self.log_sql = log_sql
+        self.log_sql_min_ms = max(0, log_sql_min_ms)
         self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD
         self.workflow_run_repo: APIWorkflowRunRepository
         if workflow_run_repo:
@@ -56,6 +63,38 @@ class WorkflowRunCleanup:
             session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
             self.workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
 
+    @contextmanager
+    def _sql_logger(self):
+        if not self.log_sql:
+            yield
+            return
+
+        def _before_cursor_execute(conn, cursor, statement, parameters, context, executemany) -> None:
+            context._dify_sql_start_time = time.monotonic()
+            context._dify_sql_statement = statement
+            context._dify_sql_parameters = parameters
+
+        def _after_cursor_execute(conn, cursor, statement, parameters, context, executemany) -> None:
+            start = getattr(context, "_dify_sql_start_time", None)
+            if start is None:
+                return
+            elapsed_ms = (time.monotonic() - start) * 1000
+            if elapsed_ms < self.log_sql_min_ms:
+                return
+            logged_statement = getattr(context, "_dify_sql_statement", statement)
+            logged_parameters = getattr(context, "_dify_sql_parameters", parameters)
+            click.echo(f"[sql] {elapsed_ms:.1f} ms {logged_statement}")
+            if logged_parameters:
+                click.echo(f"[sql] params: {logged_parameters}")
+
+        event.listen(db.engine, "before_cursor_execute", _before_cursor_execute)
+        event.listen(db.engine, "after_cursor_execute", _after_cursor_execute)
+        try:
+            yield
+        finally:
+            event.remove(db.engine, "before_cursor_execute", _before_cursor_execute)
+            event.remove(db.engine, "after_cursor_execute", _after_cursor_execute)
+
     def run(self) -> None:
         click.echo(
             click.style(
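`_sql_logger` attaches SQLAlchemy's `before_cursor_execute`/`after_cursor_execute` engine events to time each statement and detaches them in `finally`, so the listeners cannot outlive the cleanup run. The same pattern reduced to a self-contained sketch against an in-memory engine:

```python
# Sketch: per-statement SQL timing via engine events, the mechanism
# _sql_logger uses (listen on entry, remove on exit).
import time
from contextlib import contextmanager

from sqlalchemy import create_engine, event, text


@contextmanager
def sql_timer(engine, min_ms: float = 0.0):
    def before(conn, cursor, statement, parameters, context, executemany):
        context._start = time.monotonic()

    def after(conn, cursor, statement, parameters, context, executemany):
        elapsed_ms = (time.monotonic() - getattr(context, "_start", time.monotonic())) * 1000
        if elapsed_ms >= min_ms:
            print(f"[sql] {elapsed_ms:.1f} ms {statement}")

    event.listen(engine, "before_cursor_execute", before)
    event.listen(engine, "after_cursor_execute", after)
    try:
        yield
    finally:
        event.remove(engine, "before_cursor_execute", before)
        event.remove(engine, "after_cursor_execute", after)


engine = create_engine("sqlite://")
with sql_timer(engine):
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
```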
@@ -74,74 +113,95 @@ class WorkflowRunCleanup:
         batch_index = 0
         last_seen: tuple[datetime.datetime, str] | None = None
 
-        while True:
-            run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
-                start_from=self.window_start,
-                end_before=self.window_end,
-                last_seen=last_seen,
-                batch_size=self.batch_size,
-            )
-            if not run_rows:
-                break
-
-            batch_index += 1
-            last_seen = (run_rows[-1].created_at, run_rows[-1].id)
-            tenant_ids = {row.tenant_id for row in run_rows}
-            free_tenants = self._filter_free_tenants(tenant_ids)
-            free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
-            paid_or_skipped = len(run_rows) - len(free_runs)
-
-            if not free_runs:
-                click.echo(
-                    click.style(
-                        f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)",
-                        fg="yellow",
-                    )
-                )
-                continue
-
-            total_runs_targeted += len(free_runs)
-
-            if self.dry_run:
-                batch_counts = self.workflow_run_repo.count_runs_with_related(
-                    free_runs,
-                    count_node_executions=self._count_node_executions,
-                    count_trigger_logs=self._count_trigger_logs,
-                )
-                if related_totals is not None:
-                    for key in related_totals:
-                        related_totals[key] += batch_counts.get(key, 0)
-                sample_ids = ", ".join(run.id for run in free_runs[:5])
-                click.echo(
-                    click.style(
-                        f"[batch #{batch_index}] would delete {len(free_runs)} runs "
-                        f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
-                        fg="yellow",
-                    )
-                )
-                continue
-
-            try:
-                counts = self.workflow_run_repo.delete_runs_with_related(
-                    free_runs,
-                    delete_node_executions=self._delete_node_executions,
-                    delete_trigger_logs=self._delete_trigger_logs,
-                )
-            except Exception:
-                logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
-                raise
-
-            total_runs_deleted += counts["runs"]
-            click.echo(
-                click.style(
-                    f"[batch #{batch_index}] deleted runs: {counts['runs']} "
-                    f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
-                    f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
-                    f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
-                    f"skipped {paid_or_skipped} paid/unknown",
-                    fg="green",
-                )
-            )
+        with self._sql_logger():
+            while True:
+                batch_start = time.monotonic()
+                run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
+                    start_from=self.window_start,
+                    end_before=self.window_end,
+                    last_seen=last_seen,
+                    batch_size=self.batch_size,
+                )
+                fetch_ms = (time.monotonic() - batch_start) * 1000
+                if not run_rows:
+                    break
+
+                batch_index += 1
+                last_seen = (run_rows[-1].created_at, run_rows[-1].id)
+                tenant_ids = {row.tenant_id for row in run_rows}
+                billing_start = time.monotonic()
+                free_tenants = self._filter_free_tenants(tenant_ids)
+                billing_ms = (time.monotonic() - billing_start) * 1000
+                free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
+                paid_or_skipped = len(run_rows) - len(free_runs)
+
+                if self.log_sql:
+                    click.echo(
+                        click.style(
+                            f"[batch #{batch_index}] fetch_ms={fetch_ms:.1f} billing_ms={billing_ms:.1f}",
+                            fg="white",
+                        )
+                    )
+
+                if not free_runs:
+                    skipped_message = (
+                        f"[batch #{batch_index}] skipped (no sandbox runs in batch, "
+                        f"{paid_or_skipped} paid/unknown)"
+                    )
+                    click.echo(
+                        click.style(
+                            skipped_message,
+                            fg="yellow",
+                        )
+                    )
+                    continue
+
+                total_runs_targeted += len(free_runs)
+
+                if self.dry_run:
+                    count_start = time.monotonic()
+                    batch_counts = self.workflow_run_repo.count_runs_with_related(
+                        free_runs,
+                        count_node_executions=self._count_node_executions,
+                        count_trigger_logs=self._count_trigger_logs,
+                    )
+                    count_ms = (time.monotonic() - count_start) * 1000
+                    if self.log_sql:
+                        click.echo(click.style(f"[batch #{batch_index}] count_ms={count_ms:.1f}", fg="white"))
+                    if related_totals is not None:
+                        for key in related_totals:
+                            related_totals[key] += batch_counts.get(key, 0)
+                    sample_ids = ", ".join(run.id for run in free_runs[:5])
+                    click.echo(
+                        click.style(
+                            f"[batch #{batch_index}] would delete {len(free_runs)} runs "
+                            f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
+                            fg="yellow",
+                        )
+                    )
+                    continue
+
+                try:
+                    counts = self.workflow_run_repo.delete_runs_with_related(
+                        free_runs,
+                        delete_node_executions=self._delete_node_executions,
+                        delete_trigger_logs=self._delete_trigger_logs,
+                    )
+                except Exception:
+                    logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
+                    raise
+
+                total_runs_deleted += counts["runs"]
+                click.echo(
+                    click.style(
+                        f"[batch #{batch_index}] deleted runs: {counts['runs']} "
+                        f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
+                        f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
+                        f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
+                        f"skipped {paid_or_skipped} paid/unknown",
+                        fg="green",
+                    )
+                )
 
         if self.dry_run:
             if self.window_start:
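One detail worth noting in the loop above: batches advance by keyset, not offset. `last_seen` carries the `(created_at, id)` of the last row fetched, so the next query resumes strictly after it and stays stable while earlier rows are deleted underneath. A toy sketch of that pagination shape (the model and query are illustrative; the repository's actual implementation is not shown in this diff):

```python
# Sketch: keyset (seek) pagination on (created_at, id), the cursor shape the
# cleanup loop threads through last_seen. All names here are illustrative.
import datetime

from sqlalchemy import DateTime, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Run(Base):
    __tablename__ = "runs"
    id: Mapped[str] = mapped_column(String, primary_key=True)
    created_at: Mapped[datetime.datetime] = mapped_column(DateTime)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    base = datetime.datetime(2025, 1, 1)
    session.add_all(Run(id=f"r{i}", created_at=base + datetime.timedelta(minutes=i)) for i in range(7))
    session.commit()

    last_seen = None
    while True:
        stmt = select(Run).order_by(Run.created_at, Run.id).limit(3)
        if last_seen is not None:
            last_created, last_id = last_seen
            # Resume strictly after the previous batch's final (created_at, id);
            # equivalent to the row-value comparison (created_at, id) > (:ts, :id).
            stmt = stmt.where(
                (Run.created_at > last_created)
                | ((Run.created_at == last_created) & (Run.id > last_id))
            )
        batch = session.scalars(stmt).all()
        if not batch:
            break
        last_seen = (batch[-1].created_at, batch[-1].id)
        print([run.id for run in batch])  # r0-r2, then r3-r5, then r6
```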