Compare commits

...

8 Commits

Author SHA1 Message Date
autofix-ci[bot]
cce3cb5587 [autofix.ci] apply automated fixes 2026-02-14 12:19:19 +08:00
L1nSn0w
77aad22e61 refactor(api): replace AutoRenewRedisLock with DbMigrationAutoRenewLock
- Updated the database migration locking mechanism to use DbMigrationAutoRenewLock for improved clarity and functionality.
- Removed the AutoRenewRedisLock implementation and its associated tests.
- Adjusted integration and unit tests to reflect the new locking class and its usage in the upgrade_db command.
2026-02-14 12:19:19 +08:00
L1nSn0w
c9645186de refactor(api): replace heartbeat mechanism with AutoRenewRedisLock for database migration
- Removed the manual heartbeat function for renewing the Redis lock during database migrations.
- Integrated AutoRenewRedisLock to handle lock renewal automatically, simplifying the upgrade_db command.
- Updated unit tests to reflect changes in lock handling and error management during migrations.
2026-02-14 12:19:19 +08:00
L1nSn0w
46bbd1dc4b refactor(tests): replace hardcoded wait time with constant for clarity
- Introduced HEARTBEAT_WAIT_TIMEOUT_SECONDS constant to improve readability and maintainability of test code.
- Updated test assertions to use the new constant instead of a hardcoded value.
2026-02-14 12:19:19 +08:00
autofix-ci[bot]
44defff163 [autofix.ci] apply automated fixes 2026-02-14 12:19:19 +08:00
L1nSn0w
ff37f08e60 fix(api): improve logging for database migration lock release
- Added a migration_succeeded flag to track the success of database migrations.
- Enhanced logging messages to indicate the status of the migration when releasing the lock, providing clearer context for potential issues.
2026-02-14 12:19:19 +08:00
L1nSn0w
f5f8ab3f1f feat(api): implement heartbeat mechanism for database migration lock
- Added a heartbeat function to renew the Redis lock during database migrations, preventing long blockages from crashed processes.
- Updated the upgrade_db command to utilize the new locking mechanism with a configurable TTL.
- Removed the deprecated MIGRATION_LOCK_TTL from DeploymentConfig and related files.
- Enhanced unit tests to cover the new lock renewal behavior and error handling during migrations.
2026-02-14 12:19:19 +08:00
L1nSn0w
327eb947f0 feat(api): enhance database migration locking mechanism and configuration
- Introduced a configurable Redis lock TTL for database migrations in DeploymentConfig.
- Updated the upgrade_db command to handle lock release errors gracefully.
- Added documentation for the new MIGRATION_LOCK_TTL environment variable in the .env.example file and docker-compose.yaml.
2026-02-14 12:19:19 +08:00
5 changed files with 518 additions and 2 deletions

View File

@@ -30,6 +30,7 @@ from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from extensions.storage.opendal_storage import OpenDALStorage
from extensions.storage.storage_type import StorageType
from libs.db_migration_lock import DbMigrationAutoRenewLock
from libs.helper import email as email_validate
from libs.password import hash_password, password_pattern, valid_password
from libs.rsa import generate_key_pair
@@ -54,6 +55,8 @@ from tasks.remove_app_and_related_data_task import delete_draft_variables_batch
logger = logging.getLogger(__name__)
DB_UPGRADE_LOCK_TTL_SECONDS = 60
@click.command("reset-password", help="Reset the account password.")
@click.option("--email", prompt=True, help="Account email to reset password for")
@@ -727,8 +730,15 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No
@click.command("upgrade-db", help="Upgrade the database")
def upgrade_db():
click.echo("Preparing database migration...")
lock = redis_client.lock(name="db_upgrade_lock", timeout=60)
lock = DbMigrationAutoRenewLock(
redis_client=redis_client,
name="db_upgrade_lock",
ttl_seconds=DB_UPGRADE_LOCK_TTL_SECONDS,
logger=logger,
log_context="db_migration",
)
if lock.acquire(blocking=False):
migration_succeeded = False
try:
click.echo(click.style("Starting database migration.", fg="green"))
@@ -737,6 +747,7 @@ def upgrade_db():
flask_migrate.upgrade()
migration_succeeded = True
click.echo(click.style("Database migration successful!", fg="green"))
except Exception as e:
@@ -744,7 +755,8 @@ def upgrade_db():
click.echo(click.style(f"Database migration failed: {e}", fg="red"))
raise SystemExit(1)
finally:
lock.release()
status = "successful" if migration_succeeded else "failed"
lock.release_safely(status=status)
else:
click.echo("Database migration skipped")

