diff --git a/.codex/skills/component-refactoring b/.codex/skills/component-refactoring deleted file mode 120000 index 53ae67e2f2..0000000000 --- a/.codex/skills/component-refactoring +++ /dev/null @@ -1 +0,0 @@ -../../.agents/skills/component-refactoring \ No newline at end of file diff --git a/.codex/skills/frontend-code-review b/.codex/skills/frontend-code-review deleted file mode 120000 index 55654ffbd7..0000000000 --- a/.codex/skills/frontend-code-review +++ /dev/null @@ -1 +0,0 @@ -../../.agents/skills/frontend-code-review \ No newline at end of file diff --git a/.codex/skills/frontend-testing b/.codex/skills/frontend-testing deleted file mode 120000 index 092cec7745..0000000000 --- a/.codex/skills/frontend-testing +++ /dev/null @@ -1 +0,0 @@ -../../.agents/skills/frontend-testing \ No newline at end of file diff --git a/.codex/skills/orpc-contract-first b/.codex/skills/orpc-contract-first deleted file mode 120000 index da47b335c7..0000000000 --- a/.codex/skills/orpc-contract-first +++ /dev/null @@ -1 +0,0 @@ -../../.agents/skills/orpc-contract-first \ No newline at end of file diff --git a/.github/workflows/autofix.yml b/.github/workflows/autofix.yml index 4a8c61e7d2..4571fd1cd1 100644 --- a/.github/workflows/autofix.yml +++ b/.github/workflows/autofix.yml @@ -79,29 +79,6 @@ jobs: find . -name "*.py" -type f -exec sed -i.bak -E 's/"([^"]+)" \| None/Optional["\1"]/g; s/'"'"'([^'"'"']+)'"'"' \| None/Optional['"'"'\1'"'"']/g' {} \; find . -name "*.py.bak" -type f -delete - - name: Install pnpm - uses: pnpm/action-setup@v4 - with: - package_json_file: web/package.json - run_install: false - - - name: Setup Node.js - uses: actions/setup-node@v6 - with: - node-version: 24 - cache: pnpm - cache-dependency-path: ./web/pnpm-lock.yaml - - - name: Install web dependencies - run: | - cd web - pnpm install --frozen-lockfile - - - name: ESLint autofix - run: | - cd web - pnpm lint:fix || true - # mdformat breaks YAML front matter in markdown files. 
Add --exclude for directories containing YAML front matter. - name: mdformat run: | diff --git a/.github/workflows/deploy-hitl.yml b/.github/workflows/deploy-hitl.yml index 7d5f0a22e7..6a4702d2da 100644 --- a/.github/workflows/deploy-hitl.yml +++ b/.github/workflows/deploy-hitl.yml @@ -4,8 +4,7 @@ on: workflow_run: workflows: ["Build and Push API & Web"] branches: - - "feat/hitl-frontend" - - "feat/hitl-backend" + - "feat/hitl" types: - completed @@ -14,10 +13,7 @@ jobs: runs-on: ubuntu-latest if: | github.event.workflow_run.conclusion == 'success' && - ( - github.event.workflow_run.head_branch == 'feat/hitl-frontend' || - github.event.workflow_run.head_branch == 'feat/hitl-backend' - ) + github.event.workflow_run.head_branch == 'feat/hitl' steps: - name: Deploy to server uses: appleboy/ssh-action@v1 diff --git a/api/.importlinter b/api/.importlinter index 9dad254560..fb66df7334 100644 --- a/api/.importlinter +++ b/api/.importlinter @@ -102,6 +102,8 @@ forbidden_modules = core.trigger core.variables ignore_imports = + core.workflow.nodes.agent.agent_node -> core.db.session_factory + core.workflow.nodes.agent.agent_node -> models.tools core.workflow.nodes.loop.loop_node -> core.app.workflow.node_factory core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis core.workflow.workflow_entry -> core.app.workflow.layers.observability @@ -136,7 +138,6 @@ ignore_imports = core.workflow.nodes.llm.llm_utils -> models.provider core.workflow.nodes.llm.llm_utils -> services.credit_pool_service core.workflow.nodes.llm.node -> core.tools.signature - core.workflow.nodes.template_transform.template_transform_node -> configs core.workflow.nodes.tool.tool_node -> core.callback_handler.workflow_tool_callback_handler core.workflow.nodes.tool.tool_node -> core.tools.tool_engine core.workflow.nodes.tool.tool_node -> core.tools.tool_manager diff --git a/api/commands.py b/api/commands.py index c685f647dd..ad5550b369 100644 --- a/api/commands.py +++ 
b/api/commands.py @@ -740,8 +740,10 @@ def upgrade_db(): click.echo(click.style("Database migration successful!", fg="green")) - except Exception: + except Exception as e: logger.exception("Failed to execute database migration") + click.echo(click.style(f"Database migration failed: {e}", fg="red")) + raise SystemExit(1) finally: lock.release() else: diff --git a/api/controllers/console/app/app.py b/api/controllers/console/app/app.py index 02bbb71da7..8aa8b3961a 100644 --- a/api/controllers/console/app/app.py +++ b/api/controllers/console/app/app.py @@ -1,3 +1,4 @@ +import logging import uuid from datetime import datetime from enum import StrEnum @@ -56,6 +57,8 @@ ALLOW_CREATE_APP_MODES = ["chat", "agent-chat", "advanced-chat", "workflow", "co register_enum_models(console_ns, IconType) +_logger = logging.getLogger(__name__) + class RuntimeType(StrEnum): CLASSIC = "classic" @@ -513,6 +516,7 @@ class AppListApi(Resource): select(Workflow).where( Workflow.version == Workflow.VERSION_DRAFT, Workflow.app_id.in_(workflow_capable_app_ids), + Workflow.tenant_id == current_tenant_id, ) ) .scalars() @@ -528,12 +532,14 @@ class AppListApi(Resource): if workflow.get_feature(WorkflowFeatures.SANDBOX).enabled: sandbox_app_ids.add(str(workflow.app_id)) + node_id = None try: - for _, node_data in workflow.walk_nodes(): + for node_id, node_data in workflow.walk_nodes(): if node_data.get("type") in trigger_node_types: draft_trigger_app_ids.add(str(workflow.app_id)) break except Exception: + _logger.exception("error while walking nodes, workflow_id=%s, node_id=%s", workflow.id, node_id) continue for app in app_pagination.items: diff --git a/api/core/app/workflow/node_factory.py b/api/core/app/workflow/node_factory.py index a5773bbef8..6717be3ae6 100644 --- a/api/core/app/workflow/node_factory.py +++ b/api/core/app/workflow/node_factory.py @@ -47,6 +47,7 @@ class DifyNodeFactory(NodeFactory): code_providers: Sequence[type[CodeNodeProvider]] | None = None, code_limits: CodeNodeLimits | 
None = None, template_renderer: Jinja2TemplateRenderer | None = None, + template_transform_max_output_length: int | None = None, http_request_http_client: HttpClientProtocol | None = None, http_request_tool_file_manager_factory: Callable[[], ToolFileManager] = ToolFileManager, http_request_file_manager: FileManagerProtocol | None = None, @@ -68,6 +69,9 @@ class DifyNodeFactory(NodeFactory): max_object_array_length=dify_config.CODE_MAX_OBJECT_ARRAY_LENGTH, ) self._template_renderer = template_renderer or CodeExecutorJinja2TemplateRenderer() + self._template_transform_max_output_length = ( + template_transform_max_output_length or dify_config.TEMPLATE_TRANSFORM_MAX_LENGTH + ) self._http_request_http_client = http_request_http_client or ssrf_proxy self._http_request_tool_file_manager_factory = http_request_tool_file_manager_factory self._http_request_file_manager = http_request_file_manager or file_manager @@ -122,6 +126,7 @@ class DifyNodeFactory(NodeFactory): graph_init_params=self.graph_init_params, graph_runtime_state=self.graph_runtime_state, template_renderer=self._template_renderer, + max_output_length=self._template_transform_max_output_length, ) if node_type == NodeType.HTTP_REQUEST: diff --git a/api/core/helper/marketplace.py b/api/core/helper/marketplace.py index 25dc4ba9ed..d7b6e82062 100644 --- a/api/core/helper/marketplace.py +++ b/api/core/helper/marketplace.py @@ -6,7 +6,8 @@ from yarl import URL from configs import dify_config from core.helper.download import download_with_size_limit -from core.plugin.entities.marketplace import MarketplacePluginDeclaration +from core.plugin.entities.marketplace import MarketplacePluginDeclaration, MarketplacePluginSnapshot +from extensions.ext_redis import redis_client marketplace_api_url = URL(str(dify_config.MARKETPLACE_API_URL)) logger = logging.getLogger(__name__) @@ -43,28 +44,37 @@ def batch_fetch_plugin_by_ids(plugin_ids: list[str]) -> list[dict]: return data.get("data", {}).get("plugins", []) -def 
batch_fetch_plugin_manifests_ignore_deserialization_error( - plugin_ids: list[str], -) -> Sequence[MarketplacePluginDeclaration]: - if len(plugin_ids) == 0: - return [] - - url = str(marketplace_api_url / "api/v1/plugins/batch") - response = httpx.post(url, json={"plugin_ids": plugin_ids}, headers={"X-Dify-Version": dify_config.project.version}) - response.raise_for_status() - result: list[MarketplacePluginDeclaration] = [] - for plugin in response.json()["data"]["plugins"]: - try: - result.append(MarketplacePluginDeclaration.model_validate(plugin)) - except Exception: - logger.exception( - "Failed to deserialize marketplace plugin manifest for %s", plugin.get("plugin_id", "unknown") - ) - - return result - - def record_install_plugin_event(plugin_unique_identifier: str): url = str(marketplace_api_url / "api/v1/stats/plugins/install_count") response = httpx.post(url, json={"unique_identifier": plugin_unique_identifier}) response.raise_for_status() + + +def fetch_global_plugin_manifest(cache_key_prefix: str, cache_ttl: int) -> None: + """ + Fetch all plugin manifests from marketplace and cache them in Redis. + This should be called once per check cycle to populate the instance-level cache. 
+ + Args: + cache_key_prefix: Redis key prefix for caching plugin manifests + cache_ttl: Cache TTL in seconds + + Raises: + httpx.HTTPError: If the HTTP request fails + Exception: If any other error occurs during fetching or caching + """ + url = str(marketplace_api_url / "api/v1/dist/plugins/manifest.json") + response = httpx.get(url, headers={"X-Dify-Version": dify_config.project.version}, timeout=30) + response.raise_for_status() + + raw_json = response.json() + plugins_data = raw_json.get("plugins", []) + + # Parse and cache all plugin snapshots + for plugin_data in plugins_data: + plugin_snapshot = MarketplacePluginSnapshot.model_validate(plugin_data) + redis_client.setex( + name=f"{cache_key_prefix}{plugin_snapshot.plugin_id}", + time=cache_ttl, + value=plugin_snapshot.model_dump_json(), + ) diff --git a/api/core/plugin/entities/marketplace.py b/api/core/plugin/entities/marketplace.py index e0762619e6..cf1f7ff0dd 100644 --- a/api/core/plugin/entities/marketplace.py +++ b/api/core/plugin/entities/marketplace.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel, Field, model_validator +from pydantic import BaseModel, Field, computed_field, model_validator from core.model_runtime.entities.provider_entities import ProviderEntity from core.plugin.entities.endpoint import EndpointProviderDeclaration @@ -48,3 +48,15 @@ class MarketplacePluginDeclaration(BaseModel): if "tool" in data and not data["tool"]: del data["tool"] return data + + +class MarketplacePluginSnapshot(BaseModel): + org: str + name: str + latest_version: str + latest_package_identifier: str + latest_package_url: str + + @computed_field + def plugin_id(self) -> str: + return f"{self.org}/{self.name}" diff --git a/api/core/variables/variables.py b/api/core/variables/variables.py index f941594800..681fc9c9c8 100644 --- a/api/core/variables/variables.py +++ b/api/core/variables/variables.py @@ -117,7 +117,7 @@ class ArrayPromptMessageVariable(ArrayPromptMessageSegment, ArrayVariable): class 
RAGPipelineVariable(BaseModel): belong_to_node_id: str = Field(description="belong to which node id, shared means public") - type: str = Field(description="variable type, text-input, paragraph, select, number, file, file-list") + type: str = Field(description="variable type, text-input, paragraph, select, number, file, file-list") label: str = Field(description="label") description: str | None = Field(description="description", default="") variable: str = Field(description="variable key", default="") diff --git a/api/core/workflow/nodes/agent/agent_node.py b/api/core/workflow/nodes/agent/agent_node.py index 5cb79e4bdd..20438f838e 100644 --- a/api/core/workflow/nodes/agent/agent_node.py +++ b/api/core/workflow/nodes/agent/agent_node.py @@ -2,7 +2,7 @@ from __future__ import annotations import json from collections.abc import Generator, Mapping, Sequence -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, Any, Union, cast from packaging.version import Version from pydantic import ValidationError @@ -11,6 +11,7 @@ from sqlalchemy.orm import Session from core.agent.entities import AgentToolEntity from core.agent.plugin_entities import AgentStrategyParameter +from core.db.session_factory import session_factory from core.file import File, FileTransferMethod from core.memory.base import BaseMemory from core.memory.node_token_buffer_memory import NodeTokenBufferMemory @@ -58,6 +59,12 @@ from factories import file_factory from factories.agent_factory import get_plugin_agent_strategy from models import ToolFile from models.model import Conversation +from models.tools import ( + ApiToolProvider, + BuiltinToolProvider, + MCPToolProvider, + WorkflowToolProvider, +) from services.tools.builtin_tools_manage_service import BuiltinToolManageService from .exc import ( @@ -272,7 +279,7 @@ class AgentNode(Node[AgentNodeData]): value = cast(list[dict[str, Any]], value) tool_value = [] for tool in value: - provider_type = ToolProviderType(tool.get("type", 
ToolProviderType.BUILT_IN)) + provider_type = self._infer_tool_provider_type(tool, self.tenant_id) setting_params = tool.get("settings", {}) parameters = tool.get("parameters", {}) manual_input_params = [key for key, value in parameters.items() if value is not None] @@ -921,3 +928,34 @@ class AgentNode(Node[AgentNodeData]): llm_usage=llm_usage, ) ) + + @staticmethod + def _infer_tool_provider_type(tool_config: dict[str, Any], tenant_id: str) -> ToolProviderType: + provider_type_str = tool_config.get("type") + if provider_type_str: + return ToolProviderType(provider_type_str) + + provider_id = tool_config.get("provider_name") + if not provider_id: + return ToolProviderType.BUILT_IN + + with session_factory.create_session() as session: + provider_map: dict[ + type[Union[WorkflowToolProvider, MCPToolProvider, ApiToolProvider, BuiltinToolProvider]], + ToolProviderType, + ] = { + WorkflowToolProvider: ToolProviderType.WORKFLOW, + MCPToolProvider: ToolProviderType.MCP, + ApiToolProvider: ToolProviderType.API, + BuiltinToolProvider: ToolProviderType.BUILT_IN, + } + + for provider_model, provider_type in provider_map.items(): + stmt = select(provider_model).where( + provider_model.id == provider_id, + provider_model.tenant_id == tenant_id, + ) + if session.scalar(stmt): + return provider_type + + raise AgentNodeError(f"Tool provider with ID '{provider_id}' not found.") diff --git a/api/core/workflow/nodes/template_transform/template_transform_node.py b/api/core/workflow/nodes/template_transform/template_transform_node.py index f7e0bccccf..3dc8afd9be 100644 --- a/api/core/workflow/nodes/template_transform/template_transform_node.py +++ b/api/core/workflow/nodes/template_transform/template_transform_node.py @@ -1,7 +1,6 @@ from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any -from configs import dify_config from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus from core.workflow.node_events import NodeRunResult from 
core.workflow.nodes.base.node import Node @@ -16,12 +15,13 @@ if TYPE_CHECKING: from core.workflow.entities import GraphInitParams from core.workflow.runtime import GraphRuntimeState -MAX_TEMPLATE_TRANSFORM_OUTPUT_LENGTH = dify_config.TEMPLATE_TRANSFORM_MAX_LENGTH +DEFAULT_TEMPLATE_TRANSFORM_MAX_OUTPUT_LENGTH = 400_000 class TemplateTransformNode(Node[TemplateTransformNodeData]): node_type = NodeType.TEMPLATE_TRANSFORM _template_renderer: Jinja2TemplateRenderer + _max_output_length: int def __init__( self, @@ -31,6 +31,7 @@ class TemplateTransformNode(Node[TemplateTransformNodeData]): graph_runtime_state: "GraphRuntimeState", *, template_renderer: Jinja2TemplateRenderer | None = None, + max_output_length: int | None = None, ) -> None: super().__init__( id=id, @@ -40,6 +41,10 @@ class TemplateTransformNode(Node[TemplateTransformNodeData]): ) self._template_renderer = template_renderer or CodeExecutorJinja2TemplateRenderer() + if max_output_length is not None and max_output_length <= 0: + raise ValueError("max_output_length must be a positive integer") + self._max_output_length = max_output_length or DEFAULT_TEMPLATE_TRANSFORM_MAX_OUTPUT_LENGTH + @classmethod def get_default_config(cls, filters: Mapping[str, object] | None = None) -> Mapping[str, object]: """ @@ -69,11 +74,11 @@ class TemplateTransformNode(Node[TemplateTransformNodeData]): except TemplateRenderError as e: return NodeRunResult(inputs=variables, status=WorkflowNodeExecutionStatus.FAILED, error=str(e)) - if len(rendered) > MAX_TEMPLATE_TRANSFORM_OUTPUT_LENGTH: + if len(rendered) > self._max_output_length: return NodeRunResult( inputs=variables, status=WorkflowNodeExecutionStatus.FAILED, - error=f"Output length exceeds {MAX_TEMPLATE_TRANSFORM_OUTPUT_LENGTH} characters", + error=f"Output length exceeds {self._max_output_length} characters", ) return NodeRunResult( diff --git a/api/migrations/versions/2025_12_25_1039-7df29de0f6be_add_credit_pool.py 
b/api/migrations/versions/2025_12_25_1039-7df29de0f6be_add_credit_pool.py index e89fcee7e5..6a9bfd2be0 100644 --- a/api/migrations/versions/2025_12_25_1039-7df29de0f6be_add_credit_pool.py +++ b/api/migrations/versions/2025_12_25_1039-7df29de0f6be_add_credit_pool.py @@ -10,6 +10,10 @@ import models as models import sqlalchemy as sa from sqlalchemy.dialects import postgresql + +def _is_pg(conn): + return conn.dialect.name == "postgresql" + # revision identifiers, used by Alembic. revision = '7df29de0f6be' down_revision = '03ea244985ce' @@ -19,16 +23,31 @@ depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.create_table('tenant_credit_pools', - sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False), - sa.Column('tenant_id', models.types.StringUUID(), nullable=False), - sa.Column('pool_type', sa.String(length=40), server_default='trial', nullable=False), - sa.Column('quota_limit', sa.BigInteger(), nullable=False), - sa.Column('quota_used', sa.BigInteger(), nullable=False), - sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.PrimaryKeyConstraint('id', name='tenant_credit_pool_pkey') - ) + conn = op.get_bind() + + if _is_pg(conn): + op.create_table('tenant_credit_pools', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('pool_type', sa.String(length=40), server_default='trial', nullable=False), + sa.Column('quota_limit', sa.BigInteger(), nullable=False), + sa.Column('quota_used', sa.BigInteger(), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), 
server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='tenant_credit_pool_pkey') + ) + else: + # For MySQL and other databases, UUID should be generated at application level + op.create_table('tenant_credit_pools', + sa.Column('id', models.types.StringUUID(), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('pool_type', sa.String(length=40), server_default='trial', nullable=False), + sa.Column('quota_limit', sa.BigInteger(), nullable=False), + sa.Column('quota_used', sa.BigInteger(), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.PrimaryKeyConstraint('id', name='tenant_credit_pool_pkey') + ) with op.batch_alter_table('tenant_credit_pools', schema=None) as batch_op: batch_op.create_index('tenant_credit_pool_pool_type_idx', ['pool_type'], unique=False) batch_op.create_index('tenant_credit_pool_tenant_id_idx', ['tenant_id'], unique=False) diff --git a/api/models/model.py b/api/models/model.py index 94661db9da..21fd45864c 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -2264,7 +2264,9 @@ class TenantCreditPool(TypeBase): sa.Index("tenant_credit_pool_pool_type_idx", "pool_type"), ) - id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=text("uuid_generate_v4()"), init=False) + id: Mapped[str] = mapped_column( + StringUUID, insert_default=lambda: str(uuid4()), default_factory=lambda: str(uuid4()), init=False + ) tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) pool_type: Mapped[str] = mapped_column(String(40), nullable=False, default="trial", server_default="trial") quota_limit: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0) diff --git a/api/schedule/check_upgradable_plugin_task.py b/api/schedule/check_upgradable_plugin_task.py 
index e91ce07be3..13d2f24ca0 100644 --- a/api/schedule/check_upgradable_plugin_task.py +++ b/api/schedule/check_upgradable_plugin_task.py @@ -1,16 +1,24 @@ +import logging import math import time import click import app +from core.helper.marketplace import fetch_global_plugin_manifest from extensions.ext_database import db from models.account import TenantPluginAutoUpgradeStrategy from tasks import process_tenant_plugin_autoupgrade_check_task as check_task +logger = logging.getLogger(__name__) + AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes MAX_CONCURRENT_CHECK_TASKS = 20 +# Import cache constants from the task module +CACHE_REDIS_KEY_PREFIX = check_task.CACHE_REDIS_KEY_PREFIX +CACHE_REDIS_TTL = check_task.CACHE_REDIS_TTL + @app.celery.task(queue="plugin") def check_upgradable_plugin_task(): @@ -40,6 +48,22 @@ def check_upgradable_plugin_task(): ) # make sure all strategies are checked in this interval batch_interval_time = (AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL / batch_chunk_count) if batch_chunk_count > 0 else 0 + if total_strategies == 0: + click.echo(click.style("no strategies to process, skipping plugin manifest fetch.", fg="green")) + return + + # Fetch and cache all plugin manifests before processing tenants + # This reduces load on marketplace from 300k requests to 1 request per check cycle + logger.info("fetching global plugin manifest from marketplace") + try: + fetch_global_plugin_manifest(CACHE_REDIS_KEY_PREFIX, CACHE_REDIS_TTL) + logger.info("successfully fetched and cached global plugin manifest") + except Exception as e: + logger.exception("failed to fetch global plugin manifest") + click.echo(click.style(f"failed to fetch global plugin manifest: {e}", fg="red")) + click.echo(click.style("skipping plugin upgrade check for this cycle", fg="yellow")) + return + for i in range(0, total_strategies, MAX_CONCURRENT_CHECK_TASKS): batch_strategies = strategies[i : i + MAX_CONCURRENT_CHECK_TASKS] for strategy in batch_strategies: diff --git 
a/api/tasks/process_tenant_plugin_autoupgrade_check_task.py b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py index b5e6508006..6ad04aab0d 100644 --- a/api/tasks/process_tenant_plugin_autoupgrade_check_task.py +++ b/api/tasks/process_tenant_plugin_autoupgrade_check_task.py @@ -6,8 +6,8 @@ import typing import click from celery import shared_task -from core.helper import marketplace -from core.helper.marketplace import MarketplacePluginDeclaration +from core.helper.marketplace import record_install_plugin_event +from core.plugin.entities.marketplace import MarketplacePluginSnapshot from core.plugin.entities.plugin import PluginInstallationSource from core.plugin.impl.plugin import PluginInstaller from extensions.ext_redis import redis_client @@ -16,7 +16,7 @@ from models.account import TenantPluginAutoUpgradeStrategy logger = logging.getLogger(__name__) RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3 -CACHE_REDIS_KEY_PREFIX = "plugin_autoupgrade_check_task:cached_plugin_manifests:" +CACHE_REDIS_KEY_PREFIX = "plugin_autoupgrade_check_task:cached_plugin_snapshot:" CACHE_REDIS_TTL = 60 * 60 # 1 hour @@ -25,11 +25,11 @@ def _get_redis_cache_key(plugin_id: str) -> str: return f"{CACHE_REDIS_KEY_PREFIX}{plugin_id}" -def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginDeclaration, None, bool]: +def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginSnapshot, None, bool]: """ Get cached plugin manifest from Redis. 
Returns: - - MarketplacePluginDeclaration: if found in cache + - MarketplacePluginSnapshot: if found in cache - None: if cached as not found (marketplace returned no result) - False: if not in cache at all """ @@ -43,76 +43,31 @@ def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginDeclar if cached_json is None: return None - return MarketplacePluginDeclaration.model_validate(cached_json) + return MarketplacePluginSnapshot.model_validate(cached_json) except Exception: logger.exception("Failed to get cached manifest for plugin %s", plugin_id) return False -def _set_cached_manifest(plugin_id: str, manifest: typing.Union[MarketplacePluginDeclaration, None]) -> None: - """ - Cache plugin manifest in Redis. - Args: - plugin_id: The plugin ID - manifest: The manifest to cache, or None if not found in marketplace - """ - try: - key = _get_redis_cache_key(plugin_id) - if manifest is None: - # Cache the fact that this plugin was not found - redis_client.setex(key, CACHE_REDIS_TTL, json.dumps(None)) - else: - # Cache the manifest data - redis_client.setex(key, CACHE_REDIS_TTL, manifest.model_dump_json()) - except Exception: - # If Redis fails, continue without caching - # traceback.print_exc() - logger.exception("Failed to set cached manifest for plugin %s", plugin_id) - - def marketplace_batch_fetch_plugin_manifests( plugin_ids_plain_list: list[str], -) -> list[MarketplacePluginDeclaration]: - """Fetch plugin manifests with Redis caching support.""" - cached_manifests: dict[str, typing.Union[MarketplacePluginDeclaration, None]] = {} - not_cached_plugin_ids: list[str] = [] +) -> list[MarketplacePluginSnapshot]: + """ + Fetch plugin manifests from Redis cache only. + This function assumes fetch_global_plugin_manifest() has been called + to pre-populate the cache with all marketplace plugins. 
+ """ + result: list[MarketplacePluginSnapshot] = [] # Check Redis cache for each plugin for plugin_id in plugin_ids_plain_list: cached_result = _get_cached_manifest(plugin_id) - if cached_result is False: - # Not in cache, need to fetch - not_cached_plugin_ids.append(plugin_id) - else: - # Either found manifest or cached as None (not found in marketplace) - # At this point, cached_result is either MarketplacePluginDeclaration or None - if isinstance(cached_result, bool): - # This should never happen due to the if condition above, but for type safety - continue - cached_manifests[plugin_id] = cached_result + if not isinstance(cached_result, MarketplacePluginSnapshot): + # cached_result is False (not in cache) or None (cached as not found) + logger.warning("plugin %s not found in cache, skipping", plugin_id) + continue - # Fetch uncached plugins from marketplace - if not_cached_plugin_ids: - manifests = marketplace.batch_fetch_plugin_manifests_ignore_deserialization_error(not_cached_plugin_ids) - - # Cache the fetched manifests - for manifest in manifests: - cached_manifests[manifest.plugin_id] = manifest - _set_cached_manifest(manifest.plugin_id, manifest) - - # Cache plugins that were not found in marketplace - fetched_plugin_ids = {manifest.plugin_id for manifest in manifests} - for plugin_id in not_cached_plugin_ids: - if plugin_id not in fetched_plugin_ids: - cached_manifests[plugin_id] = None - _set_cached_manifest(plugin_id, None) - - # Build result list from cached manifests - result: list[MarketplacePluginDeclaration] = [] - for plugin_id in plugin_ids_plain_list: - cached_manifest: typing.Union[MarketplacePluginDeclaration, None] = cached_manifests.get(plugin_id) - if cached_manifest is not None: - result.append(cached_manifest) + result.append(cached_result) return result @@ -211,7 +166,7 @@ def process_tenant_plugin_autoupgrade_check_task( # execute upgrade new_unique_identifier = manifest.latest_package_identifier - 
"""Unit tests for ``AgentNode._infer_tool_provider_type``.

