Compare commits

...

3 Commits

Author SHA1 Message Date
hj24
d1cdc85a2e chore: add random sleep for workflow cleanup
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
2026-02-06 13:58:49 +08:00
hj24
b4414901d1 fix: add random sleep to reduce db IOPS 2026-02-05 22:12:38 +08:00
hj24
34caf19f5b chore: add performance logs 2026-02-05 16:10:15 +08:00
7 changed files with 105 additions and 0 deletions

View File

@@ -715,5 +715,6 @@ ANNOTATION_IMPORT_MAX_CONCURRENT=5
# Sandbox expired records clean configuration
SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000

View File

@@ -1309,6 +1309,10 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings):
description="Maximum number of records to process in each batch",
default=1000,
)
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL: PositiveInt = Field(
description="Maximum interval in milliseconds between batches",
default=200,
)
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: PositiveInt = Field(
description="Retention days for sandbox expired workflow_run records and message records",
default=30,

View File

@@ -16,6 +16,7 @@ class SavedMessage(TypeBase):
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="saved_message_pkey"),
sa.Index("saved_message_message_idx", "app_id", "message_id", "created_by_role", "created_by"),
sa.Index("saved_message_message_id_idx", "message_id"),
)
id: Mapped[str] = mapped_column(

View File

@@ -1,6 +1,8 @@
import datetime
import logging
import os
import random
import time
from collections.abc import Sequence
from typing import cast
@@ -193,11 +195,15 @@ class MessagesCleanService:
self._end_before,
)
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
while True:
stats["batches"] += 1
batch_start = time.monotonic()
# Step 1: Fetch a batch of messages using cursor
with Session(db.engine, expire_on_commit=False) as session:
fetch_messages_start = time.monotonic()
msg_stmt = (
select(Message.id, Message.app_id, Message.created_at)
.where(Message.created_at < self._end_before)
@@ -223,6 +229,12 @@ class MessagesCleanService:
SimpleMessage(id=msg_id, app_id=app_id, created_at=msg_created_at)
for msg_id, app_id, msg_created_at in raw_messages
]
logger.info(
"clean_messages (batch %s): fetched %s messages in %sms",
stats["batches"],
len(messages),
int((time.monotonic() - fetch_messages_start) * 1000),
)
# Track total messages fetched across all batches
stats["total_messages"] += len(messages)
@@ -241,8 +253,16 @@ class MessagesCleanService:
logger.info("clean_messages (batch %s): no app_ids found, skip", stats["batches"])
continue
fetch_apps_start = time.monotonic()
app_stmt = select(App.id, App.tenant_id).where(App.id.in_(app_ids))
apps = list(session.execute(app_stmt).all())
logger.info(
"clean_messages (batch %s): fetched %s apps for %s app_ids in %sms",
stats["batches"],
len(apps),
len(app_ids),
int((time.monotonic() - fetch_apps_start) * 1000),
)
if not apps:
logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"])
@@ -252,7 +272,15 @@ class MessagesCleanService:
app_to_tenant: dict[str, str] = {app.id: app.tenant_id for app in apps}
# Step 3: Delegate to policy to determine which messages to delete
policy_start = time.monotonic()
message_ids_to_delete = self._policy.filter_message_ids(messages, app_to_tenant)
logger.info(
"clean_messages (batch %s): policy selected %s/%s messages in %sms",
stats["batches"],
len(message_ids_to_delete),
len(messages),
int((time.monotonic() - policy_start) * 1000),
)
if not message_ids_to_delete:
logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"])
@@ -263,14 +291,20 @@ class MessagesCleanService:
# Step 4: Batch delete messages and their relations
if not self._dry_run:
with Session(db.engine, expire_on_commit=False) as session:
delete_relations_start = time.monotonic()
# Delete related records first
self._batch_delete_message_relations(session, message_ids_to_delete)
delete_relations_ms = int((time.monotonic() - delete_relations_start) * 1000)
# Delete messages
delete_messages_start = time.monotonic()
delete_stmt = delete(Message).where(Message.id.in_(message_ids_to_delete))
delete_result = cast(CursorResult, session.execute(delete_stmt))
messages_deleted = delete_result.rowcount
delete_messages_ms = int((time.monotonic() - delete_messages_start) * 1000)
commit_start = time.monotonic()
session.commit()
commit_ms = int((time.monotonic() - commit_start) * 1000)
stats["total_deleted"] += messages_deleted
@@ -280,6 +314,19 @@ class MessagesCleanService:
len(messages),
messages_deleted,
)
logger.info(
"clean_messages (batch %s): relations %sms, messages %sms, commit %sms, batch total %sms",
stats["batches"],
delete_relations_ms,
delete_messages_ms,
commit_ms,
int((time.monotonic() - batch_start) * 1000),
)
# Random sleep between batches to avoid overwhelming the database
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
logger.info("clean_messages (batch %s): sleeping for %.2fms", stats["batches"], sleep_ms)
time.sleep(sleep_ms / 1000)
else:
# Log random sample of message IDs that would be deleted (up to 10)
sample_size = min(10, len(message_ids_to_delete))