View File

@@ -0,0 +1,195 @@
"""
DB migration Redis lock with heartbeat renewal.
This is intentionally migration-specific. Background renewal is a trade-off that makes sense
for unbounded, blocking operations like DB migrations (DDL/DML) where the main thread cannot
periodically refresh the lock TTL.
Do NOT use this as a general-purpose lock primitive for normal application code. Prefer explicit
lock lifecycle management (e.g. redis-py Lock context manager + `extend()` / `reacquire()` from
the same thread) when execution flow is under control.
"""
from __future__ import annotations
import logging
import threading
from typing import Any
from redis.exceptions import LockNotOwnedError, RedisError
logger = logging.getLogger(__name__)
class DbMigrationAutoRenewLock:
"""
Redis lock wrapper that automatically renews TTL while held (migration-only).
Notes:
- We force `thread_local=False` when creating the underlying redis-py lock, because the
lock token must be accessible from the heartbeat thread for `reacquire()` to work.
- `release_safely()` is best-effort: it never raises, so it won't mask the caller's
primary error/exit code.
"""
_redis_client: Any
_name: str
_ttl_seconds: float
_renew_interval_seconds: float
_log_context: str | None
_logger: logging.Logger
_lock: Any
_stop_event: threading.Event | None
_thread: threading.Thread | None
_acquired: bool
def __init__(
self,
redis_client: Any,
name: str,
ttl_seconds: float = 60,
renew_interval_seconds: float | None = None,
*,
logger: logging.Logger | None = None,
log_context: str | None = None,
) -> None:
self._redis_client = redis_client
self._name = name
self._ttl_seconds = float(ttl_seconds)
self._renew_interval_seconds = (
float(renew_interval_seconds) if renew_interval_seconds is not None else max(0.1, self._ttl_seconds / 3)
)
self._logger = logger or logging.getLogger(__name__)
self._log_context = log_context
self._lock = None
self._stop_event = None
self._thread = None
self._acquired = False
@property
def name(self) -> str:
return self._name
def acquire(self, *args: Any, **kwargs: Any) -> bool:
"""
Acquire the lock and start heartbeat renewal on success.
Accepts the same args/kwargs as redis-py `Lock.acquire()`.
"""
self._lock = self._redis_client.lock(
name=self._name,
timeout=self._ttl_seconds,
thread_local=False,
)
acquired = bool(self._lock.acquire(*args, **kwargs))
self._acquired = acquired
if acquired:
self._start_heartbeat()
return acquired
def owned(self) -> bool:
if self._lock is None:
return False
try:
return bool(self._lock.owned())
except Exception:
# Ownership checks are best-effort and must not break callers.
return False
def _start_heartbeat(self) -> None:
if self._lock is None:
return
if self._stop_event is not None:
return
self._stop_event = threading.Event()
self._thread = threading.Thread(
target=self._heartbeat_loop,
args=(self._lock, self._stop_event),
daemon=True,
name=f"DbMigrationAutoRenewLock({self._name})",
)
self._thread.start()
def _heartbeat_loop(self, lock: Any, stop_event: threading.Event) -> None:
while not stop_event.wait(self._renew_interval_seconds):
try:
lock.reacquire()
except LockNotOwnedError:
self._logger.warning(
"DB migration lock is no longer owned during heartbeat%s; stop renewing.",
f" ({self._log_context})" if self._log_context else "",
exc_info=True,
)
return
except RedisError:
self._logger.warning(
"Failed to renew DB migration lock due to Redis error%s; will retry.",
f" ({self._log_context})" if self._log_context else "",
exc_info=True,
)
except Exception:
self._logger.warning(
"Unexpected error while renewing DB migration lock%s; will retry.",
f" ({self._log_context})" if self._log_context else "",
exc_info=True,
)
def release_safely(self, *, status: str | None = None) -> None:
"""
Stop heartbeat and release lock. Never raises.
Args:
status: Optional caller-provided status (e.g. 'successful'/'failed') to add context to logs.
"""
lock = self._lock
if lock is None:
return
self._stop_heartbeat()
# Lock release errors should never mask the real error/exit code.
try:
lock.release()
except LockNotOwnedError:
self._logger.warning(
"DB migration lock not owned on release%s%s; ignoring.",
f" after {status} operation" if status else "",
f" ({self._log_context})" if self._log_context else "",
exc_info=True,
)
except RedisError:
self._logger.warning(
"Failed to release DB migration lock due to Redis error%s%s; ignoring.",
f" after {status} operation" if status else "",
f" ({self._log_context})" if self._log_context else "",
exc_info=True,
)
except Exception:
self._logger.warning(
"Unexpected error while releasing DB migration lock%s%s; ignoring.",
f" after {status} operation" if status else "",
f" ({self._log_context})" if self._log_context else "",
exc_info=True,
)
finally:
self._acquired = False
def _stop_heartbeat(self) -> None:
if self._stop_event is None:
return
self._stop_event.set()
if self._thread is not None:
# Best-effort join: if Redis calls are blocked, the daemon thread may remain alive.
join_timeout_seconds = max(0.5, min(5.0, self._renew_interval_seconds * 2))
self._thread.join(timeout=join_timeout_seconds)
if self._thread.is_alive():
self._logger.warning(
"DB migration lock heartbeat thread did not stop within %.2fs%s; ignoring.",
join_timeout_seconds,
f" ({self._log_context})" if self._log_context else "",
)
self._stop_event = None
self._thread = None

