mirror of
https://github.com/langgenius/dify.git
synced 2025-12-30 02:57:19 +00:00
Compare commits
5 Commits
feat/agent
...
fix/surfac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6b7ff0aeb | ||
|
|
16aa9254ad | ||
|
|
1ddaece735 | ||
|
|
9e990c5ccd | ||
|
|
b61fd8fcff |
@@ -4,12 +4,11 @@ from typing import Any
|
||||
|
||||
from flask import make_response, redirect, request
|
||||
from flask_restx import Resource, reqparse
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import BadRequest, Forbidden
|
||||
|
||||
from configs import dify_config
|
||||
from constants import HIDDEN_VALUE, UNKNOWN_VALUE
|
||||
from controllers.web.error import NotFoundError
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
@@ -44,6 +43,12 @@ class TriggerSubscriptionUpdateRequest(BaseModel):
|
||||
parameters: Mapping[str, Any] | None = Field(default=None, description="The parameters for the subscription")
|
||||
properties: Mapping[str, Any] | None = Field(default=None, description="The properties for the subscription")
|
||||
|
||||
@model_validator(mode="after")
|
||||
def check_at_least_one_field(self):
|
||||
if all(v is None for v in (self.name, self.credentials, self.parameters, self.properties)):
|
||||
raise ValueError("At least one of name, credentials, parameters, or properties must be provided")
|
||||
return self
|
||||
|
||||
|
||||
class TriggerSubscriptionVerifyRequest(BaseModel):
|
||||
"""Request payload for verifying subscription credentials."""
|
||||
@@ -333,7 +338,7 @@ class TriggerSubscriptionUpdateApi(Resource):
|
||||
user = current_user
|
||||
assert user.current_tenant_id is not None
|
||||
|
||||
args = TriggerSubscriptionUpdateRequest.model_validate(console_ns.payload)
|
||||
request = TriggerSubscriptionUpdateRequest.model_validate(console_ns.payload)
|
||||
|
||||
subscription = TriggerProviderService.get_subscription_by_id(
|
||||
tenant_id=user.current_tenant_id,
|
||||
@@ -345,50 +350,32 @@ class TriggerSubscriptionUpdateApi(Resource):
|
||||
provider_id = TriggerProviderID(subscription.provider_id)
|
||||
|
||||
try:
|
||||
# rename only
|
||||
if (
|
||||
args.name is not None
|
||||
and args.credentials is None
|
||||
and args.parameters is None
|
||||
and args.properties is None
|
||||
):
|
||||
# For rename only, just update the name
|
||||
rename = request.name is not None and not any((request.credentials, request.parameters, request.properties))
|
||||
# When credential type is UNAUTHORIZED, it indicates the subscription was manually created
|
||||
# For Manually created subscription, they dont have credentials, parameters
|
||||
# They only have name and properties(which is input by user)
|
||||
manually_created = subscription.credential_type == CredentialType.UNAUTHORIZED
|
||||
if rename or manually_created:
|
||||
TriggerProviderService.update_trigger_subscription(
|
||||
tenant_id=user.current_tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
name=args.name,
|
||||
name=request.name,
|
||||
properties=request.properties,
|
||||
)
|
||||
return 200
|
||||
|
||||
# rebuild for create automatically by the provider
|
||||
match subscription.credential_type:
|
||||
case CredentialType.UNAUTHORIZED:
|
||||
TriggerProviderService.update_trigger_subscription(
|
||||
tenant_id=user.current_tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
name=args.name,
|
||||
properties=args.properties,
|
||||
)
|
||||
return 200
|
||||
case CredentialType.API_KEY | CredentialType.OAUTH2:
|
||||
if args.credentials:
|
||||
new_credentials: dict[str, Any] = {
|
||||
key: value if value != HIDDEN_VALUE else subscription.credentials.get(key, UNKNOWN_VALUE)
|
||||
for key, value in args.credentials.items()
|
||||
}
|
||||
else:
|
||||
new_credentials = subscription.credentials
|
||||
|
||||
TriggerProviderService.rebuild_trigger_subscription(
|
||||
tenant_id=user.current_tenant_id,
|
||||
name=args.name,
|
||||
provider_id=provider_id,
|
||||
subscription_id=subscription_id,
|
||||
credentials=new_credentials,
|
||||
parameters=args.parameters or subscription.parameters,
|
||||
)
|
||||
return 200
|
||||
case _:
|
||||
raise BadRequest("Invalid credential type")
|
||||
# For the rest cases(API_KEY, OAUTH2)
|
||||
# we need to call third party provider(e.g. GitHub) to rebuild the subscription
|
||||
TriggerProviderService.rebuild_trigger_subscription(
|
||||
tenant_id=user.current_tenant_id,
|
||||
name=request.name,
|
||||
provider_id=provider_id,
|
||||
subscription_id=subscription_id,
|
||||
credentials=request.credentials or subscription.credentials,
|
||||
parameters=request.parameters or subscription.parameters,
|
||||
)
|
||||
return 200
|
||||
except ValueError as e:
|
||||
raise BadRequest(str(e))
|
||||
except Exception as e:
|
||||
|
||||
@@ -853,7 +853,7 @@ class TriggerProviderService:
|
||||
"""
|
||||
Create a subscription builder for rebuilding an existing subscription.
|
||||
|
||||
This method creates a builder pre-filled with data from the rebuild request,
|
||||
This method rebuild the subscription by call DELETE and CREATE API of the third party provider(e.g. GitHub)
|
||||
keeping the same subscription_id and endpoint_id so the webhook URL remains unchanged.
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
@@ -868,111 +868,50 @@ class TriggerProviderService:
|
||||
if not provider_controller:
|
||||
raise ValueError(f"Provider {provider_id} not found")
|
||||
|
||||
# Use distributed lock to prevent race conditions on the same subscription
|
||||
lock_key = f"trigger_subscription_rebuild_lock:{tenant_id}_{subscription_id}"
|
||||
with redis_client.lock(lock_key, timeout=20):
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
try:
|
||||
# Get subscription within the transaction
|
||||
subscription: TriggerSubscription | None = (
|
||||
session.query(TriggerSubscription).filter_by(tenant_id=tenant_id, id=subscription_id).first()
|
||||
)
|
||||
if not subscription:
|
||||
raise ValueError(f"Subscription {subscription_id} not found")
|
||||
subscription = TriggerProviderService.get_subscription_by_id(
|
||||
tenant_id=tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
)
|
||||
if not subscription:
|
||||
raise ValueError(f"Subscription {subscription_id} not found")
|
||||
|
||||
credential_type = CredentialType.of(subscription.credential_type)
|
||||
if credential_type not in [CredentialType.OAUTH2, CredentialType.API_KEY]:
|
||||
raise ValueError("Credential type not supported for rebuild")
|
||||
credential_type = CredentialType.of(subscription.credential_type)
|
||||
if credential_type not in {CredentialType.OAUTH2, CredentialType.API_KEY}:
|
||||
raise ValueError(f"Credential type {credential_type} not supported for auto creation")
|
||||
|
||||
# Decrypt existing credentials for merging
|
||||
credential_encrypter, _ = create_trigger_provider_encrypter_for_subscription(
|
||||
tenant_id=tenant_id,
|
||||
controller=provider_controller,
|
||||
subscription=subscription,
|
||||
)
|
||||
decrypted_credentials = dict(credential_encrypter.decrypt(subscription.credentials))
|
||||
# Delete the previous subscription
|
||||
user_id = subscription.user_id
|
||||
unsubscribe_result = TriggerManager.unsubscribe_trigger(
|
||||
tenant_id=tenant_id,
|
||||
user_id=user_id,
|
||||
provider_id=provider_id,
|
||||
subscription=subscription.to_entity(),
|
||||
credentials=subscription.credentials,
|
||||
credential_type=credential_type,
|
||||
)
|
||||
if not unsubscribe_result.success:
|
||||
raise ValueError(f"Failed to delete previous subscription: {unsubscribe_result.message}")
|
||||
|
||||
# Merge credentials: if caller passed HIDDEN_VALUE, retain existing decrypted value
|
||||
merged_credentials: dict[str, Any] = {
|
||||
key: value if value != HIDDEN_VALUE else decrypted_credentials.get(key, UNKNOWN_VALUE)
|
||||
for key, value in credentials.items()
|
||||
}
|
||||
|
||||
user_id = subscription.user_id
|
||||
|
||||
# TODO: Trying to invoke update api of the plugin trigger provider
|
||||
|
||||
# FALLBACK: If the update api is not implemented,
|
||||
# delete the previous subscription and create a new one
|
||||
|
||||
# Unsubscribe the previous subscription (external call, but we'll handle errors)
|
||||
try:
|
||||
TriggerManager.unsubscribe_trigger(
|
||||
tenant_id=tenant_id,
|
||||
user_id=user_id,
|
||||
provider_id=provider_id,
|
||||
subscription=subscription.to_entity(),
|
||||
credentials=decrypted_credentials,
|
||||
credential_type=credential_type,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("Error unsubscribing trigger during rebuild", exc_info=e)
|
||||
# Continue anyway - the subscription might already be deleted externally
|
||||
|
||||
# Create a new subscription with the same subscription_id and endpoint_id (external call)
|
||||
new_subscription: TriggerSubscriptionEntity = TriggerManager.subscribe_trigger(
|
||||
tenant_id=tenant_id,
|
||||
user_id=user_id,
|
||||
provider_id=provider_id,
|
||||
endpoint=generate_plugin_trigger_endpoint_url(subscription.endpoint_id),
|
||||
parameters=parameters,
|
||||
credentials=merged_credentials,
|
||||
credential_type=credential_type,
|
||||
)
|
||||
|
||||
# Update the subscription in the same transaction
|
||||
# Inline update logic to reuse the same session
|
||||
if name is not None and name != subscription.name:
|
||||
existing = (
|
||||
session.query(TriggerSubscription)
|
||||
.filter_by(tenant_id=tenant_id, provider_id=str(provider_id), name=name)
|
||||
.first()
|
||||
)
|
||||
if existing and existing.id != subscription.id:
|
||||
raise ValueError(f"Subscription name '{name}' already exists for this provider")
|
||||
subscription.name = name
|
||||
|
||||
# Update parameters
|
||||
subscription.parameters = dict(parameters)
|
||||
|
||||
# Update credentials with merged (and encrypted) values
|
||||
subscription.credentials = dict(credential_encrypter.encrypt(merged_credentials))
|
||||
|
||||
# Update properties
|
||||
if new_subscription.properties:
|
||||
properties_encrypter, _ = create_provider_encrypter(
|
||||
tenant_id=tenant_id,
|
||||
config=provider_controller.get_properties_schema(),
|
||||
cache=NoOpProviderCredentialCache(),
|
||||
)
|
||||
subscription.properties = dict(properties_encrypter.encrypt(dict(new_subscription.properties)))
|
||||
|
||||
# Update expiration timestamp
|
||||
if new_subscription.expires_at is not None:
|
||||
subscription.expires_at = new_subscription.expires_at
|
||||
|
||||
# Commit the transaction
|
||||
session.commit()
|
||||
|
||||
# Clear subscription cache
|
||||
delete_cache_for_subscription(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=subscription.provider_id,
|
||||
subscription_id=subscription.id,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# Rollback on any error
|
||||
session.rollback()
|
||||
logger.exception("Failed to rebuild trigger subscription", exc_info=e)
|
||||
raise
|
||||
# Create a new subscription with the same subscription_id and endpoint_id
|
||||
new_credentials: dict[str, Any] = {
|
||||
key: value if value != HIDDEN_VALUE else subscription.credentials.get(key, UNKNOWN_VALUE)
|
||||
for key, value in credentials.items()
|
||||
}
|
||||
new_subscription: TriggerSubscriptionEntity = TriggerManager.subscribe_trigger(
|
||||
tenant_id=tenant_id,
|
||||
user_id=user_id,
|
||||
provider_id=provider_id,
|
||||
endpoint=generate_plugin_trigger_endpoint_url(subscription.endpoint_id),
|
||||
parameters=parameters,
|
||||
credentials=new_credentials,
|
||||
credential_type=credential_type,
|
||||
)
|
||||
TriggerProviderService.update_trigger_subscription(
|
||||
tenant_id=tenant_id,
|
||||
subscription_id=subscription.id,
|
||||
name=name,
|
||||
parameters=parameters,
|
||||
credentials=new_credentials,
|
||||
properties=new_subscription.properties,
|
||||
expires_at=new_subscription.expires_at,
|
||||
)
|
||||
|
||||
@@ -474,64 +474,6 @@ class TestTriggerProviderService:
|
||||
assert subscription.name == original_name
|
||||
assert subscription.parameters == original_parameters
|
||||
|
||||
def test_rebuild_trigger_subscription_unsubscribe_error_continues(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test that unsubscribe errors are handled gracefully and operation continues.
|
||||
|
||||
This test verifies:
|
||||
- Unsubscribe errors are caught and logged but don't stop the rebuild
|
||||
- Rebuild continues even if unsubscribe fails
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = self._create_test_account_and_tenant(
|
||||
db_session_with_containers, mock_external_service_dependencies
|
||||
)
|
||||
|
||||
provider_id = TriggerProviderID("test_org/test_plugin/test_provider")
|
||||
credential_type = CredentialType.API_KEY
|
||||
|
||||
original_credentials = {"api_key": "original-key"}
|
||||
subscription = self._create_test_subscription(
|
||||
db_session_with_containers,
|
||||
tenant.id,
|
||||
account.id,
|
||||
provider_id,
|
||||
credential_type,
|
||||
original_credentials,
|
||||
mock_external_service_dependencies,
|
||||
)
|
||||
|
||||
# Make unsubscribe_trigger raise an error (should be caught and continue)
|
||||
mock_external_service_dependencies["trigger_manager"].unsubscribe_trigger.side_effect = ValueError(
|
||||
"Unsubscribe failed"
|
||||
)
|
||||
|
||||
new_subscription_entity = TriggerSubscriptionEntity(
|
||||
endpoint=subscription.endpoint_id,
|
||||
parameters={},
|
||||
properties={},
|
||||
expires_at=-1,
|
||||
)
|
||||
mock_external_service_dependencies["trigger_manager"].subscribe_trigger.return_value = new_subscription_entity
|
||||
|
||||
# Execute rebuild - should succeed despite unsubscribe error
|
||||
TriggerProviderService.rebuild_trigger_subscription(
|
||||
tenant_id=tenant.id,
|
||||
provider_id=provider_id,
|
||||
subscription_id=subscription.id,
|
||||
credentials={"api_key": "new-key"},
|
||||
parameters={},
|
||||
)
|
||||
|
||||
# Verify subscribe was still called (operation continued)
|
||||
mock_external_service_dependencies["trigger_manager"].subscribe_trigger.assert_called_once()
|
||||
|
||||
# Verify subscription was updated
|
||||
db.session.refresh(subscription)
|
||||
assert subscription.parameters == {}
|
||||
|
||||
def test_rebuild_trigger_subscription_subscription_not_found(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
@@ -558,34 +500,6 @@ class TestTriggerProviderService:
|
||||
parameters={},
|
||||
)
|
||||
|
||||
def test_rebuild_trigger_subscription_provider_not_found(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test error when provider is not found.
|
||||
|
||||
This test verifies:
|
||||
- Proper error is raised when provider doesn't exist
|
||||
"""
|
||||
fake = Faker()
|
||||
account, tenant = self._create_test_account_and_tenant(
|
||||
db_session_with_containers, mock_external_service_dependencies
|
||||
)
|
||||
|
||||
provider_id = TriggerProviderID("non_existent_org/non_existent_plugin/non_existent_provider")
|
||||
|
||||
# Make get_trigger_provider return None
|
||||
mock_external_service_dependencies["trigger_manager"].get_trigger_provider.return_value = None
|
||||
|
||||
with pytest.raises(ValueError, match="Provider.*not found"):
|
||||
TriggerProviderService.rebuild_trigger_subscription(
|
||||
tenant_id=tenant.id,
|
||||
provider_id=provider_id,
|
||||
subscription_id=fake.uuid4(),
|
||||
credentials={},
|
||||
parameters={},
|
||||
)
|
||||
|
||||
def test_rebuild_trigger_subscription_unsupported_credential_type(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user