The method resolves a tool provider's type either from an explicit ``type``
key in the tool configuration, or — when that key is absent — by probing the
database in a fixed order: WorkflowToolProvider, MCPToolProvider,
ApiToolProvider, then BuiltinToolProvider.
"""

from contextlib import contextmanager
from unittest.mock import MagicMock, patch

import pytest

from core.tools.entities.tool_entities import ToolProviderType
from core.workflow.nodes.agent.agent_node import AgentNode
from core.workflow.nodes.agent.exc import AgentNodeError

# Tenant id is irrelevant to the inference logic under test; one shared value.
_TENANT_ID = "test-tenant"


@contextmanager
def _mock_db_session():
    """Patch the session factory and yield the mocked SQLAlchemy session.

    Tests steer the lookup order through ``mock_session.scalar`` — the node
    issues one ``scalar`` call per provider table probed, in lookup order.
    """
    with patch("core.db.session_factory.session_factory.create_session") as mock_create_session:
        mock_session = MagicMock()
        mock_create_session.return_value.__enter__.return_value = mock_session
        yield mock_session


class TestInferToolProviderType:
    """Test cases for AgentNode._infer_tool_provider_type method."""

    def test_infer_type_from_config_workflow(self):
        """Test inferring workflow provider type from config."""
        tool_config = {
            "type": "workflow",
            "provider_name": "workflow-provider-id",
        }

        result = AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

        assert result == ToolProviderType.WORKFLOW

    def test_infer_type_from_config_builtin(self):
        """Test inferring builtin provider type from config."""
        tool_config = {
            "type": "builtin",
            "provider_name": "builtin-provider-id",
        }

        result = AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

        assert result == ToolProviderType.BUILT_IN

    def test_infer_type_from_config_api(self):
        """Test inferring API provider type from config."""
        tool_config = {
            "type": "api",
            "provider_name": "api-provider-id",
        }

        result = AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

        assert result == ToolProviderType.API

    def test_infer_type_from_config_mcp(self):
        """Test inferring MCP provider type from config."""
        tool_config = {
            "type": "mcp",
            "provider_name": "mcp-provider-id",
        }

        result = AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

        assert result == ToolProviderType.MCP

    def test_infer_type_invalid_config_value_raises_error(self):
        """Test that invalid type value in config raises ValueError."""
        tool_config = {
            "type": "invalid-type",
            "provider_name": "workflow-provider-id",
        }

        with pytest.raises(ValueError):
            AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

    def test_infer_workflow_type_from_database(self):
        """Test inferring workflow provider type from database."""
        tool_config = {"provider_name": "workflow-provider-id"}

        with _mock_db_session() as mock_session:
            # First query (WorkflowToolProvider) returns a result.
            mock_session.scalar.return_value = True

            result = AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

        assert result == ToolProviderType.WORKFLOW
        # Should only query once (after finding WorkflowToolProvider).
        assert mock_session.scalar.call_count == 1

    def test_infer_mcp_type_from_database(self):
        """Test inferring MCP provider type from database."""
        tool_config = {"provider_name": "mcp-provider-id"}

        with _mock_db_session() as mock_session:
            # WorkflowToolProvider misses, MCPToolProvider hits.
            mock_session.scalar.side_effect = [None, True]

            result = AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

        assert result == ToolProviderType.MCP
        assert mock_session.scalar.call_count == 2

    def test_infer_api_type_from_database(self):
        """Test inferring API provider type from database."""
        tool_config = {"provider_name": "api-provider-id"}

        with _mock_db_session() as mock_session:
            # Workflow and MCP lookups miss, ApiToolProvider hits.
            mock_session.scalar.side_effect = [None, None, True]

            result = AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

        assert result == ToolProviderType.API
        assert mock_session.scalar.call_count == 3

    def test_infer_builtin_type_from_database(self):
        """Test inferring builtin provider type from database."""
        tool_config = {"provider_name": "builtin-provider-id"}

        with _mock_db_session() as mock_session:
            # First three lookups miss; BuiltinToolProvider hits last.
            mock_session.scalar.side_effect = [None, None, None, True]

            result = AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

        assert result == ToolProviderType.BUILT_IN
        assert mock_session.scalar.call_count == 4

    def test_infer_type_raises_when_not_found(self):
        """Test raising AgentNodeError when provider is not found in database."""
        tool_config = {"provider_name": "unknown-provider-id"}

        with _mock_db_session() as mock_session:
            # All provider-table lookups miss.
            mock_session.scalar.return_value = None

            with pytest.raises(AgentNodeError, match="Tool provider with ID 'unknown-provider-id' not found"):
                AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)

    def test_infer_type_default_when_no_provider_name(self):
        """Test defaulting to BUILT_IN when provider_name is missing."""
        result = AgentNode._infer_tool_provider_type({}, _TENANT_ID)

        assert result == ToolProviderType.BUILT_IN

    def test_infer_type_database_exception_propagates(self):
        """Test that database exception propagates (current implementation doesn't catch it)."""
        tool_config = {"provider_name": "provider-id"}

        with _mock_db_session() as mock_session:
            # Database query raises; the node performs no error handling.
            mock_session.scalar.side_effect = Exception("Database error")

            with pytest.raises(Exception, match="Database error"):
                AgentNode._infer_tool_provider_type(tool_config, _TENANT_ID)
"core.workflow.nodes.template_transform.template_transform_node.CodeExecutorJinja2TemplateRenderer.render_template" ) - @patch("core.workflow.nodes.template_transform.template_transform_node.MAX_TEMPLATE_TRANSFORM_OUTPUT_LENGTH", 10) def test_run_output_length_exceeds_limit( self, mock_execute, basic_node_data, mock_graph, mock_graph_runtime_state, graph_init_params ): @@ -231,6 +230,7 @@ class TestTemplateTransformNode: graph_init_params=graph_init_params, graph=mock_graph, graph_runtime_state=mock_graph_runtime_state, + max_output_length=10, ) result = node._run() diff --git a/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/overview/time-range-picker/range-selector.tsx b/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/overview/time-range-picker/range-selector.tsx index 986170728f..9f6df25aa6 100644 --- a/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/overview/time-range-picker/range-selector.tsx +++ b/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/overview/time-range-picker/range-selector.tsx @@ -57,7 +57,7 @@ const RangeSelector: FC = ({ {selected && (