Compare commits

...

1 Commits

Author SHA1 Message Date
Harry
ef9a741781 feat(trigger): enhance trigger management with new error handling and response structure
- Added `TriggerInvokeError` and `TriggerIgnoreEventError` for better error categorization during trigger invocation.
- Updated `TriggerInvokeResponse` to include a `cancelled` field, indicating if a trigger was ignored.
- Enhanced `TriggerManager` to handle specific errors and return appropriate responses.
- Refactored `dispatch_triggered_workflows` to improve workflow execution logic and error handling.

These changes improve the robustness and clarity of the trigger management system.
2025-09-23 16:01:59 +08:00
13 changed files with 228 additions and 245 deletions

View File

@@ -247,6 +247,7 @@ class Event(BaseModel):
class TriggerInvokeResponse(BaseModel):
event: Event
cancelled: Optional[bool] = False
class PluginTriggerDispatchResponse(BaseModel):

View File

@@ -49,7 +49,7 @@ class TriggerProviderApiEntity(BaseModel):
supported_creation_methods: list[TriggerCreationMethod] = Field(
default_factory=list,
description="Supported creation methods for the trigger provider. Possible values: 'OAUTH', 'APIKEY', 'MANUAL'."
description="Supported creation methods for the trigger provider. like 'OAUTH', 'APIKEY', 'MANUAL'.",
)
credentials_schema: list[ProviderConfig] = Field(description="The credentials schema of the trigger provider")

View File

@@ -269,11 +269,6 @@ class TriggerInputs(BaseModel):
trigger_name: str
subscription_id: str
@classmethod
def from_trigger_entity(cls, request_id: str, subscription_id: str, trigger: TriggerEntity) -> "TriggerInputs":
"""Create from trigger entity (for production)."""
return cls(request_id=request_id, trigger_name=trigger.identity.name, subscription_id=subscription_id)
def to_workflow_args(self) -> dict[str, Any]:
"""Convert to workflow arguments format."""
return {"inputs": self.model_dump(), "files": []}
@@ -282,11 +277,13 @@ class TriggerInputs(BaseModel):
"""Convert to dict (alias for model_dump)."""
return self.model_dump()
class TriggerCreationMethod(StrEnum):
OAUTH = "OAUTH"
APIKEY = "APIKEY"
MANUAL = "MANUAL"
# Export all entities
__all__ = [
"OAuthSchema",

View File

@@ -1,2 +1,8 @@
class TriggerProviderCredentialValidationError(ValueError):
pass
class TriggerInvokeError(Exception):
pass
class TriggerIgnoreEventError(TriggerInvokeError):
pass

View File

@@ -12,7 +12,8 @@ from flask import Request
import contexts
from core.plugin.entities.plugin import TriggerProviderID
from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.entities.request import TriggerInvokeResponse
from core.plugin.entities.request import Event, TriggerInvokeResponse
from core.plugin.impl.exc import PluginInvokeError
from core.plugin.impl.trigger import PluginTriggerManager
from core.trigger.entities.entities import (
Subscription,
@@ -168,7 +169,14 @@ class TriggerManager:
trigger = provider.get_trigger(trigger_name)
if not trigger:
raise ValueError(f"Trigger {trigger_name} not found in provider {provider_id}")
return provider.invoke_trigger(user_id, trigger_name, parameters, credentials, credential_type, request)
try:
return provider.invoke_trigger(user_id, trigger_name, parameters, credentials, credential_type, request)
except PluginInvokeError as e:
if e.get_error_type() == "TriggerIgnoreEventError":
return TriggerInvokeResponse(event=Event(variables={}), cancelled=True)
else:
logger.exception("Failed to invoke trigger")
raise
@classmethod
def subscribe_trigger(

View File

@@ -1,18 +1,11 @@
from collections.abc import Mapping
from typing import Any, Optional
from core.plugin.entities.plugin import TriggerProviderID
from core.plugin.impl.exc import PluginDaemonClientSideError, PluginInvokeError
from core.plugin.utils.http_parser import deserialize_request
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
from core.trigger.trigger_manager import TriggerManager
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.enums import ErrorStrategy, NodeType
from extensions.ext_storage import storage
from services.trigger.trigger_provider_service import TriggerProviderService
from .entities import PluginTriggerData
@@ -78,74 +71,9 @@ class TriggerPluginNode(BaseNode):
"plugin_unique_identifier": self._node_data.plugin_unique_identifier,
},
}
request_id = trigger_inputs.get("request_id")
trigger_name = trigger_inputs.get("trigger_name", "")
subscription_id = trigger_inputs.get("subscription_id", "")
if not request_id or not subscription_id:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
outputs={"error": "No request ID or subscription ID available"},
)
try:
subscription: TriggerProviderSubscriptionApiEntity | None = TriggerProviderService.get_subscription_by_id(
tenant_id=self.tenant_id, subscription_id=subscription_id
)
if not subscription:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
outputs={"error": f"Invalid subscription {subscription_id} not found"},
)
except Exception as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
outputs={"error": f"Failed to get subscription: {str(e)}"},
)
try:
request = deserialize_request(storage.load_once(f"triggers/{request_id}"))
parameters = self._node_data.parameters if hasattr(self, "_node_data") and self._node_data else {}
invoke_response = TriggerManager.invoke_trigger(
tenant_id=self.tenant_id,
user_id=self.user_id,
provider_id=TriggerProviderID(subscription.provider),
trigger_name=trigger_name,
parameters=parameters,
credentials=subscription.credentials,
credential_type=subscription.credential_type,
request=request,
)
outputs = invoke_response.event.variables or {}
return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=trigger_inputs, outputs=outputs)
except PluginInvokeError as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
metadata=metadata,
error="An error occurred in the plugin, "
f"please contact the author of {subscription.provider} for help, "
f"error type: {e.get_error_type()}, "
f"error details: {e.get_error_message()}",
error_type=type(e).__name__,
)
except PluginDaemonClientSideError as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
metadata=metadata,
error=f"Failed to invoke trigger, error: {e.description}",
error_type=type(e).__name__,
)
except Exception as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
metadata=metadata,
error=f"Failed to invoke trigger: {str(e)}",
error_type=type(e).__name__,
)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs=trigger_inputs,
outputs=trigger_inputs,
metadata=metadata,
)

