Compare commits

...

27 Commits

Author SHA1 Message Date
autofix-ci[bot]
0685e294c4 [autofix.ci] apply automated fixes 2026-02-04 05:34:58 +00:00
Yansong Zhang
f8cc056604 fix start_time -> update_time 2026-02-04 13:30:45 +08:00
autofix-ci[bot]
ef1e233c2d [autofix.ci] apply automated fixes 2026-02-04 05:27:16 +00:00
Yansong Zhang
6d87424ab8 fix start_time -> update_time 2026-02-04 13:18:47 +08:00
Yansong Zhang
aaa98c9550 Merge branch 'fix/api-token-lock' of github.com:langgenius/dify into fix/api-token-lock 2026-02-04 12:20:51 +08:00
Yansong Zhang
4719f2569c fix start_time -> update_time 2026-02-04 12:20:42 +08:00
autofix-ci[bot]
282ec583db [autofix.ci] apply automated fixes 2026-02-04 04:06:50 +00:00
Yansong Zhang
c7337d5b67 make it great agin 2026-02-04 12:02:43 +08:00
Yansong Zhang
e1efea16a4 make it great agin 2026-02-04 12:00:43 +08:00
autofix-ci[bot]
dcba86b707 [autofix.ci] apply automated fixes 2026-02-04 03:22:17 +00:00
Yansong Zhang
d02ed82854 Merge branch 'fix/api-token-lock' of github.com:langgenius/dify into fix/api-token-lock 2026-02-04 11:18:03 +08:00
Yansong Zhang
60e3a7b419 make it great agin 2026-02-04 11:17:37 +08:00
Yansong Zhang
240684e723 make it great agin 2026-02-04 11:17:30 +08:00
autofix-ci[bot]
edfd34bc90 [autofix.ci] apply automated fixes 2026-02-03 08:32:34 +00:00
Yansong Zhang
292a9ff487 fix linter 2026-02-03 16:28:28 +08:00
Yansong Zhang
132684898b fix linter 2026-02-03 16:26:52 +08:00
autofix-ci[bot]
138117526a [autofix.ci] apply automated fixes 2026-02-03 07:53:38 +00:00
Yansong Zhang
396834c808 fix linter 2026-02-03 15:48:55 +08:00
autofix-ci[bot]
657b3f5990 [autofix.ci] apply automated fixes 2026-02-03 07:29:15 +00:00
Yansong Zhang
ea5089aba7 fix linter 2026-02-03 15:25:15 +08:00
Yansong Zhang
d69e4de47b fix linter 2026-02-03 15:22:01 +08:00
zyssyz123
79ead90487 Update api/tasks/update_api_token_last_used_task.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-02-03 15:19:59 +08:00
zyssyz123
c7de79dcbf Update api/libs/api_token_cache.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-02-03 15:16:29 +08:00
Yansong Zhang
da9abcf885 Merge branch 'fix/api-token-lock' of github.com:langgenius/dify into fix/api-token-lock 2026-02-03 15:15:01 +08:00
Yansong Zhang
d54b08701e fix linter 2026-02-03 15:14:17 +08:00
autofix-ci[bot]
c2fdfdc504 [autofix.ci] apply automated fixes 2026-02-03 07:08:47 +00:00
Yansong Zhang
d58d3f5bde add redis for api token 2026-02-03 15:03:11 +08:00
12 changed files with 1306 additions and 26 deletions

View File

@@ -122,7 +122,7 @@ These commands assume you start from the repository root.
```bash
cd api
-uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention
+uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q api_token_update,dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention
```
1. Optional: start Celery Beat (scheduled tasks, in a new terminal).

View File

@@ -6,6 +6,7 @@ from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden
from extensions.ext_database import db
+from libs.api_token_cache import ApiTokenCache
from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models.dataset import Dataset
@@ -131,6 +132,11 @@ class BaseApiKeyResource(Resource):
if key is None:
flask_restx.abort(HTTPStatus.NOT_FOUND, message="API key not found")
+# Invalidate cache before deleting from database
+# Type assertion: key is guaranteed to be non-None here because abort() raises
+assert key is not None # nosec - for type checker only
+ApiTokenCache.delete(key.token, key.type)
db.session.query(ApiToken).where(ApiToken.id == api_key_id).delete()
db.session.commit()

View File

@@ -51,6 +51,7 @@ from fields.dataset_fields import (
weighted_score_fields,
)
from fields.document_fields import document_status_fields
+from libs.api_token_cache import ApiTokenCache
from libs.login import current_account_with_tenant, login_required
from models import ApiToken, Dataset, Document, DocumentSegment, UploadFile
from models.dataset import DatasetPermissionEnum
@@ -820,6 +821,11 @@ class DatasetApiDeleteApi(Resource):
if key is None:
console_ns.abort(404, message="API key not found")
+# Invalidate cache before deleting from database
+# Type assertion: key is guaranteed to be non-None here because abort() raises
+assert key is not None # nosec - for type checker only
+ApiTokenCache.delete(key.token, key.type)
db.session.query(ApiToken).where(ApiToken.id == api_key_id).delete()
db.session.commit()

View File

@@ -1,7 +1,6 @@
import logging
import time
from collections.abc import Callable
-from datetime import timedelta
from enum import StrEnum, auto
from functools import wraps
from typing import Concatenate, ParamSpec, TypeVar
@@ -10,13 +9,15 @@ from flask import current_app, request
from flask_login import user_logged_in
from flask_restx import Resource
from pydantic import BaseModel
-from sqlalchemy import select, update
+from sqlalchemy import select
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, NotFound, Unauthorized
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from extensions.ext_redis import redis_client
+from libs.api_token_cache import ApiTokenCache
+from libs.api_token_updater import update_token_last_used_at
from libs.datetime_utils import naive_utc_now
from libs.login import current_user
from models import Account, Tenant, TenantAccountJoin, TenantStatus
@@ -296,7 +297,14 @@ def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
def validate_and_get_api_token(scope: str | None = None):
"""
-Validate and get API token.
+Validate and get API token with Redis caching.
+This function uses a two-tier approach:
+1. First checks Redis cache for the token
+2. If not cached, queries database and caches the result
+The last_used_at field is updated asynchronously via Celery task
+to avoid blocking the request.
"""
auth_header = request.headers.get("Authorization")
if auth_header is None or " " not in auth_header:
@@ -308,29 +316,103 @@ def validate_and_get_api_token(scope: str | None = None):
if auth_scheme != "bearer":
raise Unauthorized("Authorization scheme must be 'Bearer'")
-current_time = naive_utc_now()
-cutoff_time = current_time - timedelta(minutes=1)
-with Session(db.engine, expire_on_commit=False) as session:
-update_stmt = (
-update(ApiToken)
-.where(
-ApiToken.token == auth_token,
-(ApiToken.last_used_at.is_(None) | (ApiToken.last_used_at < cutoff_time)),
-ApiToken.type == scope,
-)
-.values(last_used_at=current_time)
-)
-stmt = select(ApiToken).where(ApiToken.token == auth_token, ApiToken.type == scope)
-result = session.execute(update_stmt)
-api_token = session.scalar(stmt)
+# Try to get token from cache first
+# Returns a CachedApiToken (plain Python object), not a SQLAlchemy model
+cached_token = ApiTokenCache.get(auth_token, scope)
+if cached_token is not None:
+logger.debug("Token validation served from cache for scope: %s", scope)
+# Asynchronously update last_used_at (non-blocking)
+_async_update_token_last_used_at(auth_token, scope)
+return cached_token
-if hasattr(result, "rowcount") and result.rowcount > 0:
-session.commit()
+# Cache miss - use Redis lock for single-flight mode
+# This ensures only one request queries DB for the same token concurrently
+logger.debug("Token cache miss, attempting to acquire query lock for scope: %s", scope)
-if not api_token:
-raise Unauthorized("Access token is invalid")
+lock_key = f"api_token_query_lock:{scope}:{auth_token}"
+lock = redis_client.lock(lock_key, timeout=10, blocking_timeout=5)
-return api_token
+try:
+if lock.acquire(blocking=True):
+try:
+# Double-check cache after acquiring lock
+# (another concurrent request might have already cached it)
+cached_token = ApiTokenCache.get(auth_token, scope)
+if cached_token is not None:
+logger.debug("Token cached by concurrent request, using cached version")
+return cached_token
+# Still not cached - query database
+with Session(db.engine, expire_on_commit=False) as session:
+current_time = naive_utc_now()
+update_token_last_used_at(auth_token, scope, current_time, session=session)
+stmt = select(ApiToken).where(ApiToken.token == auth_token, ApiToken.type == scope)
+api_token = session.scalar(stmt)
+if not api_token:
+ApiTokenCache.set(auth_token, scope, None)
+raise Unauthorized("Access token is invalid")
+ApiTokenCache.set(auth_token, scope, api_token)
+return api_token
+finally:
+lock.release()
+else:
+# Lock acquisition timeout - fallback to direct query
+logger.warning("Lock timeout for token: %s, proceeding with direct query", auth_token[:10])
+with Session(db.engine, expire_on_commit=False) as session:
+current_time = naive_utc_now()
+update_token_last_used_at(auth_token, scope, current_time, session=session)
+stmt = select(ApiToken).where(ApiToken.token == auth_token, ApiToken.type == scope)
+api_token = session.scalar(stmt)
+if not api_token:
+ApiTokenCache.set(auth_token, scope, None)
+raise Unauthorized("Access token is invalid")
+ApiTokenCache.set(auth_token, scope, api_token)
+return api_token
+except Exception as e:
+# Redis lock failure - fallback to direct query to ensure service availability
+logger.warning("Redis lock failed for token query: %s, proceeding anyway", e)
+with Session(db.engine, expire_on_commit=False) as session:
+current_time = naive_utc_now()
+update_token_last_used_at(auth_token, scope, current_time, session=session)
+stmt = select(ApiToken).where(ApiToken.token == auth_token, ApiToken.type == scope)
+api_token = session.scalar(stmt)
+if not api_token:
+ApiTokenCache.set(auth_token, scope, None)
+raise Unauthorized("Access token is invalid")
+ApiTokenCache.set(auth_token, scope, api_token)
+return api_token
+def _async_update_token_last_used_at(auth_token: str, scope: str | None):
+"""
+Asynchronously update the last_used_at timestamp for a token.
+This schedules a Celery task to update the database without blocking
+the current request. The update time is passed to ensure only older
+records are updated, providing natural concurrency control.
+"""
+try:
+from tasks.update_api_token_last_used_task import update_api_token_last_used_task
+# Record the update time for concurrency control
+update_time = naive_utc_now()
+update_time_iso = update_time.isoformat()
+# Fire and forget - don't wait for result
+update_api_token_last_used_task.delay(auth_token, scope, update_time_iso)
+logger.debug("Scheduled async update for last_used_at (scope: %s, update_time: %s)", scope, update_time_iso)
+except Exception as e:
+# Don't fail the request if task scheduling fails
+logger.warning("Failed to schedule last_used_at update task: %s", e)
class DatasetApiResource(Resource):
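
For readers skimming the interleaved hunk above, here is a minimal sketch of the new read path. `ApiTokenCache` and `redis_client` are the helpers this PR imports, and `_async_update_token_last_used_at` is the function added at the end of the hunk; the database lookup is collapsed into a hypothetical `query_token_from_db` helper, and the lock-timeout and Redis-failure fallbacks shown in the diff are omitted:

```python
from werkzeug.exceptions import Unauthorized

from extensions.ext_redis import redis_client
from libs.api_token_cache import ApiTokenCache


def validate_token_sketch(auth_token: str, scope: str | None):
    # 1. Cache hit: serve from Redis and refresh last_used_at out of band.
    cached = ApiTokenCache.get(auth_token, scope)
    if cached is not None:
        _async_update_token_last_used_at(auth_token, scope)  # fire-and-forget Celery task
        return cached

    # 2. Cache miss: single-flight lock so only one request queries the database per token.
    lock = redis_client.lock(f"api_token_query_lock:{scope}:{auth_token}", timeout=10, blocking_timeout=5)
    with lock:
        cached = ApiTokenCache.get(auth_token, scope)  # double-check after waiting on the lock
        if cached is not None:
            return cached

        api_token = query_token_from_db(auth_token, scope)  # hypothetical stand-in for the SELECT in the diff
        ApiTokenCache.set(auth_token, scope, api_token)  # also caches None briefly to stop cache penetration
        if api_token is None:
            raise Unauthorized("Access token is invalid")
        return api_token
```
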

View File

@@ -35,10 +35,10 @@ if [[ "${MODE}" == "worker" ]]; then
if [[ -z "${CELERY_QUEUES}" ]]; then
if [[ "${EDITION}" == "CLOUD" ]]; then
# Cloud edition: separate queues for dataset and trigger tasks
DEFAULT_QUEUES="dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention"
DEFAULT_QUEUES="api_token_update,dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention"
else
# Community edition (SELF_HOSTED): dataset, pipeline and workflow have separate queues
DEFAULT_QUEUES="dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention"
DEFAULT_QUEUES="api_token_update,dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention"
fi
else
DEFAULT_QUEUES="${CELERY_QUEUES}"

View File

@@ -104,6 +104,7 @@ def init_app(app: DifyApp) -> Celery:
"tasks.trigger_processing_tasks", # async trigger processing
"tasks.generate_summary_index_task", # summary index generation
"tasks.regenerate_summary_index_task", # summary index regeneration
"tasks.update_api_token_last_used_task", # async API token last_used_at update
]
day = dify_config.CELERY_BEAT_SCHEDULER_TIME

api/libs/api_token_cache.py Normal file (279 lines)
View File

@@ -0,0 +1,279 @@
"""
API Token Cache Module
Provides Redis-based caching for API token validation to reduce database load.
"""
import logging
from datetime import datetime
from typing import Any
import orjson
from pydantic import BaseModel
from extensions.ext_redis import redis_client, redis_fallback
logger = logging.getLogger(__name__)
class CachedApiToken(BaseModel):
"""
Pydantic model for cached API token data.
This is NOT a SQLAlchemy model instance, but a plain Pydantic model
that mimics the ApiToken model interface for read-only access.
Using Pydantic provides:
- Automatic type validation
- Better IDE support
- Built-in serialization/deserialization
"""
id: str
app_id: str | None
tenant_id: str | None
type: str
token: str
last_used_at: datetime | None
created_at: datetime | None
def __repr__(self) -> str:
return f"<CachedApiToken id={self.id} type={self.type}>"
# Cache configuration
CACHE_KEY_PREFIX = "api_token"
CACHE_TTL_SECONDS = 600 # 10 minutes
CACHE_NULL_TTL_SECONDS = 60 # 1 minute for non-existent tokens
class ApiTokenCache:
"""
Redis cache wrapper for API tokens.
Handles serialization, deserialization, and cache invalidation.
"""
@staticmethod
def _make_cache_key(token: str, scope: str | None = None) -> str:
"""
Generate cache key for the given token and scope.
Args:
token: The API token string
scope: The token type/scope (e.g., 'app', 'dataset')
Returns:
Cache key string
"""
scope_str = scope or "any"
return f"{CACHE_KEY_PREFIX}:{scope_str}:{token}"
@staticmethod
def _serialize_token(api_token: Any) -> bytes:
"""
Serialize ApiToken object to JSON bytes using orjson for better performance.
Args:
api_token: ApiToken model instance or CachedApiToken
Returns:
JSON bytes representation
"""
# If it's already a Pydantic model, use model_dump
if isinstance(api_token, CachedApiToken):
# Pydantic model -> dict -> orjson
return orjson.dumps(api_token.model_dump(mode="json"))
# Otherwise, convert from SQLAlchemy model
data = {
"id": str(api_token.id),
"app_id": str(api_token.app_id) if api_token.app_id else None,
"tenant_id": str(api_token.tenant_id) if api_token.tenant_id else None,
"type": api_token.type,
"token": api_token.token,
"last_used_at": api_token.last_used_at.isoformat() if api_token.last_used_at else None,
"created_at": api_token.created_at.isoformat() if api_token.created_at else None,
}
return orjson.dumps(data)
@staticmethod
def _deserialize_token(cached_data: bytes | str) -> Any:
"""
Deserialize JSON bytes/string back to a CachedApiToken Pydantic model using orjson.
Args:
cached_data: JSON bytes or string from cache
Returns:
CachedApiToken instance or None
"""
if cached_data in {b"null", "null"}:
# Cached null value (token doesn't exist)
return None
try:
# orjson.loads accepts bytes or str
data = orjson.loads(cached_data)
# Use Pydantic's model_validate for automatic validation
token_obj = CachedApiToken.model_validate(data)
return token_obj
except (ValueError, orjson.JSONDecodeError) as e:
logger.warning("Failed to deserialize token from cache: %s", e)
return None
@staticmethod
@redis_fallback(default_return=None)
def get(token: str, scope: str | None) -> Any | None:
"""
Get API token from cache.
Args:
token: The API token string
scope: The token type/scope
Returns:
CachedApiToken instance if found in cache, None if not cached or cache miss
"""
cache_key = ApiTokenCache._make_cache_key(token, scope)
cached_data = redis_client.get(cache_key)
if cached_data is None:
logger.debug("Cache miss for token key: %s", cache_key)
return None
# orjson.loads handles both bytes and str automatically
logger.debug("Cache hit for token key: %s", cache_key)
return ApiTokenCache._deserialize_token(cached_data)
@staticmethod
def _add_to_tenant_index(tenant_id: str | None, cache_key: str) -> None:
"""
Add cache key to tenant index for efficient invalidation.
Maintains a Redis SET: tenant_tokens:{tenant_id} containing all cache keys
for that tenant. This allows O(1) tenant-wide invalidation.
Args:
tenant_id: The tenant ID
cache_key: The cache key to add to the index
"""
if not tenant_id:
return
try:
index_key = f"tenant_tokens:{tenant_id}"
redis_client.sadd(index_key, cache_key)
# Set TTL on the index itself (slightly longer than cache TTL)
redis_client.expire(index_key, CACHE_TTL_SECONDS + 60)
except Exception as e:
# Don't fail if index update fails
logger.warning("Failed to update tenant index: %s", e)
@staticmethod
def _remove_from_tenant_index(tenant_id: str | None, cache_key: str) -> None:
"""
Remove cache key from tenant index.
Args:
tenant_id: The tenant ID
cache_key: The cache key to remove from the index
"""
if not tenant_id:
return
try:
index_key = f"tenant_tokens:{tenant_id}"
redis_client.srem(index_key, cache_key)
except Exception as e:
# Don't fail if index update fails
logger.warning("Failed to remove from tenant index: %s", e)
@staticmethod
@redis_fallback(default_return=False)
def set(token: str, scope: str | None, api_token: Any | None, ttl: int = CACHE_TTL_SECONDS) -> bool:
"""
Set API token in cache.
Args:
token: The API token string
scope: The token type/scope
api_token: ApiToken instance to cache (None for non-existent tokens)
ttl: Time to live in seconds
Returns:
True if successful, False otherwise
"""
cache_key = ApiTokenCache._make_cache_key(token, scope)
if api_token is None:
# Cache null value to prevent cache penetration
cached_value = b"null"
ttl = CACHE_NULL_TTL_SECONDS
else:
cached_value = ApiTokenCache._serialize_token(api_token)
try:
redis_client.setex(cache_key, ttl, cached_value)
# Add to tenant index for efficient tenant-wide invalidation
if api_token is not None and hasattr(api_token, "tenant_id"):
ApiTokenCache._add_to_tenant_index(api_token.tenant_id, cache_key)
logger.debug("Cached token with key: %s, ttl: %ss", cache_key, ttl)
return True
except Exception as e:
logger.warning("Failed to cache token: %s", e)
return False
@staticmethod
@redis_fallback(default_return=False)
def delete(token: str, scope: str | None = None) -> bool:
"""
Delete API token from cache.
Args:
token: The API token string
scope: The token type/scope (None to delete all scopes)
Returns:
True if successful, False otherwise
"""
if scope is None:
# Delete all possible scopes for this token
# This is a safer approach when scope is unknown
pattern = f"{CACHE_KEY_PREFIX}:*:{token}"
try:
keys_to_delete = list(redis_client.scan_iter(match=pattern))
if keys_to_delete:
redis_client.delete(*keys_to_delete)
logger.info("Deleted %d cache entries for token", len(keys_to_delete))
return True
except Exception as e:
logger.warning("Failed to delete token cache with pattern: %s", e)
return False
else:
cache_key = ApiTokenCache._make_cache_key(token, scope)
try:
# Try to get tenant_id before deleting (for index cleanup)
tenant_id = None
try:
cached_data = redis_client.get(cache_key)
if cached_data and cached_data != b"null":
data = orjson.loads(cached_data)
tenant_id = data.get("tenant_id")
except Exception as e:
# If we can't get tenant_id, just delete the key without index cleanup
logger.debug("Failed to get tenant_id for cache cleanup: %s", e)
# Delete the cache key
redis_client.delete(cache_key)
# Remove from tenant index
if tenant_id:
ApiTokenCache._remove_from_tenant_index(tenant_id, cache_key)
logger.info("Deleted cache for key: %s", cache_key)
return True
except Exception as e:
logger.warning("Failed to delete token cache: %s", e)
return False
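
A short usage sketch of the wrapper above, assuming a reachable Redis; the `token_row` object is a stand-in, since `_serialize_token` only reads the listed attributes from whatever object it is given:

```python
from datetime import datetime
from types import SimpleNamespace

from libs.api_token_cache import ApiTokenCache, CachedApiToken

# Stand-in for an ApiToken row; only the attributes read by _serialize_token are needed.
token_row = SimpleNamespace(
    id="token-id",
    app_id="app-id",
    tenant_id="tenant-id",
    type="app",
    token="app-xxxxxxxx",
    last_used_at=None,
    created_at=datetime(2026, 1, 1),
)

ApiTokenCache.set("app-xxxxxxxx", "app", token_row)   # cached for CACHE_TTL_SECONDS (10 minutes)
cached = ApiTokenCache.get("app-xxxxxxxx", "app")     # CachedApiToken on a hit, None on a miss
assert cached is None or isinstance(cached, CachedApiToken)

ApiTokenCache.set("bogus-token", "app", None)         # negative cache, CACHE_NULL_TTL_SECONDS (1 minute)
ApiTokenCache.delete("app-xxxxxxxx")                  # no scope given: scan and drop every scope for this token
```
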

View File

@@ -0,0 +1,76 @@
"""
Unified API Token update utilities.
This module provides a centralized method for updating API token last_used_at
to avoid code duplication between sync and async update paths.
"""
import logging
from datetime import datetime
from sqlalchemy import update
from sqlalchemy.orm import Session
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.model import ApiToken
logger = logging.getLogger(__name__)
def update_token_last_used_at(
token: str, scope: str | None, update_time: datetime, session: Session | None = None
) -> dict:
"""
Unified method to update API token last_used_at timestamp.
This method is used by both:
1. Direct database update (cache miss scenario)
2. Async Celery task (cache hit scenario)
Args:
token: The API token string
scope: The token type/scope (e.g., 'app', 'dataset')
update_time: The time to use for the update (for concurrency control)
session: Optional existing session to use (if None, creates new one)
Returns:
Dict with status, rowcount, and other metadata
"""
current_time = naive_utc_now()
def _do_update(s: Session) -> dict:
"""Execute the update within the session."""
update_stmt = (
update(ApiToken)
.where(
ApiToken.token == token,
ApiToken.type == scope,
# Only update if last_used_at is older than update_time
(ApiToken.last_used_at.is_(None) | (ApiToken.last_used_at < update_time)),
)
.values(last_used_at=current_time)
)
result = s.execute(update_stmt)
rowcount = getattr(result, "rowcount", 0)
if rowcount > 0:
s.commit()
logger.debug("Updated last_used_at for token: %s... (scope: %s)", token[:10], scope)
return {"status": "updated", "rowcount": rowcount}
else:
logger.debug("No update needed for token: %s... (already up-to-date)", token[:10])
return {"status": "no_update_needed", "reason": "last_used_at >= update_time"}
try:
if session:
# Use provided session (sync path)
return _do_update(session)
else:
# Create new session (async path)
with Session(db.engine, expire_on_commit=False) as new_session:
return _do_update(new_session)
except Exception as e:
logger.warning("Failed to update last_used_at for token: %s", e)
return {"status": "failed", "error": str(e)}

View File

@@ -14,6 +14,7 @@ from sqlalchemy.orm import sessionmaker
from configs import dify_config
from core.db.session_factory import session_factory
from extensions.ext_database import db
+from libs.api_token_cache import ApiTokenCache
from libs.archive_storage import ArchiveStorageNotConfiguredError, get_archive_storage
from models import (
ApiToken,
@@ -134,6 +135,12 @@ def _delete_app_mcp_servers(tenant_id: str, app_id: str):
def _delete_app_api_tokens(tenant_id: str, app_id: str):
def del_api_token(session, api_token_id: str):
+# Fetch token details for cache invalidation
+token_obj = session.query(ApiToken).where(ApiToken.id == api_token_id).first()
+if token_obj:
+# Invalidate cache before deletion
+ApiTokenCache.delete(token_obj.token, token_obj.type)
session.query(ApiToken).where(ApiToken.id == api_token_id).delete(synchronize_session=False)
_delete_records(

View File

@@ -0,0 +1,59 @@
"""
Celery task for updating API token last_used_at timestamp asynchronously.
"""
import logging
from datetime import datetime
from celery import shared_task
from libs.api_token_updater import update_token_last_used_at
logger = logging.getLogger(__name__)
@shared_task(queue="api_token_update", bind=True)
def update_api_token_last_used_task(self, token: str, scope: str | None, update_time_iso: str):
"""
Asynchronously update the last_used_at timestamp for an API token.
Uses the unified update_token_last_used_at() method to avoid code duplication.
Queue: api_token_update (dedicated queue to isolate from other tasks and
prevent task accumulation in production environments)
Args:
token: The API token string
scope: The token type/scope (e.g., 'app', 'dataset')
update_time_iso: ISO format timestamp for the update operation
Returns:
Dict with status and metadata
Raises:
Exception: Re-raises exceptions to allow Celery retry mechanism and monitoring
"""
try:
# Parse update_time from ISO format
update_time = datetime.fromisoformat(update_time_iso)
# Use unified update method
result = update_token_last_used_at(token, scope, update_time, session=None)
if result["status"] == "updated":
logger.info("Updated last_used_at for token (async): %s... (scope: %s)", token[:10], scope)
elif result["status"] == "failed":
# If update failed, log and raise for retry
error_msg = result.get("error", "Unknown error")
logger.error("Failed to update last_used_at for token (async): %s", error_msg)
raise Exception(f"Token update failed: {error_msg}")
return result
except Exception:
# Log the error with full context (logger.exception includes traceback automatically)
logger.exception("Error in update_api_token_last_used_task (token: %s..., scope: %s)", token[:10], scope)
# Raise exception to let Celery handle retry and monitoring
# This allows Flower and other monitoring tools to track failures
raise
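
Callers enqueue the task as below (a sketch; a worker must consume the new api_token_update queue, which the entrypoint and docs changes earlier in this diff add to the default queue list):

```python
from libs.datetime_utils import naive_utc_now
from tasks.update_api_token_last_used_task import update_api_token_last_used_task

# Fire-and-forget: the observation time travels with the task so that stale or
# out-of-order updates are discarded by the conditional UPDATE in the helper.
update_api_token_last_used_task.delay("app-xxxxxxxx", "app", naive_utc_now().isoformat())
```
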

View File

@@ -0,0 +1,508 @@
"""
Integration tests for API Token Cache with Redis and Celery.
These tests require:
- Redis server running
- Test database configured
- Celery worker running (for full integration test)
"""
import time
from datetime import datetime, timedelta
from unittest.mock import patch
import pytest
from extensions.ext_redis import redis_client
from libs.api_token_cache import ApiTokenCache, CachedApiToken
from libs.api_token_updater import update_token_last_used_at
from models.model import ApiToken
class TestApiTokenCacheRedisIntegration:
"""Integration tests with real Redis."""
def setup_method(self):
"""Setup test fixtures and clean Redis."""
self.test_token = "test-integration-token-123"
self.test_scope = "app"
self.cache_key = f"api_token:{self.test_scope}:{self.test_token}"
# Clean up any existing test data
self._cleanup()
def teardown_method(self):
"""Cleanup test data from Redis."""
self._cleanup()
def _cleanup(self):
"""Remove test data from Redis."""
try:
# Delete test cache key
redis_client.delete(self.cache_key)
# Delete any test tenant index
redis_client.delete("tenant_tokens:test-tenant-id")
# Delete any test locks
redis_client.delete(f"api_token_last_used_lock:{self.test_scope}:{self.test_token}")
except Exception:
pass # Ignore cleanup errors
def test_cache_set_and_get_with_real_redis(self):
"""Test cache set and get operations with real Redis."""
# Create a mock token
from unittest.mock import MagicMock
mock_token = MagicMock()
mock_token.id = "test-id-123"
mock_token.app_id = "test-app-456"
mock_token.tenant_id = "test-tenant-789"
mock_token.type = "app"
mock_token.token = self.test_token
mock_token.last_used_at = datetime.now()
mock_token.created_at = datetime.now() - timedelta(days=30)
# Set in cache
result = ApiTokenCache.set(self.test_token, self.test_scope, mock_token)
assert result is True
# Verify in Redis
cached_data = redis_client.get(self.cache_key)
assert cached_data is not None
# Get from cache
cached_token = ApiTokenCache.get(self.test_token, self.test_scope)
assert cached_token is not None
assert isinstance(cached_token, CachedApiToken)
assert cached_token.id == "test-id-123"
assert cached_token.app_id == "test-app-456"
assert cached_token.tenant_id == "test-tenant-789"
assert cached_token.type == "app"
assert cached_token.token == self.test_token
def test_cache_ttl_with_real_redis(self):
"""Test cache TTL is set correctly."""
from unittest.mock import MagicMock
mock_token = MagicMock()
mock_token.id = "test-id"
mock_token.app_id = "test-app"
mock_token.tenant_id = "test-tenant"
mock_token.type = "app"
mock_token.token = self.test_token
mock_token.last_used_at = None
mock_token.created_at = datetime.now()
# Set in cache
ApiTokenCache.set(self.test_token, self.test_scope, mock_token)
# Check TTL
ttl = redis_client.ttl(self.cache_key)
assert 595 <= ttl <= 600 # Should be around 600 seconds (10 minutes)
def test_cache_null_value_for_invalid_token(self):
"""Test caching null value for invalid tokens"""
# Cache null value
result = ApiTokenCache.set(self.test_token, self.test_scope, None)
assert result is True
# Verify in Redis
cached_data = redis_client.get(self.cache_key)
assert cached_data == b"null"
# Get from cache should return None
cached_token = ApiTokenCache.get(self.test_token, self.test_scope)
assert cached_token is None
# Check TTL is shorter for null values
ttl = redis_client.ttl(self.cache_key)
assert 55 <= ttl <= 60 # Should be around 60 seconds
def test_cache_delete_with_real_redis(self):
"""Test cache deletion with real Redis."""
from unittest.mock import MagicMock
mock_token = MagicMock()
mock_token.id = "test-id"
mock_token.app_id = "test-app"
mock_token.tenant_id = "test-tenant"
mock_token.type = "app"
mock_token.token = self.test_token
mock_token.last_used_at = None
mock_token.created_at = datetime.now()
# Set in cache
ApiTokenCache.set(self.test_token, self.test_scope, mock_token)
assert redis_client.exists(self.cache_key) == 1
# Delete from cache
result = ApiTokenCache.delete(self.test_token, self.test_scope)
assert result is True
# Verify deleted
assert redis_client.exists(self.cache_key) == 0
def test_tenant_index_creation(self):
"""Test tenant index is created when caching token."""
from unittest.mock import MagicMock
tenant_id = "test-tenant-id"
mock_token = MagicMock()
mock_token.id = "test-id"
mock_token.app_id = "test-app"
mock_token.tenant_id = tenant_id
mock_token.type = "app"
mock_token.token = self.test_token
mock_token.last_used_at = None
mock_token.created_at = datetime.now()
# Set in cache
ApiTokenCache.set(self.test_token, self.test_scope, mock_token)
# Verify tenant index exists
index_key = f"tenant_tokens:{tenant_id}"
assert redis_client.exists(index_key) == 1
# Verify cache key is in the index
members = redis_client.smembers(index_key)
cache_keys = [m.decode("utf-8") if isinstance(m, bytes) else m for m in members]
assert self.cache_key in cache_keys
def test_invalidate_by_tenant_via_index(self):
"""Test tenant-wide cache invalidation using index (fast path)."""
from unittest.mock import MagicMock
tenant_id = "test-tenant-id"
# Create multiple tokens for the same tenant
for i in range(3):
token_value = f"test-token-{i}"
mock_token = MagicMock()
mock_token.id = f"test-id-{i}"
mock_token.app_id = "test-app"
mock_token.tenant_id = tenant_id
mock_token.type = "app"
mock_token.token = token_value
mock_token.last_used_at = None
mock_token.created_at = datetime.now()
ApiTokenCache.set(token_value, "app", mock_token)
# Verify all cached
for i in range(3):
key = f"api_token:app:test-token-{i}"
assert redis_client.exists(key) == 1
# Invalidate by tenant
result = ApiTokenCache.invalidate_by_tenant(tenant_id)
assert result is True
# Verify all deleted
for i in range(3):
key = f"api_token:app:test-token-{i}"
assert redis_client.exists(key) == 0
# Verify index also deleted
assert redis_client.exists(f"tenant_tokens:{tenant_id}") == 0
def test_concurrent_cache_access(self):
"""Test concurrent cache access doesn't cause issues."""
import concurrent.futures
from unittest.mock import MagicMock
mock_token = MagicMock()
mock_token.id = "test-id"
mock_token.app_id = "test-app"
mock_token.tenant_id = "test-tenant"
mock_token.type = "app"
mock_token.token = self.test_token
mock_token.last_used_at = None
mock_token.created_at = datetime.now()
# Set once
ApiTokenCache.set(self.test_token, self.test_scope, mock_token)
# Concurrent reads
def get_from_cache():
return ApiTokenCache.get(self.test_token, self.test_scope)
# Execute 50 concurrent reads
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(get_from_cache) for _ in range(50)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
# All should succeed
assert len(results) == 50
assert all(r is not None for r in results)
assert all(isinstance(r, CachedApiToken) for r in results)
class TestApiTokenUpdaterIntegration:
"""Integration tests for unified token updater."""
@pytest.mark.usefixtures("db_session")
def test_update_token_last_used_at_with_session(self, db_session):
"""Test unified update method with provided session."""
# Create a test token in database
test_token = ApiToken()
test_token.id = "test-updater-id"
test_token.token = "test-updater-token"
test_token.type = "app"
test_token.app_id = "test-app"
test_token.tenant_id = "test-tenant"
test_token.last_used_at = datetime.now() - timedelta(minutes=10)
test_token.created_at = datetime.now() - timedelta(days=30)
db_session.add(test_token)
db_session.commit()
try:
# Update using unified method
start_time = datetime.now()
result = update_token_last_used_at(test_token.token, test_token.type, start_time, session=db_session)
# Verify result
assert result["status"] == "updated"
assert result["rowcount"] == 1
# Verify in database
db_session.refresh(test_token)
assert test_token.last_used_at >= start_time
finally:
# Cleanup
db_session.delete(test_token)
db_session.commit()
@pytest.mark.celery_integration
class TestCeleryTaskIntegration:
"""
Integration tests for Celery task.
Requires Celery worker running with api_token_update queue.
Run with: pytest -m celery_integration
"""
@pytest.mark.usefixtures("db_session")
def test_celery_task_execution(self, db_session):
"""Test Celery task can be executed successfully."""
from tasks.update_api_token_last_used_task import update_api_token_last_used_task
# Create a test token in database
test_token = ApiToken()
test_token.id = "test-celery-id"
test_token.token = "test-celery-token"
test_token.type = "app"
test_token.app_id = "test-app"
test_token.tenant_id = "test-tenant"
test_token.last_used_at = datetime.now() - timedelta(minutes=10)
test_token.created_at = datetime.now() - timedelta(days=30)
db_session.add(test_token)
db_session.commit()
try:
# Send task
start_time_iso = datetime.now().isoformat()
result = update_api_token_last_used_task.delay(test_token.token, test_token.type, start_time_iso)
# Wait for task to complete (with timeout)
task_result = result.get(timeout=10)
# Verify task executed
assert task_result["status"] in ["updated", "no_update_needed"]
# Verify in database
db_session.refresh(test_token)
# last_used_at should be updated or already recent
assert test_token.last_used_at is not None
finally:
# Cleanup
db_session.delete(test_token)
db_session.commit()
@pytest.mark.usefixtures("db_session")
def test_concurrent_celery_tasks_with_redis_lock(self, db_session):
"""Test multiple Celery tasks with Redis lock (防抖)."""
from tasks.update_api_token_last_used_task import update_api_token_last_used_task
# Create a test token
test_token = ApiToken()
test_token.id = "test-concurrent-id"
test_token.token = "test-concurrent-token"
test_token.type = "app"
test_token.app_id = "test-app"
test_token.tenant_id = "test-tenant"
test_token.last_used_at = datetime.now() - timedelta(minutes=10)
test_token.created_at = datetime.now() - timedelta(days=30)
db_session.add(test_token)
db_session.commit()
try:
# Send 10 tasks concurrently
start_time_iso = datetime.now().isoformat()
tasks = []
for _ in range(10):
result = update_api_token_last_used_task.delay(test_token.token, test_token.type, start_time_iso)
tasks.append(result)
# Wait for all tasks
results = [task.get(timeout=15) for task in tasks]
# Count how many actually updated
updated_count = sum(1 for r in results if r["status"] == "updated")
skipped_count = sum(1 for r in results if r["status"] == "no_update_needed")
# Due to the update-time check in update_token_last_used_at, most tasks should be no-ops
assert skipped_count >= 8 # At least 8 out of 10 should be skipped
assert updated_count <= 2 # At most 2 should actually update
finally:
# Cleanup
db_session.delete(test_token)
db_session.commit()
class TestEndToEndCacheFlow:
"""End-to-end integration test for complete cache flow."""
@pytest.mark.usefixtures("db_session")
def test_complete_flow_cache_miss_then_hit(self, db_session):
"""
Test complete flow:
1. First request (cache miss) -> query DB -> cache result
2. Second request (cache hit) -> return from cache
3. Verify Redis state
"""
test_token_value = "test-e2e-token"
test_scope = "app"
# Create test token in DB
test_token = ApiToken()
test_token.id = "test-e2e-id"
test_token.token = test_token_value
test_token.type = test_scope
test_token.app_id = "test-app"
test_token.tenant_id = "test-tenant"
test_token.last_used_at = None
test_token.created_at = datetime.now()
db_session.add(test_token)
db_session.commit()
try:
# Step 1: Cache miss - set token in cache
ApiTokenCache.set(test_token_value, test_scope, test_token)
# Verify cached
cache_key = f"api_token:{test_scope}:{test_token_value}"
assert redis_client.exists(cache_key) == 1
# Step 2: Cache hit - get from cache
cached_token = ApiTokenCache.get(test_token_value, test_scope)
assert cached_token is not None
assert cached_token.id == test_token.id
assert cached_token.token == test_token_value
# Step 3: Verify tenant index
index_key = f"tenant_tokens:{test_token.tenant_id}"
assert redis_client.exists(index_key) == 1
assert cache_key.encode() in redis_client.smembers(index_key)
# Step 4: Delete and verify cleanup
ApiTokenCache.delete(test_token_value, test_scope)
assert redis_client.exists(cache_key) == 0
# Index should be cleaned up
assert cache_key.encode() not in redis_client.smembers(index_key)
finally:
# Cleanup
db_session.delete(test_token)
db_session.commit()
redis_client.delete(f"api_token:{test_scope}:{test_token_value}")
redis_client.delete(f"tenant_tokens:{test_token.tenant_id}")
def test_high_concurrency_simulation(self):
"""Simulate high concurrency access to cache."""
import concurrent.futures
from unittest.mock import MagicMock
test_token_value = "test-concurrent-token"
test_scope = "app"
# Setup cache
mock_token = MagicMock()
mock_token.id = "concurrent-id"
mock_token.app_id = "test-app"
mock_token.tenant_id = "test-tenant"
mock_token.type = test_scope
mock_token.token = test_token_value
mock_token.last_used_at = datetime.now()
mock_token.created_at = datetime.now()
ApiTokenCache.set(test_token_value, test_scope, mock_token)
try:
# Simulate 100 concurrent cache reads
def read_cache():
return ApiTokenCache.get(test_token_value, test_scope)
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
futures = [executor.submit(read_cache) for _ in range(100)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
elapsed = time.time() - start_time
# All should succeed
assert len(results) == 100
assert all(r is not None for r in results)
# Should be fast (< 1 second for 100 reads)
assert elapsed < 1.0, f"Too slow: {elapsed}s for 100 cache reads"
print(f"\n✓ 100 concurrent cache reads in {elapsed:.3f}s")
print(f"✓ Average: {(elapsed / 100) * 1000:.2f}ms per read")
finally:
# Cleanup
ApiTokenCache.delete(test_token_value, test_scope)
redis_client.delete(f"tenant_tokens:{mock_token.tenant_id}")
class TestRedisFailover:
"""Test behavior when Redis is unavailable."""
@patch("libs.api_token_cache.redis_client")
def test_graceful_degradation_when_redis_fails(self, mock_redis):
"""Test system degrades gracefully when Redis is unavailable."""
from redis import RedisError
# Simulate Redis failure
mock_redis.get.side_effect = RedisError("Connection failed")
mock_redis.setex.side_effect = RedisError("Connection failed")
# Cache operations should not raise exceptions
result_get = ApiTokenCache.get("test-token", "app")
assert result_get is None # Returns None (fallback)
result_set = ApiTokenCache.set("test-token", "app", None)
assert result_set is False # Returns False (fallback)
# Application should continue working (using database directly)
if __name__ == "__main__":
# Run integration tests
pytest.main(
[
__file__,
"-v",
"-s",
"--tb=short",
"-m",
"not celery_integration", # Skip Celery tests by default
]
)

View File

@@ -0,0 +1,256 @@
"""
Unit tests for API Token Cache module.
"""
import json
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
from libs.api_token_cache import (
CACHE_KEY_PREFIX,
CACHE_NULL_TTL_SECONDS,
CACHE_TTL_SECONDS,
ApiTokenCache,
CachedApiToken,
)
class TestApiTokenCache:
"""Test cases for ApiTokenCache class."""
def setup_method(self):
"""Setup test fixtures."""
self.mock_token = MagicMock()
self.mock_token.id = "test-token-id-123"
self.mock_token.app_id = "test-app-id-456"
self.mock_token.tenant_id = "test-tenant-id-789"
self.mock_token.type = "app"
self.mock_token.token = "test-token-value-abc"
self.mock_token.last_used_at = datetime(2026, 2, 3, 10, 0, 0)
self.mock_token.created_at = datetime(2026, 1, 1, 0, 0, 0)
def test_make_cache_key(self):
"""Test cache key generation."""
# Test with scope
key = ApiTokenCache._make_cache_key("my-token", "app")
assert key == f"{CACHE_KEY_PREFIX}:app:my-token"
# Test without scope
key = ApiTokenCache._make_cache_key("my-token", None)
assert key == f"{CACHE_KEY_PREFIX}:any:my-token"
def test_serialize_token(self):
"""Test token serialization."""
serialized = ApiTokenCache._serialize_token(self.mock_token)
data = json.loads(serialized)
assert data["id"] == "test-token-id-123"
assert data["app_id"] == "test-app-id-456"
assert data["tenant_id"] == "test-tenant-id-789"
assert data["type"] == "app"
assert data["token"] == "test-token-value-abc"
assert data["last_used_at"] == "2026-02-03T10:00:00"
assert data["created_at"] == "2026-01-01T00:00:00"
def test_serialize_token_with_nulls(self):
"""Test token serialization with None values."""
mock_token = MagicMock()
mock_token.id = "test-id"
mock_token.app_id = None
mock_token.tenant_id = None
mock_token.type = "dataset"
mock_token.token = "test-token"
mock_token.last_used_at = None
mock_token.created_at = datetime(2026, 1, 1, 0, 0, 0)
serialized = ApiTokenCache._serialize_token(mock_token)
data = json.loads(serialized)
assert data["app_id"] is None
assert data["tenant_id"] is None
assert data["last_used_at"] is None
def test_deserialize_token(self):
"""Test token deserialization."""
cached_data = json.dumps(
{
"id": "test-id",
"app_id": "test-app",
"tenant_id": "test-tenant",
"type": "app",
"token": "test-token",
"last_used_at": "2026-02-03T10:00:00",
"created_at": "2026-01-01T00:00:00",
}
)
result = ApiTokenCache._deserialize_token(cached_data)
assert isinstance(result, CachedApiToken)
assert result.id == "test-id"
assert result.app_id == "test-app"
assert result.tenant_id == "test-tenant"
assert result.type == "app"
assert result.token == "test-token"
assert result.last_used_at == datetime(2026, 2, 3, 10, 0, 0)
assert result.created_at == datetime(2026, 1, 1, 0, 0, 0)
def test_deserialize_null_token(self):
"""Test deserialization of null token (cached miss)."""
result = ApiTokenCache._deserialize_token("null")
assert result is None
def test_deserialize_invalid_json(self):
"""Test deserialization with invalid JSON."""
result = ApiTokenCache._deserialize_token("invalid-json{")
assert result is None
@patch("libs.api_token_cache.redis_client")
def test_get_cache_hit(self, mock_redis):
"""Test cache hit scenario."""
cached_data = json.dumps(
{
"id": "test-id",
"app_id": "test-app",
"tenant_id": "test-tenant",
"type": "app",
"token": "test-token",
"last_used_at": "2026-02-03T10:00:00",
"created_at": "2026-01-01T00:00:00",
}
)
mock_redis.get.return_value = cached_data.encode("utf-8")
result = ApiTokenCache.get("test-token", "app")
assert result is not None
assert isinstance(result, CachedApiToken)
assert result.app_id == "test-app"
mock_redis.get.assert_called_once_with(f"{CACHE_KEY_PREFIX}:app:test-token")
@patch("libs.api_token_cache.redis_client")
def test_get_cache_miss(self, mock_redis):
"""Test cache miss scenario."""
mock_redis.get.return_value = None
result = ApiTokenCache.get("test-token", "app")
assert result is None
mock_redis.get.assert_called_once()
@patch("libs.api_token_cache.redis_client")
def test_set_valid_token(self, mock_redis):
"""Test setting a valid token in cache."""
result = ApiTokenCache.set("test-token", "app", self.mock_token)
assert result is True
mock_redis.setex.assert_called_once()
args = mock_redis.setex.call_args[0]
assert args[0] == f"{CACHE_KEY_PREFIX}:app:test-token"
assert args[1] == CACHE_TTL_SECONDS
@patch("libs.api_token_cache.redis_client")
def test_set_null_token(self, mock_redis):
"""Test setting a null token (cache penetration prevention)."""
result = ApiTokenCache.set("invalid-token", "app", None)
assert result is True
mock_redis.setex.assert_called_once()
args = mock_redis.setex.call_args[0]
assert args[0] == f"{CACHE_KEY_PREFIX}:app:invalid-token"
assert args[1] == CACHE_NULL_TTL_SECONDS
assert args[2] == "null"
@patch("libs.api_token_cache.redis_client")
def test_delete_with_scope(self, mock_redis):
"""Test deleting token cache with specific scope."""
result = ApiTokenCache.delete("test-token", "app")
assert result is True
mock_redis.delete.assert_called_once_with(f"{CACHE_KEY_PREFIX}:app:test-token")
@patch("libs.api_token_cache.redis_client")
def test_delete_without_scope(self, mock_redis):
"""Test deleting token cache without scope (delete all)."""
# Mock scan_iter to return an iterator of keys
mock_redis.scan_iter.return_value = iter(
[
b"api_token:app:test-token",
b"api_token:dataset:test-token",
]
)
result = ApiTokenCache.delete("test-token", None)
assert result is True
# Verify scan_iter was called with the correct pattern
mock_redis.scan_iter.assert_called_once()
call_args = mock_redis.scan_iter.call_args
assert call_args[1]["match"] == f"{CACHE_KEY_PREFIX}:*:test-token"
# Verify delete was called with all matched keys
mock_redis.delete.assert_called_once_with(
b"api_token:app:test-token",
b"api_token:dataset:test-token",
)
@patch("libs.api_token_cache.redis_client")
def test_redis_fallback_on_exception(self, mock_redis):
"""Test Redis fallback when Redis is unavailable."""
from redis import RedisError
mock_redis.get.side_effect = RedisError("Connection failed")
result = ApiTokenCache.get("test-token", "app")
# Should return None (fallback) instead of raising exception
assert result is None
class TestApiTokenCacheIntegration:
"""Integration test scenarios."""
@patch("libs.api_token_cache.redis_client")
def test_full_cache_lifecycle(self, mock_redis):
"""Test complete cache lifecycle: set -> get -> delete."""
# Setup mock token
mock_token = MagicMock()
mock_token.id = "id-123"
mock_token.app_id = "app-456"
mock_token.tenant_id = "tenant-789"
mock_token.type = "app"
mock_token.token = "token-abc"
mock_token.last_used_at = datetime(2026, 2, 3, 10, 0, 0)
mock_token.created_at = datetime(2026, 1, 1, 0, 0, 0)
# 1. Set token in cache
ApiTokenCache.set("token-abc", "app", mock_token)
assert mock_redis.setex.called
# 2. Simulate cache hit
cached_data = ApiTokenCache._serialize_token(mock_token)
mock_redis.get.return_value = cached_data  # _serialize_token already returns bytes (orjson)
retrieved = ApiTokenCache.get("token-abc", "app")
assert retrieved is not None
assert isinstance(retrieved, CachedApiToken)
# 3. Delete from cache
ApiTokenCache.delete("token-abc", "app")
assert mock_redis.delete.called
@patch("libs.api_token_cache.redis_client")
def test_cache_penetration_prevention(self, mock_redis):
"""Test that non-existent tokens are cached as null."""
# Set null token (cache miss)
ApiTokenCache.set("non-existent-token", "app", None)
args = mock_redis.setex.call_args[0]
assert args[2] == "null"
assert args[1] == CACHE_NULL_TTL_SECONDS # Shorter TTL for null values
if __name__ == "__main__":
pytest.main([__file__, "-v"])