View File

@@ -0,0 +1,38 @@
"""
Integration tests for DbMigrationAutoRenewLock using real Redis via TestContainers.
"""
import time
import uuid
import pytest
from extensions.ext_redis import redis_client
from libs.db_migration_lock import DbMigrationAutoRenewLock
@pytest.mark.usefixtures("flask_app_with_containers")
def test_db_migration_lock_renews_ttl_and_releases():
lock_name = f"test:db_migration_auto_renew_lock:{uuid.uuid4().hex}"
# Keep base TTL very small, and renew frequently so the test is stable even on slower CI.
lock = DbMigrationAutoRenewLock(
redis_client=redis_client,
name=lock_name,
ttl_seconds=1.0,
renew_interval_seconds=0.2,
log_context="test_db_migration_lock",
)
acquired = lock.acquire(blocking=True, blocking_timeout=5)
assert acquired is True
# Wait beyond the base TTL; key should still exist due to renewal.
time.sleep(1.5)
ttl = redis_client.ttl(lock_name)
assert ttl > 0
lock.release_safely(status="successful")
# After release, the key should not exist.
assert redis_client.exists(lock_name) == 0

View File

@@ -0,0 +1,146 @@
import sys
import threading
import types
from unittest.mock import MagicMock
import commands
from libs.db_migration_lock import LockNotOwnedError, RedisError
HEARTBEAT_WAIT_TIMEOUT_SECONDS = 5.0
def _install_fake_flask_migrate(monkeypatch, upgrade_impl) -> None:
module = types.ModuleType("flask_migrate")
module.upgrade = upgrade_impl
monkeypatch.setitem(sys.modules, "flask_migrate", module)
def _invoke_upgrade_db() -> int:
try:
commands.upgrade_db.callback()
except SystemExit as e:
return int(e.code or 0)
return 0
def test_upgrade_db_skips_when_lock_not_acquired(monkeypatch, capsys):
monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 1234)
lock = MagicMock()
lock.acquire.return_value = False
commands.redis_client.lock.return_value = lock
exit_code = _invoke_upgrade_db()
captured = capsys.readouterr()
assert exit_code == 0
assert "Database migration skipped" in captured.out
commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=1234, thread_local=False)
lock.acquire.assert_called_once_with(blocking=False)
lock.release.assert_not_called()
def test_upgrade_db_failure_not_masked_by_lock_release(monkeypatch, capsys):
monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 321)
lock = MagicMock()
lock.acquire.return_value = True
lock.release.side_effect = LockNotOwnedError("simulated")
commands.redis_client.lock.return_value = lock
def _upgrade():
raise RuntimeError("boom")
_install_fake_flask_migrate(monkeypatch, _upgrade)
exit_code = _invoke_upgrade_db()
captured = capsys.readouterr()
assert exit_code == 1
assert "Database migration failed: boom" in captured.out
commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=321, thread_local=False)
lock.acquire.assert_called_once_with(blocking=False)
lock.release.assert_called_once()
def test_upgrade_db_success_ignores_lock_not_owned_on_release(monkeypatch, capsys):
monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 999)
lock = MagicMock()
lock.acquire.return_value = True
lock.release.side_effect = LockNotOwnedError("simulated")
commands.redis_client.lock.return_value = lock
_install_fake_flask_migrate(monkeypatch, lambda: None)
exit_code = _invoke_upgrade_db()
captured = capsys.readouterr()
assert exit_code == 0
assert "Database migration successful!" in captured.out
commands.redis_client.lock.assert_called_once_with(name="db_upgrade_lock", timeout=999, thread_local=False)
lock.acquire.assert_called_once_with(blocking=False)
lock.release.assert_called_once()
def test_upgrade_db_renews_lock_during_migration(monkeypatch, capsys):
"""
Ensure the lock is renewed while migrations are running, so the base TTL can stay short.
"""
# Use a small TTL so the heartbeat interval triggers quickly.
monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3)
lock = MagicMock()
lock.acquire.return_value = True
commands.redis_client.lock.return_value = lock
renewed = threading.Event()
def _reacquire():
renewed.set()
return True
lock.reacquire.side_effect = _reacquire
def _upgrade():
assert renewed.wait(HEARTBEAT_WAIT_TIMEOUT_SECONDS)
_install_fake_flask_migrate(monkeypatch, _upgrade)
exit_code = _invoke_upgrade_db()
_ = capsys.readouterr()
assert exit_code == 0
assert lock.reacquire.call_count >= 1
def test_upgrade_db_ignores_reacquire_errors(monkeypatch, capsys):
# Use a small TTL so heartbeat runs during the upgrade call.
monkeypatch.setattr(commands, "DB_UPGRADE_LOCK_TTL_SECONDS", 0.3)
lock = MagicMock()
lock.acquire.return_value = True
commands.redis_client.lock.return_value = lock
attempted = threading.Event()
def _reacquire():
attempted.set()
raise RedisError("simulated")
lock.reacquire.side_effect = _reacquire
def _upgrade():
assert attempted.wait(HEARTBEAT_WAIT_TIMEOUT_SECONDS)
_install_fake_flask_migrate(monkeypatch, _upgrade)
exit_code = _invoke_upgrade_db()
_ = capsys.readouterr()
assert exit_code == 0
assert lock.reacquire.call_count >= 1

