mirror of
https://github.com/langgenius/dify.git
synced 2026-02-13 12:13:59 +00:00
Compare commits
2 Commits
fix/db-mig
...
build/opti
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f5078f271 | ||
|
|
f6c07a53cb |
@@ -719,6 +719,7 @@ SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200
|
||||
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS=0
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000
|
||||
|
||||
|
||||
|
||||
@@ -1355,6 +1355,12 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings):
|
||||
description="Retention days for sandbox expired workflow_run records and message records",
|
||||
default=30,
|
||||
)
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS: NonNegativeInt = Field(
|
||||
description="Scan window lower bound in days relative to cutoff for scheduled cleanup tasks. "
|
||||
"0 means disabled (scan all records before cutoff). "
|
||||
"When > 0, only records in [cutoff - window, cutoff) are scanned.",
|
||||
default=0,
|
||||
)
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL: PositiveInt = Field(
|
||||
description="Lock TTL for sandbox expired records clean task in seconds",
|
||||
default=90000,
|
||||
|
||||
@@ -29,7 +29,7 @@ from typing import Any, cast
|
||||
|
||||
import sqlalchemy as sa
|
||||
from pydantic import ValidationError
|
||||
from sqlalchemy import and_, delete, func, null, or_, select
|
||||
from sqlalchemy import and_, delete, func, null, or_, select, tuple_
|
||||
from sqlalchemy.engine import CursorResult
|
||||
from sqlalchemy.orm import Session, selectinload, sessionmaker
|
||||
|
||||
@@ -423,9 +423,10 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
|
||||
if last_seen:
|
||||
stmt = stmt.where(
|
||||
or_(
|
||||
WorkflowRun.created_at > last_seen[0],
|
||||
and_(WorkflowRun.created_at == last_seen[0], WorkflowRun.id > last_seen[1]),
|
||||
tuple_(WorkflowRun.created_at, WorkflowRun.id)
|
||||
> tuple_(
|
||||
sa.literal(last_seen[0], type_=sa.DateTime()),
|
||||
sa.literal(last_seen[1], type_=WorkflowRun.id.type),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import click
|
||||
from redis.exceptions import LockError
|
||||
@@ -32,16 +33,30 @@ def clean_messages():
|
||||
graceful_period_days=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD,
|
||||
)
|
||||
|
||||
retention_days = dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS
|
||||
scan_window_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS
|
||||
batch_size = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE
|
||||
|
||||
# Create and run the cleanup service
|
||||
# lock the task to avoid concurrent execution in case of the future data volume growth
|
||||
with redis_client.lock(
|
||||
"retention:clean_messages", timeout=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL, blocking=False
|
||||
):
|
||||
service = MessagesCleanService.from_days(
|
||||
policy=policy,
|
||||
days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS,
|
||||
batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE,
|
||||
)
|
||||
if scan_window_days > 0:
|
||||
end_before = datetime.now() - timedelta(days=retention_days)
|
||||
start_from = end_before - timedelta(days=scan_window_days)
|
||||
service = MessagesCleanService.from_time_range(
|
||||
policy=policy,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
batch_size=batch_size,
|
||||
)
|
||||
else:
|
||||
service = MessagesCleanService.from_days(
|
||||
policy=policy,
|
||||
days=retention_days,
|
||||
batch_size=batch_size,
|
||||
)
|
||||
stats = service.run()
|
||||
|
||||
end_at = time.perf_counter()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import click
|
||||
from redis.exceptions import LockError
|
||||
@@ -30,6 +30,16 @@ def clean_workflow_runs_task() -> None:
|
||||
|
||||
start_time = datetime.now(UTC)
|
||||
|
||||
retention_days = dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS
|
||||
scan_window_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS
|
||||
|
||||
if scan_window_days > 0:
|
||||
end_before = datetime.now() - timedelta(days=retention_days)
|
||||
start_from = end_before - timedelta(days=scan_window_days)
|
||||
else:
|
||||
start_from = None
|
||||
end_before = None
|
||||
|
||||
try:
|
||||
# lock the task to avoid concurrent execution in case of the future data volume growth
|
||||
with redis_client.lock(
|
||||
@@ -38,10 +48,10 @@ def clean_workflow_runs_task() -> None:
|
||||
blocking=False,
|
||||
):
|
||||
WorkflowRunCleanup(
|
||||
days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS,
|
||||
days=retention_days,
|
||||
batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE,
|
||||
start_from=None,
|
||||
end_before=None,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
).run()
|
||||
|
||||
end_time = datetime.now(UTC)
|
||||
|
||||
@@ -1527,6 +1527,7 @@ SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200
|
||||
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS=0
|
||||
|
||||
|
||||
# Redis URL used for PubSub between API and
|
||||
|
||||
@@ -687,6 +687,7 @@ x-shared-env: &shared-api-worker-env
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE:-1000}
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL:-200}
|
||||
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: ${SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS:-30}
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS: ${SANDBOX_EXPIRED_RECORDS_CLEAN_SCAN_WINDOW_DAYS:-0}
|
||||
PUBSUB_REDIS_URL: ${PUBSUB_REDIS_URL:-}
|
||||
PUBSUB_REDIS_CHANNEL_TYPE: ${PUBSUB_REDIS_CHANNEL_TYPE:-pubsub}
|
||||
PUBSUB_REDIS_USE_CLUSTERS: ${PUBSUB_REDIS_USE_CLUSTERS:-false}
|
||||
|
||||
Reference in New Issue
Block a user