feat(api): implement heartbeat mechanism for database migration lock

- Added a heartbeat function to renew the Redis lock during database migrations, preventing long blockages from crashed processes.
- Updated the upgrade_db command to utilize the new locking mechanism with a configurable TTL.
- Removed the deprecated MIGRATION_LOCK_TTL from DeploymentConfig and related files.
- Enhanced unit tests to cover the new lock renewal behavior and error handling during migrations.

(cherry picked from commit a3331c622435f9f215b95f6b0261f43ae56a9d9c)
This commit is contained in:
L1nSn0w
2026-02-13 12:48:10 +08:00
parent 9acdfbde2f
commit afdd5b6c86
5 changed files with 115 additions and 19 deletions

View File

@@ -3,8 +3,9 @@ import datetime
import json
import logging
import secrets
import threading
import time
from typing import Any
from typing import TYPE_CHECKING, Any
import click
import sqlalchemy as sa
@@ -55,6 +56,35 @@ from tasks.remove_app_and_related_data_task import delete_draft_variables_batch
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from redis.lock import Lock
DB_UPGRADE_LOCK_TTL_SECONDS = 60
def _heartbeat_db_upgrade_lock(lock: "Lock", stop_event: threading.Event, ttl_seconds: float) -> None:
"""
Keep the DB upgrade lock alive while migrations are running.
We intentionally keep the base TTL small (e.g. 60s) so that if the process is killed and can't
release the lock, the lock will naturally expire soon. While the process is alive, this
heartbeat periodically resets the TTL via `lock.reacquire()`.
"""
interval_seconds = max(0.1, ttl_seconds / 3)
while not stop_event.wait(interval_seconds):
try:
lock.reacquire()
except LockNotOwnedError:
# Another process took over / TTL expired; continuing to retry won't help.
logger.warning("DB migration lock is no longer owned during heartbeat; stop renewing.")
return
except RedisError:
# Best-effort: keep trying while the process is alive.
logger.warning("Failed to renew DB migration lock due to Redis error; will retry.", exc_info=True)
except Exception:
logger.warning("Unexpected error while renewing DB migration lock; will retry.", exc_info=True)
@click.command("reset-password", help="Reset the account password.")
@click.option("--email", prompt=True, help="Account email to reset password for")
@@ -728,8 +758,21 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No
@click.command("upgrade-db", help="Upgrade the database")
def upgrade_db():
click.echo("Preparing database migration...")
lock = redis_client.lock(name="db_upgrade_lock", timeout=dify_config.MIGRATION_LOCK_TTL)
# Use a short base TTL + heartbeat renewal, so a crashed process doesn't block migrations for long.
# thread_local=False is required because heartbeat runs in a separate thread.
lock = redis_client.lock(
name="db_upgrade_lock",
timeout=DB_UPGRADE_LOCK_TTL_SECONDS,
thread_local=False,
)
if lock.acquire(blocking=False):
stop_event = threading.Event()
heartbeat_thread = threading.Thread(
target=_heartbeat_db_upgrade_lock,
args=(lock, stop_event, float(DB_UPGRADE_LOCK_TTL_SECONDS)),
daemon=True,
)
heartbeat_thread.start()
try:
click.echo(click.style("Starting database migration.", fg="green"))
@@ -745,6 +788,8 @@ def upgrade_db():
click.echo(click.style(f"Database migration failed: {e}", fg="red"))
raise SystemExit(1)
finally:
stop_event.set()
heartbeat_thread.join(timeout=5)
# Lock release errors should never mask the real migration failure.
try:
lock.release()

View File

@@ -1,4 +1,4 @@
from pydantic import Field, PositiveInt
from pydantic import Field
from pydantic_settings import BaseSettings
@@ -33,7 +33,3 @@ class DeploymentConfig(BaseSettings):
default="PRODUCTION",
)
MIGRATION_LOCK_TTL: PositiveInt = Field(
description="Redis lock TTL for startup DB migration (seconds). Increase for large/slow databases.",
default=3600,
)

View File

@@ -1,9 +1,9 @@
import sys
import threading
import types
from unittest.mock import MagicMock
import commands
from configs import dify_config
def _install_fake_flask_migrate(monkeypatch, upgrade_impl) -> None:
@@ -21,7 +21,7 @@ def _invoke_upgrade_db() -> int:
def test_upgrade_db_skips_when_lock_not_acquired(monkeypatch, capsys):
monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 1234)
monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 1234)
lock = MagicMock()
lock.acquire.return_value = False
@@ -33,13 +33,13 @@ def test_upgrade_db_skips_when_lock_not_acquired(monkeypatch, capsys):
assert exit_code == 0
assert "Database migration skipped" in captured.out
commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=1234)
commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=1234, thread_local=False)
lock.acquire.assert_called_once_with(blocking=False)
lock.release.assert_not_called()
def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys):
monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 321)
monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 321)
lock = MagicMock()
lock.acquire.return_value = True
@@ -57,13 +57,13 @@ def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys):
assert exit_code == 1
assert "Database migration failed: boom" in captured.out
commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=321)
commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=321, thread_local=False)
lock.acquire.assert_called_once_with(blocking=False)
lock.release.assert_called_once()
def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsys):
monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 999)
monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 999)
lock = MagicMock()
lock.acquire.return_value = True
@@ -78,7 +78,67 @@ def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsy
assert exit_code == 0
assert "Database migration successful!" in captured.out
commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=999)
commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=999, thread_local=False)
lock.acquire.assert_called_once_with(blocking=False)
lock.release.assert_called_once()
def test_upgrade_db_renews_lock_during_migration(monkeypatch, capsys):
    """
    The heartbeat must call ``lock.reacquire()`` while the migration runs,
    which is what allows the base TTL to stay short.
    """
    # Shrink the TTL so the heartbeat interval fires almost immediately.
    monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3)

    heartbeat_seen = threading.Event()

    mock_lock = MagicMock()
    mock_lock.acquire.return_value = True

    def _fake_reacquire():
        heartbeat_seen.set()
        return True

    mock_lock.reacquire.side_effect = _fake_reacquire
    commands.redis_client.lock.return_value = mock_lock

    def _fake_upgrade():
        # Block the "migration" until at least one renewal has happened.
        assert heartbeat_seen.wait(1.0)

    _install_fake_flask_migrate(monkeypatch, _fake_upgrade)

    exit_code = _invoke_upgrade_db()
    _ = capsys.readouterr()

    assert exit_code == 0
    assert mock_lock.reacquire.call_count >= 1
def test_upgrade_db_ignores_reacquire_errors(monkeypatch, capsys):
    """
    Redis errors during heartbeat renewal are best-effort: they must not
    abort the migration or change the command's exit code.
    """
    # Shrink the TTL so the heartbeat fires while the fake upgrade is running.
    monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3)

    renewal_attempted = threading.Event()

    mock_lock = MagicMock()
    mock_lock.acquire.return_value = True

    def _failing_reacquire():
        renewal_attempted.set()
        raise commands.RedisError("simulated")

    mock_lock.reacquire.side_effect = _failing_reacquire
    commands.redis_client.lock.return_value = mock_lock

    def _fake_upgrade():
        # Hold the "migration" open until a renewal attempt has failed.
        assert renewal_attempted.wait(1.0)

    _install_fake_flask_migrate(monkeypatch, _fake_upgrade)

    exit_code = _invoke_upgrade_db()
    _ = capsys.readouterr()

    assert exit_code == 0
    assert mock_lock.reacquire.call_count >= 1