View File

@@ -33,7 +33,7 @@ def calculate_next_run_at(
parts = cron_expression.strip().split()
# Support both 5-field format and predefined expressions (matching frontend)
if len(parts) != 5 and not cron_expression.startswith('@'):
if len(parts) != 5 and not cron_expression.startswith("@"):
raise ValueError(
f"Cron expression must have exactly 5 fields or be a predefined expression "
f"(@daily, @weekly, etc.). Got {len(parts)} fields: '{cron_expression}'"

View File

@@ -169,9 +169,7 @@ class TriggerSubscriptionBuilderService:
expires_at=-1,
)
cache_key = cls.encode_cache_key(subscription_id)
redis_client.setex(
cache_key, cls.__BUILDER_CACHE_EXPIRE_SECONDS__, subscription_builder.model_dump_json()
)
redis_client.setex(cache_key, cls.__BUILDER_CACHE_EXPIRE_SECONDS__, subscription_builder.model_dump_json())
return cls.builder_to_api_entity(controller=provider_controller, entity=subscription_builder)
@classmethod

View File

@@ -1,15 +1,19 @@
import logging
import time
import uuid
from collections.abc import Mapping, Sequence
from flask import Request, Response
from sqlalchemy import select
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from core.plugin.entities.plugin import TriggerProviderID
from core.plugin.utils.http_parser import serialize_request
from core.trigger.entities.entities import TriggerEntity, TriggerInputs
from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.utils.http_parser import deserialize_request, serialize_request
from core.trigger.entities.entities import TriggerEntity
from core.trigger.trigger_manager import TriggerManager
from core.workflow.nodes.enums import NodeType
from core.workflow.nodes.trigger_schedule.exc import TenantOwnerNotFoundError
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.account import Account, TenantAccountJoin, TenantAccountRole
@@ -30,6 +34,40 @@ class TriggerService:
__WEBHOOK_NODE_CACHE_KEY__ = "webhook_nodes"
@classmethod
def _get_latest_workflows_by_app_ids(
cls, session: Session, subscribers: Sequence[WorkflowPluginTrigger]
) -> Mapping[str, Workflow]:
"""Get the latest workflows by app_ids"""
workflow_query = (
select(Workflow.app_id, func.max(Workflow.created_at).label("max_created_at"))
.where(
Workflow.app_id.in_({t.app_id for t in subscribers}),
Workflow.version != Workflow.VERSION_DRAFT,
)
.group_by(Workflow.app_id)
.subquery()
)
workflows = session.scalars(
select(Workflow).join(
workflow_query,
(Workflow.app_id == workflow_query.c.app_id) & (Workflow.created_at == workflow_query.c.max_created_at),
)
).all()
return {w.app_id: w for w in workflows}
@classmethod
def _get_tenant_owner(cls, session: Session, tenant_id: str) -> Account:
"""Get the tenant owner account for workflow execution."""
owner = session.scalar(
select(Account)
.join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
.where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == TenantAccountRole.OWNER)
)
if not owner:
raise TenantOwnerNotFoundError(f"Tenant owner not found for tenant {tenant_id}")
return owner
@classmethod
def dispatch_triggered_workflows(
cls, subscription: TriggerSubscription, trigger: TriggerEntity, request_id: str
@@ -41,8 +79,12 @@ class TriggerService:
trigger: The trigger entity that was activated
request_id: The ID of the stored request in storage system
"""
request = deserialize_request(storage.load_once(f"triggers/{request_id}"))
if not request:
logger.error("Request not found for request_id %s", request_id)
return 0
subscribers = cls.get_subscriber_triggers(
subscribers: list[WorkflowPluginTrigger] = cls.get_subscriber_triggers(
tenant_id=subscription.tenant_id, subscription_id=subscription.id, trigger_name=trigger.identity.name
)
if not subscribers:
@@ -53,32 +95,13 @@ class TriggerService:
)
return 0
dispatched_count = 0
with Session(db.engine) as session:
# Get tenant owner for workflow execution
tenant_owner = session.scalar(
select(Account)
.join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
.where(
TenantAccountJoin.tenant_id == subscription.tenant_id,
TenantAccountJoin.role == TenantAccountRole.OWNER,
)
)
if not tenant_owner:
logger.error("Tenant owner not found for tenant %s", subscription.tenant_id)
return 0
dispatched_count = 0
tenant_owner = cls._get_tenant_owner(session, subscription.tenant_id)
workflows = cls._get_latest_workflows_by_app_ids(session, subscribers)
for plugin_trigger in subscribers:
# Get workflow
workflow = session.scalar(
select(Workflow)
.where(
Workflow.app_id == plugin_trigger.app_id,
Workflow.version != Workflow.VERSION_DRAFT,
)
.order_by(Workflow.created_at.desc())
)
# Get workflow from mapping
workflow = workflows.get(plugin_trigger.app_id)
if not workflow:
logger.error(
"Workflow not found for app %s",
@@ -86,10 +109,35 @@ class TriggerService:
)
continue
# Create trigger inputs using new structure
trigger_inputs = TriggerInputs.from_trigger_entity(
request_id=request_id, subscription_id=subscription.id, trigger=trigger
# Find the trigger node in the workflow
trigger_node = None
for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN):
if node_id == plugin_trigger.node_id:
trigger_node = node_config
break
if not trigger_node:
logger.error("Trigger node not found for app %s", plugin_trigger.app_id)
continue
# invoke triger
invoke_response = TriggerManager.invoke_trigger(
tenant_id=subscription.tenant_id,
user_id=subscription.user_id,
provider_id=TriggerProviderID(subscription.provider_id),
trigger_name=trigger.identity.name,
parameters=trigger_node.get("config", {}),
credentials=subscription.credentials,
credential_type=CredentialType.of(subscription.credential_type),
request=request,
)
if invoke_response.cancelled:
logger.info(
"Trigger ignored for app %s with trigger %s",
plugin_trigger.app_id,
trigger.identity.name,
)
continue
# Create trigger data for async execution
trigger_data = PluginTriggerData(
@@ -100,7 +148,7 @@ class TriggerService:
trigger_type=WorkflowRunTriggeredFrom.PLUGIN,
plugin_id=subscription.provider_id,
endpoint_id=subscription.endpoint_id,
inputs=trigger_inputs.to_dict(),
inputs=invoke_response.event.variables,
)
# Trigger async workflow
@@ -150,6 +198,7 @@ class TriggerService:
# Production dispatch
from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async
plugin_trigger_dispatch_data = PluginTriggerDispatchData(
endpoint_id=endpoint_id,
provider_id=subscription.provider_id,

View File

@@ -57,6 +57,7 @@ class PluginTriggerData(TriggerData):
plugin_id: str
endpoint_id: str
class PluginTriggerDispatchData(BaseModel):
"""Plugin trigger dispatch data for Celery tasks"""
@@ -67,6 +68,7 @@ class PluginTriggerDispatchData(BaseModel):
triggers: list[str]
request_id: str
class WorkflowTaskData(BaseModel):
"""Lightweight data structure for Celery workflow tasks"""

View File

@@ -43,9 +43,7 @@ def dispatch_triggered_workflows_async(
Returns:
dict: Execution result with status and dispatched trigger count
"""
dispatch_params: PluginTriggerDispatchData = PluginTriggerDispatchData.model_validate(
dispatch_data
)
dispatch_params: PluginTriggerDispatchData = PluginTriggerDispatchData.model_validate(dispatch_data)
endpoint_id = dispatch_params.endpoint_id
provider_id = dispatch_params.provider_id
subscription_id = dispatch_params.subscription_id

View File

@@ -4,6 +4,7 @@ Enhanced cron syntax compatibility tests for croniter backend.
This test suite mirrors the frontend cron-parser tests to ensure
complete compatibility between frontend and backend cron processing.
"""
import unittest
from datetime import UTC, datetime, timedelta
@@ -24,15 +25,15 @@ class TestCronCompatibility(unittest.TestCase):
def test_enhanced_dayofweek_syntax(self):
"""Test enhanced day-of-week syntax compatibility."""
test_cases = [
("0 9 * * 7", 0), # Sunday as 7
("0 9 * * 0", 0), # Sunday as 0
("0 9 * * MON", 1), # Monday abbreviation
("0 9 * * TUE", 2), # Tuesday abbreviation
("0 9 * * WED", 3), # Wednesday abbreviation
("0 9 * * THU", 4), # Thursday abbreviation
("0 9 * * FRI", 5), # Friday abbreviation
("0 9 * * SAT", 6), # Saturday abbreviation
("0 9 * * SUN", 0), # Sunday abbreviation
("0 9 * * 7", 0), # Sunday as 7
("0 9 * * 0", 0), # Sunday as 0
("0 9 * * MON", 1), # Monday abbreviation
("0 9 * * TUE", 2), # Tuesday abbreviation
("0 9 * * WED", 3), # Wednesday abbreviation
("0 9 * * THU", 4), # Thursday abbreviation
("0 9 * * FRI", 5), # Friday abbreviation
("0 9 * * SAT", 6), # Saturday abbreviation
("0 9 * * SUN", 0), # Sunday abbreviation
]
for expr, expected_weekday in test_cases:
@@ -46,18 +47,18 @@ class TestCronCompatibility(unittest.TestCase):
def test_enhanced_month_syntax(self):
"""Test enhanced month syntax compatibility."""
test_cases = [
("0 9 1 JAN *", 1), # January abbreviation
("0 9 1 FEB *", 2), # February abbreviation
("0 9 1 MAR *", 3), # March abbreviation
("0 9 1 APR *", 4), # April abbreviation
("0 9 1 MAY *", 5), # May abbreviation
("0 9 1 JUN *", 6), # June abbreviation
("0 9 1 JUL *", 7), # July abbreviation
("0 9 1 AUG *", 8), # August abbreviation
("0 9 1 SEP *", 9), # September abbreviation
("0 9 1 OCT *", 10), # October abbreviation
("0 9 1 NOV *", 11), # November abbreviation
("0 9 1 DEC *", 12), # December abbreviation
("0 9 1 JAN *", 1), # January abbreviation
("0 9 1 FEB *", 2), # February abbreviation
("0 9 1 MAR *", 3), # March abbreviation
("0 9 1 APR *", 4), # April abbreviation
("0 9 1 MAY *", 5), # May abbreviation
("0 9 1 JUN *", 6), # June abbreviation
("0 9 1 JUL *", 7), # July abbreviation
("0 9 1 AUG *", 8), # August abbreviation
("0 9 1 SEP *", 9), # September abbreviation
("0 9 1 OCT *", 10), # October abbreviation
("0 9 1 NOV *", 11), # November abbreviation
("0 9 1 DEC *", 12), # December abbreviation
]
for expr, expected_month in test_cases:
@@ -89,9 +90,9 @@ class TestCronCompatibility(unittest.TestCase):
def test_special_characters(self):
"""Test special characters in cron expressions."""
test_cases = [
"0 9 ? * 1", # ? wildcard
"0 12 * * 7", # Sunday as 7
"0 15 L * *", # Last day of month
"0 9 ? * 1", # ? wildcard
"0 12 * * 7", # Sunday as 7
"0 15 L * *", # Last day of month
]
for expr in test_cases:
@@ -106,8 +107,8 @@ class TestCronCompatibility(unittest.TestCase):
def test_range_and_list_syntax(self):
"""Test range and list syntax with abbreviations."""
test_cases = [
"0 9 * * MON-FRI", # Weekday range with abbreviations
"0 9 * JAN-MAR *", # Month range with abbreviations
"0 9 * * MON-FRI", # Weekday range with abbreviations
"0 9 * JAN-MAR *", # Month range with abbreviations
"0 9 * * SUN,WED,FRI", # Weekday list with abbreviations
"0 9 1 JAN,JUN,DEC *", # Month list with abbreviations
]
@@ -124,13 +125,13 @@ class TestCronCompatibility(unittest.TestCase):
def test_invalid_enhanced_syntax(self):
"""Test that invalid enhanced syntax is properly rejected."""
invalid_expressions = [
"0 12 * JANUARY *", # Full month name (not supported)
"0 12 * * MONDAY", # Full day name (not supported)
"0 12 32 JAN *", # Invalid day with valid month
"15 10 1 * 8", # Invalid day of week
"15 10 1 INVALID *", # Invalid month abbreviation
"15 10 1 * INVALID", # Invalid day abbreviation
"@invalid", # Invalid predefined expression
"0 12 * JANUARY *", # Full month name (not supported)
"0 12 * * MONDAY", # Full day name (not supported)
"0 12 32 JAN *", # Invalid day with valid month
"15 10 1 * 8", # Invalid day of week
"15 10 1 INVALID *", # Invalid month abbreviation
"15 10 1 * INVALID", # Invalid day abbreviation
"@invalid", # Invalid predefined expression
]
for expr in invalid_expressions:
@@ -221,7 +222,7 @@ class TestTimezoneCompatibility(unittest.TestCase):
def test_half_hour_timezones(self):
"""Test timezones with half-hour offsets."""
timezones_with_offsets = [
("Asia/Kolkata", 17, 30), # UTC+5:30 -> 12:00 UTC = 17:30 IST
("Asia/Kolkata", 17, 30), # UTC+5:30 -> 12:00 UTC = 17:30 IST
("Australia/Adelaide", 22, 30), # UTC+10:30 -> 12:00 UTC = 22:30 ACDT (summer time)
]
@@ -262,7 +263,7 @@ class TestFrontendBackendIntegration(unittest.TestCase):
# This mirrors the exact usage from execution-time-calculator.ts:47
test_data = {
"cron_expression": "30 14 * * 1-5", # 2:30 PM weekdays
"timezone": "America/New_York"
"timezone": "America/New_York",
}
# Get next 5 execution times (like the frontend does)
@@ -270,11 +271,7 @@ class TestFrontendBackendIntegration(unittest.TestCase):
current_base = self.base_time
for _ in range(5):
next_time = calculate_next_run_at(
test_data["cron_expression"],
test_data["timezone"],
current_base
)
next_time = calculate_next_run_at(test_data["cron_expression"], test_data["timezone"], current_base)
assert next_time is not None
execution_times.append(next_time)
current_base = next_time + timedelta(seconds=1) # Move slightly forward
@@ -306,22 +303,19 @@ class TestFrontendBackendIntegration(unittest.TestCase):
{
"frequency": "monthly",
"config": VisualConfig(time="9:00 AM", monthly_days=[1]),
"expected_cron": "0 9 1 * *"
"expected_cron": "0 9 1 * *",
},
# Test with weekday abbreviations
{
"frequency": "weekly",
"config": VisualConfig(time="2:30 PM", weekdays=["mon", "wed", "fri"]),
"expected_cron": "30 14 * * 1,3,5"
}
"expected_cron": "30 14 * * 1,3,5",
},
]
for test_case in visual_configs:
with self.subTest(frequency=test_case["frequency"]):
cron_expr = ScheduleService.visual_to_cron(
test_case["frequency"],
test_case["config"]
)
cron_expr = ScheduleService.visual_to_cron(test_case["frequency"], test_case["config"])
assert cron_expr == test_case["expected_cron"]
# Verify the generated cron expression is valid
@@ -331,14 +325,14 @@ class TestFrontendBackendIntegration(unittest.TestCase):
def test_error_handling_consistency(self):
"""Test that error handling matches frontend expectations."""
invalid_expressions = [
"60 10 1 * *", # Invalid minute
"15 25 1 * *", # Invalid hour
"15 10 32 * *", # Invalid day
"15 10 1 13 *", # Invalid month
"15 10 1", # Too few fields
"15 10 1 * * *", # 6 fields (not supported in frontend)
"0 15 10 1 * * *", # 7 fields (not supported in frontend)
"invalid expression", # Completely invalid
"60 10 1 * *", # Invalid minute
"15 25 1 * *", # Invalid hour
"15 10 32 * *", # Invalid day
"15 10 1 13 *", # Invalid month
"15 10 1", # Too few fields
"15 10 1 * * *", # 6 fields (not supported in frontend)
"0 15 10 1 * * *", # 7 fields (not supported in frontend)
"invalid expression", # Completely invalid
]
for expr in invalid_expressions:
@@ -354,12 +348,12 @@ class TestFrontendBackendIntegration(unittest.TestCase):
import time
complex_expressions = [
"*/5 9-17 * * 1-5", # Every 5 minutes, weekdays, business hours
"0 */2 1,15 * *", # Every 2 hours on 1st and 15th
"30 14 * * 1,3,5", # Mon, Wed, Fri at 14:30
"15,45 8-18 * * 1-5", # 15 and 45 minutes past hour, weekdays
"0 9 * JAN-MAR MON-FRI", # Enhanced syntax: Q1 weekdays at 9 AM
"0 12 ? * SUN", # Enhanced syntax: Sundays at noon with ?
"*/5 9-17 * * 1-5", # Every 5 minutes, weekdays, business hours
"0 */2 1,15 * *", # Every 2 hours on 1st and 15th
"30 14 * * 1,3,5", # Mon, Wed, Fri at 14:30
"15,45 8-18 * * 1-5", # 15 and 45 minutes past hour, weekdays
"0 9 * JAN-MAR MON-FRI", # Enhanced syntax: Q1 weekdays at 9 AM
"0 12 ? * SUN", # Enhanced syntax: Sundays at noon with ?
]
start_time = time.time()
@@ -383,4 +377,5 @@ class TestFrontendBackendIntegration(unittest.TestCase):
if __name__ == "__main__":
# Import timedelta for the test
from datetime import timedelta
unittest.main()
unittest.main()

View File

@@ -4,6 +4,7 @@ Enhanced schedule_utils tests for new cron syntax support.
These tests verify that the backend schedule_utils functions properly support
the enhanced cron syntax introduced in the frontend, ensuring full compatibility.
"""
import unittest
from datetime import UTC, datetime, timedelta
@@ -25,18 +26,18 @@ class TestEnhancedCronSyntax(unittest.TestCase):
def test_month_abbreviations(self):
"""Test month abbreviations (JAN, FEB, etc.)."""
test_cases = [
("0 12 1 JAN *", 1), # January
("0 12 1 FEB *", 2), # February
("0 12 1 MAR *", 3), # March
("0 12 1 APR *", 4), # April
("0 12 1 MAY *", 5), # May
("0 12 1 JUN *", 6), # June
("0 12 1 JUL *", 7), # July
("0 12 1 AUG *", 8), # August
("0 12 1 SEP *", 9), # September
("0 12 1 OCT *", 10), # October
("0 12 1 NOV *", 11), # November
("0 12 1 DEC *", 12), # December
("0 12 1 JAN *", 1), # January
("0 12 1 FEB *", 2), # February
("0 12 1 MAR *", 3), # March
("0 12 1 APR *", 4), # April
("0 12 1 MAY *", 5), # May
("0 12 1 JUN *", 6), # June
("0 12 1 JUL *", 7), # July
("0 12 1 AUG *", 8), # August
("0 12 1 SEP *", 9), # September
("0 12 1 OCT *", 10), # October
("0 12 1 NOV *", 11), # November
("0 12 1 DEC *", 12), # December
]
for expr, expected_month in test_cases:
@@ -51,13 +52,13 @@ class TestEnhancedCronSyntax(unittest.TestCase):
def test_weekday_abbreviations(self):
"""Test weekday abbreviations (SUN, MON, etc.)."""
test_cases = [
("0 9 * * SUN", 6), # Sunday (weekday() = 6)
("0 9 * * MON", 0), # Monday (weekday() = 0)
("0 9 * * TUE", 1), # Tuesday
("0 9 * * WED", 2), # Wednesday
("0 9 * * THU", 3), # Thursday
("0 9 * * FRI", 4), # Friday
("0 9 * * SAT", 5), # Saturday
("0 9 * * SUN", 6), # Sunday (weekday() = 6)
("0 9 * * MON", 0), # Monday (weekday() = 0)
("0 9 * * TUE", 1), # Tuesday
("0 9 * * WED", 2), # Wednesday
("0 9 * * THU", 3), # Thursday
("0 9 * * FRI", 4), # Friday
("0 9 * * SAT", 5), # Saturday
]
for expr, expected_weekday in test_cases:
@@ -112,7 +113,7 @@ class TestEnhancedCronSyntax(unittest.TestCase):
"""Test ? wildcard character."""
# ? in day position with specific weekday
result_question = calculate_next_run_at("0 9 ? * 1", "UTC", self.base_time) # Monday
result_star = calculate_next_run_at("0 9 * * 1", "UTC", self.base_time) # Monday
result_star = calculate_next_run_at("0 9 * * 1", "UTC", self.base_time) # Monday
assert result_question is not None
assert result_star is not None
@@ -141,9 +142,9 @@ class TestEnhancedCronSyntax(unittest.TestCase):
def test_range_with_abbreviations(self):
"""Test ranges using abbreviations."""
test_cases = [
"0 9 * * MON-FRI", # Weekday range
"0 12 * JAN-MAR *", # Q1 months
"0 15 * APR-JUN *", # Q2 months
"0 9 * * MON-FRI", # Weekday range
"0 12 * JAN-MAR *", # Q1 months
"0 15 * APR-JUN *", # Q2 months
]
for expr in test_cases:
@@ -155,7 +156,7 @@ class TestEnhancedCronSyntax(unittest.TestCase):
def test_list_with_abbreviations(self):
"""Test lists using abbreviations."""
test_cases = [
("0 9 * * SUN,WED,FRI", [6, 2, 4]), # Specific weekdays
("0 9 * * SUN,WED,FRI", [6, 2, 4]), # Specific weekdays
("0 12 1 JAN,JUN,DEC *", [1, 6, 12]), # Specific months
]
@@ -172,9 +173,9 @@ class TestEnhancedCronSyntax(unittest.TestCase):
def test_mixed_syntax(self):
"""Test mixed traditional and enhanced syntax."""
test_cases = [
"30 14 15 JAN,JUN,DEC *", # Numbers + month abbreviations
"0 9 * JAN-MAR MON-FRI", # Month range + weekday range
"45 8 1,15 * MON", # Numbers + weekday abbreviation
"30 14 15 JAN,JUN,DEC *", # Numbers + month abbreviations
"0 9 * JAN-MAR MON-FRI", # Month range + weekday range
"45 8 1,15 * MON", # Numbers + weekday abbreviation
]
for expr in test_cases:
@@ -187,9 +188,9 @@ class TestEnhancedCronSyntax(unittest.TestCase):
"""Test complex expressions with multiple enhanced features."""
# Note: Some of these might not be supported by croniter, that's OK
complex_expressions = [
"0 9 L JAN *", # Last day of January
"30 14 * * FRI#1", # First Friday of month (if supported)
"0 12 15 JAN-DEC/3 *", # 15th of every 3rd month (quarterly)
"0 9 L JAN *", # Last day of January
"30 14 * * FRI#1", # First Friday of month (if supported)
"0 12 15 JAN-DEC/3 *", # 15th of every 3rd month (quarterly)
]
for expr in complex_expressions:
@@ -272,13 +273,13 @@ class TestErrorHandlingEnhanced(unittest.TestCase):
def test_invalid_enhanced_syntax(self):
"""Test that invalid enhanced syntax raises appropriate errors."""
invalid_expressions = [
"0 12 * JANUARY *", # Full month name
"0 12 * * MONDAY", # Full day name
"0 12 32 JAN *", # Invalid day with valid month
"0 12 * * MON-SUN-FRI", # Invalid range syntax
"0 12 * JAN- *", # Incomplete range
"0 12 * * ,MON", # Invalid list syntax
"@INVALID", # Invalid predefined
"0 12 * JANUARY *", # Full month name
"0 12 * * MONDAY", # Full day name
"0 12 32 JAN *", # Invalid day with valid month
"0 12 * * MON-SUN-FRI", # Invalid range syntax
"0 12 * JAN- *", # Incomplete range
"0 12 * * ,MON", # Invalid list syntax
"@INVALID", # Invalid predefined
]
for expr in invalid_expressions:
@@ -290,9 +291,9 @@ class TestErrorHandlingEnhanced(unittest.TestCase):
"""Test boundary values work with enhanced syntax."""
# Valid boundary expressions
valid_expressions = [
"0 0 1 JAN *", # Minimum: January 1st midnight
"59 23 31 DEC *", # Maximum: December 31st 23:59
"0 12 29 FEB *", # Leap year boundary
"0 0 1 JAN *", # Minimum: January 1st midnight
"59 23 31 DEC *", # Maximum: December 31st 23:59
"0 12 29 FEB *", # Leap year boundary
]
for expr in valid_expressions:
@@ -318,11 +319,11 @@ class TestPerformanceEnhanced(unittest.TestCase):
import time
complex_expressions = [
"*/5 9-17 * * MON-FRI", # Every 5 min, weekdays, business hours
"0 9 * JAN-MAR MON-FRI", # Q1 weekdays at 9 AM
"30 14 1,15 * * ", # 1st and 15th at 14:30
"0 12 ? * SUN", # Sundays at noon with ?
"@daily", # Predefined expression
"*/5 9-17 * * MON-FRI", # Every 5 min, weekdays, business hours
"0 9 * JAN-MAR MON-FRI", # Q1 weekdays at 9 AM
"30 14 1,15 * * ", # 1st and 15th at 14:30
"0 12 ? * SUN", # Sundays at noon with ?
"@daily", # Predefined expression
]
start_time = time.time()
@@ -375,12 +376,12 @@ class TestRegressionEnhanced(unittest.TestCase):
def test_traditional_syntax_still_works(self):
"""Ensure traditional cron syntax continues to work."""
traditional_expressions = [
"15 10 1 * *", # Monthly 1st at 10:15
"0 0 * * 0", # Weekly Sunday midnight
"*/5 * * * *", # Every 5 minutes
"0 9-17 * * 1-5", # Business hours weekdays
"30 14 * * 1", # Monday 14:30
"0 0 1,15 * *", # 1st and 15th midnight
"15 10 1 * *", # Monthly 1st at 10:15
"0 0 * * 0", # Weekly Sunday midnight
"*/5 * * * *", # Every 5 minutes
"0 9-17 * * 1-5", # Business hours weekdays
"30 14 * * 1", # Monday 14:30
"0 0 1,15 * *", # 1st and 15th midnight
]
for expr in traditional_expressions:
@@ -392,12 +393,12 @@ class TestRegressionEnhanced(unittest.TestCase):
def test_convert_12h_to_24h_unchanged(self):
"""Ensure convert_12h_to_24h function is unchanged."""
test_cases = [
("12:00 AM", (0, 0)), # Midnight
("12:00 PM", (12, 0)), # Noon
("1:30 AM", (1, 30)), # Early morning
("11:45 PM", (23, 45)), # Late evening
("6:15 AM", (6, 15)), # Morning
("3:30 PM", (15, 30)), # Afternoon
("12:00 AM", (0, 0)), # Midnight
("12:00 PM", (12, 0)), # Noon
("1:30 AM", (1, 30)), # Early morning
("11:45 PM", (23, 45)), # Late evening
("6:15 AM", (6, 15)), # Morning
("3:30 PM", (15, 30)), # Afternoon
]
for time_str, expected in test_cases:
@@ -407,4 +408,4 @@ class TestRegressionEnhanced(unittest.TestCase):
if __name__ == "__main__":
unittest.main()
unittest.main()