Compare commits

...

2 Commits

Author SHA1 Message Date
hj24
9f5078f271 feat: support scan window for celery schedule clean tasks
Some checks are pending
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions
2026-02-13 17:41:09 +08:00
hj24
f6c07a53cb fix: optimize workflow run get batch cursor 2026-02-13 09:50:38 +08:00
7 changed files with 48 additions and 13 deletions

View File

@@ -719,6 +719,7 @@ SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS=0
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000

View File

@@ -1355,6 +1355,12 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings):
description="Retention days for sandbox expired workflow_run records and message records",
default=30,
)
SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS: NonNegativeInt = Field(
description="Scan window lower bound in days relative to cutoff for scheduled cleanup tasks. "
"0 means disabled (scan all records before cutoff). "
"When > 0, only records in [cutoff - window, cutoff) are scanned.",
default=0,
)
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL: PositiveInt = Field(
description="Lock TTL for sandbox expired records clean task in seconds",
default=90000,

View File

@@ -29,7 +29,7 @@ from typing import Any, cast
import sqlalchemy as sa
from pydantic import ValidationError
from sqlalchemy import and_, delete, func, null, or_, select
from sqlalchemy import and_, delete, func, null, or_, select, tuple_
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session, selectinload, sessionmaker
@@ -423,9 +423,10 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
if last_seen:
stmt = stmt.where(
or_(
WorkflowRun.created_at > last_seen[0],
and_(WorkflowRun.created_at == last_seen[0], WorkflowRun.id > last_seen[1]),
tuple_(WorkflowRun.created_at, WorkflowRun.id)
> tuple_(
sa.literal(last_seen[0], type_=sa.DateTime()),
sa.literal(last_seen[1], type_=WorkflowRun.id.type),
)
)

View File

@@ -1,5 +1,6 @@
import logging
import time
from datetime import datetime, timedelta
import click
from redis.exceptions import LockError
@@ -32,16 +33,30 @@ def clean_messages():
graceful_period_days=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD,
)
retention_days = dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS
scan_window_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS
batch_size = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE
# Create and run the cleanup service
# lock the task to avoid concurrent execution in case of the future data volume growth
with redis_client.lock(
"retention:clean_messages", timeout=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL, blocking=False
):
service = MessagesCleanService.from_days(
policy=policy,
days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS,
batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE,
)
if scan_window_days > 0:
end_before = datetime.now() - timedelta(days=retention_days)
start_from = end_before - timedelta(days=scan_window_days)
service = MessagesCleanService.from_time_range(
policy=policy,
start_from=start_from,
end_before=end_before,
batch_size=batch_size,
)
else:
service = MessagesCleanService.from_days(
policy=policy,
days=retention_days,
batch_size=batch_size,
)
stats = service.run()
end_at = time.perf_counter()

View File

@@ -1,5 +1,5 @@
import logging
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
import click
from redis.exceptions import LockError
@@ -30,6 +30,16 @@ def clean_workflow_runs_task() -> None:
start_time = datetime.now(UTC)
retention_days = dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS
scan_window_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS
if scan_window_days > 0:
end_before = datetime.now() - timedelta(days=retention_days)
start_from = end_before - timedelta(days=scan_window_days)
else:
start_from = None
end_before = None
try:
# lock the task to avoid concurrent execution in case of the future data volume growth
with redis_client.lock(
@@ -38,10 +48,10 @@ def clean_workflow_runs_task() -> None:
blocking=False,
):
WorkflowRunCleanup(
days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS,
days=retention_days,
batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE,
start_from=None,
end_before=None,
start_from=start_from,
end_before=end_before,
).run()
end_time = datetime.now(UTC)

View File

@@ -1527,6 +1527,7 @@ SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS=0
# Redis URL used for PubSub between API and

View File

@@ -687,6 +687,7 @@ x-shared-env: &shared-api-worker-env
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE:-1000}
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL:-200}
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: ${SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS:-30}
SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS: ${SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS:-0}
PUBSUB_REDIS_URL: ${PUBSUB_REDIS_URL:-}
PUBSUB_REDIS_CHANNEL_TYPE: ${PUBSUB_REDIS_CHANNEL_TYPE:-pubsub}
PUBSUB_REDIS_USE_CLUSTERS: ${PUBSUB_REDIS_USE_CLUSTERS:-false}