From afdd5b6c860b2d0cbc401f4b299fc46df9c5f8c5 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Fri, 13 Feb 2026 12:48:10 +0800 Subject: [PATCH] feat(api): implement heartbeat mechanism for database migration lock - Added a heartbeat function to renew the Redis lock during database migrations, preventing long blockages from crashed processes. - Updated the upgrade_db command to utilize the new locking mechanism with a configurable TTL. - Removed the deprecated MIGRATION_LOCK_TTL from DeploymentConfig and related files. - Enhanced unit tests to cover the new lock renewal behavior and error handling during migrations. (cherry picked from commit a3331c622435f9f215b95f6b0261f43ae56a9d9c) --- api/commands.py | 49 +++++++++++- api/configs/deploy/__init__.py | 6 +- .../unit_tests/commands/test_upgrade_db.py | 74 +++++++++++++++++-- docker/.env.example | 4 - docker/docker-compose.yaml | 1 - 5 files changed, 115 insertions(+), 19 deletions(-) diff --git a/api/commands.py b/api/commands.py index fbf16de8be..4cc2f476f2 100644 --- a/api/commands.py +++ b/api/commands.py @@ -3,8 +3,9 @@ import datetime import json import logging import secrets +import threading import time -from typing import Any +from typing import TYPE_CHECKING, Any import click import sqlalchemy as sa @@ -55,6 +56,35 @@ from tasks.remove_app_and_related_data_task import delete_draft_variables_batch logger = logging.getLogger(__name__) +if TYPE_CHECKING: + from redis.lock import Lock + +DB_UPGRADE_LOCK_TTL_SECONDS = 60 + + +def _heartbeat_db_upgrade_lock(lock: "Lock", stop_event: threading.Event, ttl_seconds: float) -> None: + """ + Keep the DB upgrade lock alive while migrations are running. + + We intentionally keep the base TTL small (e.g. 60s) so that if the process is killed and can't + release the lock, the lock will naturally expire soon. While the process is alive, this + heartbeat periodically resets the TTL via `lock.reacquire()`. + """ + + interval_seconds = max(0.1, ttl_seconds / 3) + while not stop_event.wait(interval_seconds): + try: + lock.reacquire() + except LockNotOwnedError: + # Another process took over / TTL expired; continuing to retry won't help. + logger.warning("DB migration lock is no longer owned during heartbeat; stop renewing.") + return + except RedisError: + # Best-effort: keep trying while the process is alive. + logger.warning("Failed to renew DB migration lock due to Redis error; will retry.", exc_info=True) + except Exception: + logger.warning("Unexpected error while renewing DB migration lock; will retry.", exc_info=True) + @click.command("reset-password", help="Reset the account password.") @click.option("--email", prompt=True, help="Account email to reset password for") @@ -728,8 +758,21 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No @click.command("upgrade-db", help="Upgrade the database") def upgrade_db(): click.echo("Preparing database migration...") - lock = redis_client.lock(name="db_upgrade_lock", timeout=dify_config.MIGRATION_LOCK_TTL) + # Use a short base TTL + heartbeat renewal, so a crashed process doesn't block migrations for long. + # thread_local=False is required because heartbeat runs in a separate thread. + lock = redis_client.lock( + name="db_upgrade_lock", + timeout=DB_UPGRADE_LOCK_TTL_SECONDS, + thread_local=False, + ) if lock.acquire(blocking=False): + stop_event = threading.Event() + heartbeat_thread = threading.Thread( + target=_heartbeat_db_upgrade_lock, + args=(lock, stop_event, float(DB_UPGRADE_LOCK_TTL_SECONDS)), + daemon=True, + ) + heartbeat_thread.start() try: click.echo(click.style("Starting database migration.", fg="green")) @@ -745,6 +788,8 @@ def upgrade_db(): click.echo(click.style(f"Database migration failed: {e}", fg="red")) raise SystemExit(1) finally: + stop_event.set() + heartbeat_thread.join(timeout=5) # Lock release errors should never mask the real migration failure. try: lock.release() diff --git a/api/configs/deploy/__init__.py b/api/configs/deploy/__init__.py index 4ac57f0370..7db212a3d8 100644 --- a/api/configs/deploy/__init__.py +++ b/api/configs/deploy/__init__.py @@ -1,4 +1,4 @@ -from pydantic import Field, PositiveInt +from pydantic import Field from pydantic_settings import BaseSettings @@ -33,7 +33,3 @@ class DeploymentConfig(BaseSettings): default="PRODUCTION", ) - MIGRATION_LOCK_TTL: PositiveInt = Field( - description="Redis lock TTL for startup DB migration (seconds). Increase for large/slow databases.", - default=3600, - ) diff --git a/api/tests/unit_tests/commands/test_upgrade_db.py b/api/tests/unit_tests/commands/test_upgrade_db.py index c262ef71cc..59d47de895 100644 --- a/api/tests/unit_tests/commands/test_upgrade_db.py +++ b/api/tests/unit_tests/commands/test_upgrade_db.py @@ -1,9 +1,9 @@ import sys +import threading import types from unittest.mock import MagicMock import commands -from configs import dify_config def _install_fake_flask_migrate(monkeypatch, upgrade_impl) -> None: @@ -21,7 +21,7 @@ def _invoke_upgrade_db() -> int: def test_upgrade_db_skips_when_lock_not_acquired(monkeypatch, capsys): - monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 1234) + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 1234) lock = MagicMock() lock.acquire.return_value = False @@ -33,13 +33,13 @@ def test_upgrade_db_skips_when_lock_not_acquired(monkeypatch, capsys): assert exit_code == 0 assert "Database migration skipped" in captured.out - commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=1234) + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=1234, thread_local=False) lock.acquire.assert_called_once_with(blocking=False) lock.release.assert_not_called() def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys): - monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 321) + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 321) lock = MagicMock() lock.acquire.return_value = True @@ -57,13 +57,13 @@ def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys): assert exit_code == 1 assert "Database migration failed: boom" in captured.out - commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=321) + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=321, thread_local=False) lock.acquire.assert_called_once_with(blocking=False) lock.release.assert_called_once() def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsys): - monkeypatch.setattr(dify_config, "MIGRATION_LOCK_TTL", 999) + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 999) lock = MagicMock() lock.acquire.return_value = True @@ -78,7 +78,67 @@ def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsy assert exit_code == 0 assert "Database migration successful!" in captured.out - commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=999) + commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=999, thread_local=False) lock.acquire.assert_called_once_with(blocking=False) lock.release.assert_called_once() + +def test_upgrade_db_renews_lock_during_migration(monkeypatch, capsys): + """ + Ensure the lock is renewed while migrations are running, so the base TTL can stay short. + """ + + # Use a small TTL so the heartbeat interval triggers quickly. + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3) + + lock = MagicMock() + lock.acquire.return_value = True + commands.redis_client.lock.return_value = lock + + renewed = threading.Event() + + def _reacquire(): + renewed.set() + return True + + lock.reacquire.side_effect = _reacquire + + def _upgrade(): + assert renewed.wait(1.0) + + _install_fake_flask_migrate(monkeypatch, _upgrade) + + exit_code = _invoke_upgrade_db() + _ = capsys.readouterr() + + assert exit_code == 0 + assert lock.reacquire.call_count >= 1 + + +def test_upgrade_db_ignores_reacquire_errors(monkeypatch, capsys): + # Use a small TTL so heartbeat runs during the upgrade call. + monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3) + + lock = MagicMock() + lock.acquire.return_value = True + commands.redis_client.lock.return_value = lock + + attempted = threading.Event() + + def _reacquire(): + attempted.set() + raise commands.RedisError("simulated") + + lock.reacquire.side_effect = _reacquire + + def _upgrade(): + assert attempted.wait(1.0) + + _install_fake_flask_migrate(monkeypatch, _upgrade) + + exit_code = _invoke_upgrade_db() + _ = capsys.readouterr() + + assert exit_code == 0 + assert lock.reacquire.call_count >= 1 + diff --git a/docker/.env.example b/docker/.env.example index 9339404b58..41a0205bf5 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -125,10 +125,6 @@ OPENAI_API_BASE=https://api.openai.com/v1 # and the application will start after the migrations have completed. MIGRATION_ENABLED=true -# Redis lock TTL (in seconds) for startup database migrations. -# Increase this value for long-running migrations to avoid concurrent upgrades in multi-replica deployments. -MIGRATION_LOCK_TTL=3600 - # File Access Time specifies a time interval in seconds for the file to be accessed. # The default value is 300 seconds. FILES_ACCESS_TIMEOUT=300 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 57a0c089c8..1886f848e0 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -32,7 +32,6 @@ x-shared-env: &shared-api-worker-env CHECK_UPDATE_URL: ${CHECK_UPDATE_URL:-https://updates.dify.ai} OPENAI_API_BASE: ${OPENAI_API_BASE:-https://api.openai.com/v1} MIGRATION_ENABLED: ${MIGRATION_ENABLED:-true} - MIGRATION_LOCK_TTL: ${MIGRATION_LOCK_TTL:-3600} FILES_ACCESS_TIMEOUT: ${FILES_ACCESS_TIMEOUT:-300} ACCESS_TOKEN_EXPIRE_MINUTES: ${ACCESS_TOKEN_EXPIRE_MINUTES:-60} REFRESH_TOKEN_EXPIRE_DAYS: ${REFRESH_TOKEN_EXPIRE_DAYS:-30}