View File

@@ -1,5 +1,8 @@
import datetime
import logging
import os
import random
import time
from collections.abc import Iterable, Sequence
import click
@@ -72,7 +75,12 @@ class WorkflowRunCleanup:
batch_index = 0
last_seen: tuple[datetime.datetime, str] | None = None
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
while True:
batch_start = time.monotonic()
fetch_start = time.monotonic()
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
start_from=self.window_start,
end_before=self.window_end,
@@ -80,12 +88,30 @@ class WorkflowRunCleanup:
batch_size=self.batch_size,
)
if not run_rows:
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
break
batch_index += 1
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
logger.info(
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
batch_index,
len(run_rows),
int((time.monotonic() - fetch_start) * 1000),
)
tenant_ids = {row.tenant_id for row in run_rows}
filter_start = time.monotonic()
free_tenants = self._filter_free_tenants(tenant_ids)
logger.info(
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
batch_index,
len(free_tenants),
len(tenant_ids),
int((time.monotonic() - filter_start) * 1000),
)
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
paid_or_skipped = len(run_rows) - len(free_runs)
@@ -104,11 +130,17 @@ class WorkflowRunCleanup:
total_runs_targeted += len(free_runs)
if self.dry_run:
count_start = time.monotonic()
batch_counts = self.workflow_run_repo.count_runs_with_related(
free_runs,
count_node_executions=self._count_node_executions,
count_trigger_logs=self._count_trigger_logs,
)
logger.info(
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
batch_index,
int((time.monotonic() - count_start) * 1000),
)
if related_totals is not None:
for key in related_totals:
related_totals[key] += batch_counts.get(key, 0)
@@ -120,14 +152,21 @@ class WorkflowRunCleanup:
fg="yellow",
)
)
logger.info(
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
batch_index,
int((time.monotonic() - batch_start) * 1000),
)
continue
try:
delete_start = time.monotonic()
counts = self.workflow_run_repo.delete_runs_with_related(
free_runs,
delete_node_executions=self._delete_node_executions,
delete_trigger_logs=self._delete_trigger_logs,
)
delete_ms = int((time.monotonic() - delete_start) * 1000)
except Exception:
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
raise
@@ -143,6 +182,17 @@ class WorkflowRunCleanup:
fg="green",
)
)
logger.info(
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
batch_index,
delete_ms,
int((time.monotonic() - batch_start) * 1000),
)
# Random sleep between batches to avoid overwhelming the database
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
time.sleep(sleep_ms / 1000)
if self.dry_run:
if self.window_start:

View File

@@ -1518,5 +1518,6 @@ AMPLITUDE_API_KEY=
# Sandbox expired records clean configuration
SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000

View File

@@ -682,6 +682,7 @@ x-shared-env: &shared-api-worker-env
AMPLITUDE_API_KEY: ${AMPLITUDE_API_KEY:-}
SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD: ${SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD:-21}
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE:-1000}
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL:-200}
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: ${SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS:-30}
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL: ${SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL:-90000}