mirror of
https://github.com/langgenius/dify.git
synced 2026-02-24 18:05:11 +00:00
refactor(api): replace AutoRenewRedisLock with DbMigrationAutoRenewLock
- Updated the database migration locking mechanism to use DbMigrationAutoRenewLock for improved clarity and functionality.
- Removed the AutoRenewRedisLock implementation and its associated tests.
- Adjusted integration and unit tests to reflect the new locking class and its usage in the upgrade_db command.

(cherry picked from commit c812ad9ff26bed3eb59862bd7a5179b7ee83f11f)
This commit is contained in:
@@ -30,7 +30,7 @@ from extensions.ext_redis import redis_client
|
||||
from extensions.ext_storage import storage
|
||||
from extensions.storage.opendal_storage import OpenDALStorage
|
||||
from extensions.storage.storage_type import StorageType
|
||||
from libs.auto_renew_redis_lock import AutoRenewRedisLock
|
||||
from libs.db_migration_lock import DbMigrationAutoRenewLock
|
||||
from libs.helper import email as email_validate
|
||||
from libs.password import hash_password, password_pattern, valid_password
|
||||
from libs.rsa import generate_key_pair
|
||||
@@ -730,7 +730,7 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No
|
||||
@click.command("upgrade-db", help="Upgrade the database")
|
||||
def upgrade_db():
|
||||
click.echo("Preparing database migration...")
|
||||
lock = AutoRenewRedisLock(
|
||||
lock = DbMigrationAutoRenewLock(
|
||||
redis_client=redis_client,
|
||||
name="db_upgrade_lock",
|
||||
ttl_seconds=DB_UPGRADE_LOCK_TTL_SECONDS,
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
"""
|
||||
Auto-renewing Redis distributed lock (redis-py Lock).
|
||||
DB migration Redis lock with heartbeat renewal.
|
||||
|
||||
Why this exists:
|
||||
- A fixed, long lock TTL can leave a stale lock for a long time if the process is killed
|
||||
before releasing it.
|
||||
- A fixed, short lock TTL can expire during long critical sections (e.g. DB migrations),
|
||||
allowing another instance to acquire the same lock concurrently.
|
||||
This is intentionally migration-specific. Background renewal is a trade-off that makes sense
|
||||
for unbounded, blocking operations like DB migrations (DDL/DML) where the main thread cannot
|
||||
periodically refresh the lock TTL.
|
||||
|
||||
This wrapper keeps a short base TTL and renews it in a daemon thread using `Lock.reacquire()`
|
||||
while the process is alive. If the process is terminated, the renewal stops and the lock
|
||||
expires soon.
|
||||
Do NOT use this as a general-purpose lock primitive for normal application code. Prefer explicit
|
||||
lock lifecycle management (e.g. redis-py Lock context manager + `extend()` / `reacquire()` from
|
||||
the same thread) when execution flow is under control.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -23,9 +21,9 @@ from redis.exceptions import LockNotOwnedError, RedisError
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AutoRenewRedisLock:
|
||||
class DbMigrationAutoRenewLock:
|
||||
"""
|
||||
Redis lock wrapper that automatically renews TTL while held.
|
||||
Redis lock wrapper that automatically renews TTL while held (migration-only).
|
||||
|
||||
Notes:
|
||||
- We force `thread_local=False` when creating the underlying redis-py lock, because the
|
||||
@@ -76,7 +74,7 @@ class AutoRenewRedisLock:
|
||||
|
||||
def acquire(self, *args: Any, **kwargs: Any) -> bool:
|
||||
"""
|
||||
Acquire the lock and start auto-renew heartbeat on success.
|
||||
Acquire the lock and start heartbeat renewal on success.
|
||||
|
||||
Accepts the same args/kwargs as redis-py `Lock.acquire()`.
|
||||
"""
|
||||
@@ -111,7 +109,7 @@ class AutoRenewRedisLock:
|
||||
target=self._heartbeat_loop,
|
||||
args=(self._lock, self._stop_event),
|
||||
daemon=True,
|
||||
name=f"AutoRenewRedisLock({self._name})",
|
||||
name=f"DbMigrationAutoRenewLock({self._name})",
|
||||
)
|
||||
self._thread.start()
|
||||
|
||||
@@ -121,20 +119,20 @@ class AutoRenewRedisLock:
|
||||
lock.reacquire()
|
||||
except LockNotOwnedError:
|
||||
self._logger.warning(
|
||||
"Auto-renew lock is no longer owned during heartbeat%s; stop renewing.",
|
||||
"DB migration lock is no longer owned during heartbeat%s; stop renewing.",
|
||||
f" ({self._log_context})" if self._log_context else "",
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
except RedisError:
|
||||
self._logger.warning(
|
||||
"Failed to renew auto-renew lock due to Redis error%s; will retry.",
|
||||
"Failed to renew DB migration lock due to Redis error%s; will retry.",
|
||||
f" ({self._log_context})" if self._log_context else "",
|
||||
exc_info=True,
|
||||
)
|
||||
except Exception:
|
||||
self._logger.warning(
|
||||
"Unexpected error while renewing auto-renew lock%s; will retry.",
|
||||
"Unexpected error while renewing DB migration lock%s; will retry.",
|
||||
f" ({self._log_context})" if self._log_context else "",
|
||||
exc_info=True,
|
||||
)
|
||||
@@ -157,21 +155,21 @@ class AutoRenewRedisLock:
|
||||
lock.release()
|
||||
except LockNotOwnedError:
|
||||
self._logger.warning(
|
||||
"Auto-renew lock not owned on release%s%s; ignoring.",
|
||||
"DB migration lock not owned on release%s%s; ignoring.",
|
||||
f" after {status} operation" if status else "",
|
||||
f" ({self._log_context})" if self._log_context else "",
|
||||
exc_info=True,
|
||||
)
|
||||
except RedisError:
|
||||
self._logger.warning(
|
||||
"Failed to release auto-renew lock due to Redis error%s%s; ignoring.",
|
||||
"Failed to release DB migration lock due to Redis error%s%s; ignoring.",
|
||||
f" after {status} operation" if status else "",
|
||||
f" ({self._log_context})" if self._log_context else "",
|
||||
exc_info=True,
|
||||
)
|
||||
except Exception:
|
||||
self._logger.warning(
|
||||
"Unexpected error while releasing auto-renew lock%s%s; ignoring.",
|
||||
"Unexpected error while releasing DB migration lock%s%s; ignoring.",
|
||||
f" after {status} operation" if status else "",
|
||||
f" ({self._log_context})" if self._log_context else "",
|
||||
exc_info=True,
|
||||
@@ -189,7 +187,7 @@ class AutoRenewRedisLock:
|
||||
self._thread.join(timeout=join_timeout_seconds)
|
||||
if self._thread.is_alive():
|
||||
self._logger.warning(
|
||||
"Auto-renew lock heartbeat thread did not stop within %.2fs%s; ignoring.",
|
||||
"DB migration lock heartbeat thread did not stop within %.2fs%s; ignoring.",
|
||||
join_timeout_seconds,
|
||||
f" ({self._log_context})" if self._log_context else "",
|
||||
)
|
||||
@@ -1,5 +1,5 @@
|
||||
"""
|
||||
Integration tests for AutoRenewRedisLock using real Redis via TestContainers.
|
||||
Integration tests for DbMigrationAutoRenewLock using real Redis via TestContainers.
|
||||
"""
|
||||
|
||||
import time
|
||||
@@ -8,20 +8,20 @@ import uuid
|
||||
import pytest
|
||||
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs.auto_renew_redis_lock import AutoRenewRedisLock
|
||||
from libs.db_migration_lock import DbMigrationAutoRenewLock
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("flask_app_with_containers")
|
||||
def test_auto_renew_redis_lock_renews_ttl_and_releases():
|
||||
lock_name = f"test:auto_renew_lock:{uuid.uuid4().hex}"
|
||||
def test_db_migration_lock_renews_ttl_and_releases():
|
||||
lock_name = f"test:db_migration_auto_renew_lock:{uuid.uuid4().hex}"
|
||||
|
||||
# Keep base TTL very small, and renew frequently so the test is stable even on slower CI.
|
||||
lock = AutoRenewRedisLock(
|
||||
lock = DbMigrationAutoRenewLock(
|
||||
redis_client=redis_client,
|
||||
name=lock_name,
|
||||
ttl_seconds=1.0,
|
||||
renew_interval_seconds=0.2,
|
||||
log_context="test_auto_renew_redis_lock",
|
||||
log_context="test_db_migration_lock",
|
||||
)
|
||||
|
||||
acquired = lock.acquire(blocking=True, blocking_timeout=5)
|
||||
|
||||
@@ -4,7 +4,7 @@ import types
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import commands
|
||||
from libs.auto_renew_redis_lock import LockNotOwnedError, RedisError
|
||||
from libs.db_migration_lock import LockNotOwnedError, RedisError
|
||||
|
||||
HEARTBEAT_WAIT_TIMEOUT_SECONDS = 5.0
|
||||
|
||||
|
||||
Reference in New Issue
Block a user