View File

@@ -0,0 +1,125 @@
"""Unit tests for enterprise service integrations.
This module covers the enterprise-only default workspace auto-join behavior:
- Enterprise mode disabled: no external calls
- Successful join / skipped join: no errors
- Failures (network/invalid response/invalid UUID): soft-fail wrapper must not raise
"""
from unittest.mock import patch
import pytest
from services.enterprise.enterprise_service import (
DefaultWorkspaceJoinResult,
EnterpriseService,
try_join_default_workspace,
)
class TestJoinDefaultWorkspace:
def test_join_default_workspace_success(self):
account_id = "11111111-1111-1111-1111-111111111111"
response = {"workspace_id": "22222222-2222-2222-2222-222222222222", "joined": True, "message": "ok"}
with patch("services.enterprise.enterprise_service.EnterpriseRequest.send_request") as mock_send_request:
mock_send_request.return_value = response
result = EnterpriseService.join_default_workspace(account_id=account_id)
assert isinstance(result, DefaultWorkspaceJoinResult)
assert result.workspace_id == response["workspace_id"]
assert result.joined is True
assert result.message == "ok"
mock_send_request.assert_called_once_with(
"POST",
"/default-workspace/members",
json={"account_id": account_id},
)
def test_join_default_workspace_invalid_response_format_raises(self):
account_id = "11111111-1111-1111-1111-111111111111"
with patch("services.enterprise.enterprise_service.EnterpriseRequest.send_request") as mock_send_request:
mock_send_request.return_value = "not-a-dict"
with pytest.raises(ValueError, match="Invalid response format"):
EnterpriseService.join_default_workspace(account_id=account_id)
def test_join_default_workspace_invalid_account_id_raises(self):
with pytest.raises(ValueError):
EnterpriseService.join_default_workspace(account_id="not-a-uuid")
class TestTryJoinDefaultWorkspace:
def test_try_join_default_workspace_enterprise_disabled_noop(self):
with (
patch("services.enterprise.enterprise_service.dify_config") as mock_config,
patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join,
):
mock_config.ENTERPRISE_ENABLED = False
try_join_default_workspace("11111111-1111-1111-1111-111111111111")
mock_join.assert_not_called()
def test_try_join_default_workspace_successful_join_does_not_raise(self):
account_id = "11111111-1111-1111-1111-111111111111"
with (
patch("services.enterprise.enterprise_service.dify_config") as mock_config,
patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join,
):
mock_config.ENTERPRISE_ENABLED = True
mock_join.return_value = DefaultWorkspaceJoinResult(
workspace_id="22222222-2222-2222-2222-222222222222",
joined=True,
message="ok",
)
# Should not raise
try_join_default_workspace(account_id)
mock_join.assert_called_once_with(account_id=account_id)
def test_try_join_default_workspace_skipped_join_does_not_raise(self):
account_id = "11111111-1111-1111-1111-111111111111"
with (
patch("services.enterprise.enterprise_service.dify_config") as mock_config,
patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join,
):
mock_config.ENTERPRISE_ENABLED = True
mock_join.return_value = DefaultWorkspaceJoinResult(
workspace_id="",
joined=False,
message="no default workspace configured",
)
# Should not raise
try_join_default_workspace(account_id)
mock_join.assert_called_once_with(account_id=account_id)
def test_try_join_default_workspace_api_failure_soft_fails(self):
account_id = "11111111-1111-1111-1111-111111111111"
with (
patch("services.enterprise.enterprise_service.dify_config") as mock_config,
patch("services.enterprise.enterprise_service.EnterpriseService.join_default_workspace") as mock_join,
):
mock_config.ENTERPRISE_ENABLED = True
mock_join.side_effect = Exception("network failure")
# Should not raise
try_join_default_workspace(account_id)
mock_join.assert_called_once_with(account_id=account_id)
def test_try_join_default_workspace_invalid_account_id_soft_fails(self):
with patch("services.enterprise.enterprise_service.dify_config") as mock_config:
mock_config.ENTERPRISE_ENABLED = True
# Should not raise even though UUID parsing fails inside join_default_workspace
try_join_default_workspace("not-a-uuid")