Compare commits

21 Commits

Author SHA1 Message Date
dependabot[bot]
56a8c8d5d3 chore(deps): bump google-auth in /api in the google group
Bumps the google group in /api with 1 update: [google-auth](https://github.com/googleapis/google-auth-library-python).


Updates `google-auth` from 2.49.0 to 2.49.1
- [Release notes](https://github.com/googleapis/google-auth-library-python/releases)
- [Changelog](https://github.com/googleapis/google-auth-library-python/blob/main/CHANGELOG.md)
- [Commits](https://github.com/googleapis/google-auth-library-python/commits)

---
updated-dependencies:
- dependency-name: google-auth
  dependency-version: 2.49.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: google
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-16 03:12:40 +00:00
Asuka Minato
dd39fcd9bc ci: Simplify nltk data download in Dockerfile (#33495) 2026-03-16 12:06:20 +09:00
dependabot[bot]
3c587097cd chore(deps): bump the python-packages group in /api with 13 updates (#33484)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-16 11:28:42 +09:00
dependabot[bot]
6a3fcc0a7b chore(deps): bump the llm group across 1 directory with 2 updates (#33491)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-16 11:23:51 +09:00
dependabot[bot]
8d3f2f56d9 chore(deps): bump the storage group in /api with 2 updates (#33481)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-16 11:10:07 +09:00
非法操作
09dad78a5d chore: add indexes for human_input_forms query patterns (#32849)
Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Bond Zhu <37842169+MRZHUH@users.noreply.github.com>
2026-03-16 10:10:03 +08:00
dependabot[bot]
c71ecd2fe0 chore(deps-dev): update faker requirement from ~=40.8.0 to ~=40.11.0 in /api in the dev group (#33482)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-16 11:09:41 +09:00
dependabot[bot]
808d186156 chore(deps): bump litellm from 1.82.1 to 1.82.2 in /api in the llm group (#33480)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-16 11:08:28 +09:00
dependabot[bot]
ec0a01a568 chore(deps): bump the github-actions-dependencies group with 4 updates (#33485)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-16 11:07:42 +09:00
dependabot[bot]
ac23a0409e chore(deps): bump the storage group in /api with 2 updates (#33488)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-16 11:07:11 +09:00
wangxiaolei
6ef69ff880 refactor: llm decouple code executor module (#33400)
Co-authored-by: Byron.wang <byron@dify.ai>
2026-03-16 10:06:14 +08:00
dependabot[bot]
a6163f80d1 chore(deps): bump sqlalchemy from 2.0.44 to 2.0.48 in /api in the database group (#33487)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-16 09:57:51 +08:00
eason
551df6ee9c fix: use parameterized queries to prevent SQL injection in vector stores (#33421)
Co-authored-by: easonysliu <easonysliu@tencent.com>
Co-authored-by: Claude (claude-opus-4-6) <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-16 09:41:57 +08:00
-LAN-
101d6d4d04 feat: Remove GPT-4 special-casing from default model selection (#33458)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-16 03:09:20 +08:00
Mahmoud Hamdy
b09a75aae0 fix(api): resolve type errors in service API wraps tests (#33467) 2026-03-16 01:20:42 +09:00
Tyson Cung
98d9fb4aff fix: downgrade image download failure log from ERROR to WARNING (#33429) 2026-03-15 23:04:09 +08:00
Ye Ding
f795d24151 feat: Add Hologres as a VDB & FullText DB choice (#32830)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Crazywoola <100913391+crazywoola@users.noreply.github.com>
2026-03-15 20:55:02 +08:00
-LAN-
0fa7548346 fix: update INTERNAL_FILES_URL example default for Docker Desktop (#33447)
2026-03-15 19:27:46 +08:00
RickDamon
ac8021fe27 fix: add doc_type to Weaviate properties and default Vector attributes (#33398)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-15 19:25:24 +08:00
lif
f21288df5a fix: preserve custom icons in exported DSL (#33424)
Signed-off-by: majiayu000 <1835304752@qq.com>
2026-03-15 18:52:41 +08:00
-LAN-
fb41b215c8 refactor(api): move workflow knowledge nodes and trigger nodes (#33445) 2026-03-15 15:24:59 +08:00
555 changed files with 9682 additions and 17068 deletions

View File

@@ -27,7 +27,7 @@ jobs:
persist-credentials: false
- name: Setup UV and Python
uses: astral-sh/setup-uv@6ee6290f1cbc4156c0bdd66691b2c144ef8df19a # v7.4.0
uses: astral-sh/setup-uv@e06108dd0aef18192324c70427afc47652e63a82 # v7.5.0
with:
enable-cache: true
python-version: ${{ matrix.python-version }}

View File

@@ -39,7 +39,7 @@ jobs:
with:
python-version: "3.11"
- uses: astral-sh/setup-uv@6ee6290f1cbc4156c0bdd66691b2c144ef8df19a # v7.4.0
- uses: astral-sh/setup-uv@e06108dd0aef18192324c70427afc47652e63a82 # v7.5.0
- name: Generate Docker Compose
if: steps.docker-compose-changes.outputs.any_changed == 'true'

View File

@@ -113,7 +113,7 @@ jobs:
context: "web"
steps:
- name: Download digests
uses: actions/download-artifact@70fc10c6e5e1ce46ad2ea6f2b72d43f7d47b13c3 # v8.0.0
uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1
with:
path: /tmp/digests
pattern: digests-${{ matrix.context }}-*

View File

@@ -19,7 +19,7 @@ jobs:
persist-credentials: false
- name: Setup UV and Python
uses: astral-sh/setup-uv@6ee6290f1cbc4156c0bdd66691b2c144ef8df19a # v7.4.0
uses: astral-sh/setup-uv@e06108dd0aef18192324c70427afc47652e63a82 # v7.5.0
with:
enable-cache: true
python-version: "3.12"
@@ -69,7 +69,7 @@ jobs:
persist-credentials: false
- name: Setup UV and Python
uses: astral-sh/setup-uv@6ee6290f1cbc4156c0bdd66691b2c144ef8df19a # v7.4.0
uses: astral-sh/setup-uv@e06108dd0aef18192324c70427afc47652e63a82 # v7.5.0
with:
enable-cache: true
python-version: "3.12"

View File

@@ -28,7 +28,7 @@ jobs:
migration-changed: ${{ steps.changes.outputs.migration }}
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 # v3.0.2
- uses: dorny/paths-filter@fbd0ab8f3e69293af611ebaee6363fc25e6d187d # v4.0.1
id: changes
with:
filters: |

View File

@@ -22,7 +22,7 @@ jobs:
fetch-depth: 0
- name: Setup Python & UV
uses: astral-sh/setup-uv@6ee6290f1cbc4156c0bdd66691b2c144ef8df19a # v7.4.0
uses: astral-sh/setup-uv@e06108dd0aef18192324c70427afc47652e63a82 # v7.5.0
with:
enable-cache: true

View File

@@ -33,7 +33,7 @@ jobs:
- name: Setup UV and Python
if: steps.changed-files.outputs.any_changed == 'true'
uses: astral-sh/setup-uv@6ee6290f1cbc4156c0bdd66691b2c144ef8df19a # v7.4.0
uses: astral-sh/setup-uv@e06108dd0aef18192324c70427afc47652e63a82 # v7.5.0
with:
enable-cache: false
python-version: "3.12"

View File

@@ -120,7 +120,7 @@ jobs:
- name: Run Claude Code for Translation Sync
if: steps.detect_changes.outputs.CHANGED_FILES != ''
uses: anthropics/claude-code-action@26ec041249acb0a944c0a47b6c0c13f05dbc5b44 # v1.0.70
uses: anthropics/claude-code-action@cd77b50d2b0808657f8e6774085c8bf54484351c # v1.0.72
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
github_token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -31,7 +31,7 @@ jobs:
remove_tool_cache: true
- name: Setup UV and Python
uses: astral-sh/setup-uv@6ee6290f1cbc4156c0bdd66691b2c144ef8df19a # v7.4.0
uses: astral-sh/setup-uv@e06108dd0aef18192324c70427afc47652e63a82 # v7.5.0
with:
enable-cache: true
python-version: ${{ matrix.python-version }}

View File

@@ -26,8 +26,8 @@ jobs:
strategy:
fail-fast: false
matrix:
shardIndex: [1, 2, 3, 4, 5, 6]
shardTotal: [6]
shardIndex: [1, 2, 3, 4]
shardTotal: [4]
defaults:
run:
shell: bash
@@ -77,7 +77,7 @@ jobs:
uses: ./.github/actions/setup-web
- name: Download blob reports
uses: actions/download-artifact@70fc10c6e5e1ce46ad2ea6f2b72d43f7d47b13c3 # v8.0.0
uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1
with:
path: web/.vitest-reports
pattern: blob-report-*

.gitignore
View File

@@ -237,3 +237,6 @@ scripts/stress-test/reports/
# settings
*.local.json
*.local.md
# Code Agent Folder
.qoder/*

View File

@@ -22,10 +22,10 @@ APP_WEB_URL=http://localhost:3000
# Files URL
FILES_URL=http://localhost:5001
# INTERNAL_FILES_URL is used for plugin daemon communication within Docker network.
# Set this to the internal Docker service URL for proper plugin file access.
# Example: INTERNAL_FILES_URL=http://api:5001
INTERNAL_FILES_URL=http://127.0.0.1:5001
# INTERNAL_FILES_URL is used by services running in Docker to reach the API file endpoints.
# For Docker Desktop (Mac/Windows), use http://host.docker.internal:5001 when the API runs on the host.
# For Docker Compose on Linux, use http://api:5001 when the API runs inside the Docker network.
INTERNAL_FILES_URL=http://host.docker.internal:5001
# TRIGGER URL
TRIGGER_URL=http://localhost:5001
@@ -180,7 +180,7 @@ CONSOLE_CORS_ALLOW_ORIGINS=http://localhost:3000,*
COOKIE_DOMAIN=
# Vector database configuration
# Supported values are `weaviate`, `oceanbase`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`.
# Supported values are `weaviate`, `oceanbase`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`, `hologres`.
VECTOR_STORE=weaviate
# Prefix used to create collection name in vector database
VECTOR_INDEX_NAME_PREFIX=Vector_index
@@ -217,6 +217,20 @@ COUCHBASE_PASSWORD=password
COUCHBASE_BUCKET_NAME=Embeddings
COUCHBASE_SCOPE_NAME=_default
# Hologres configuration
# access_key_id is used as the PG username, access_key_secret is used as the PG password
HOLOGRES_HOST=
HOLOGRES_PORT=80
HOLOGRES_DATABASE=
HOLOGRES_ACCESS_KEY_ID=
HOLOGRES_ACCESS_KEY_SECRET=
HOLOGRES_SCHEMA=public
HOLOGRES_TOKENIZER=jieba
HOLOGRES_DISTANCE_METHOD=Cosine
HOLOGRES_BASE_QUANTIZATION_TYPE=rabitq
HOLOGRES_MAX_DEGREE=64
HOLOGRES_EF_CONSTRUCTION=400
# Milvus configuration
MILVUS_URI=http://127.0.0.1:19530
MILVUS_TOKEN=

View File

@@ -96,7 +96,6 @@ ignore_imports =
dify_graph.nodes.tool.tool_node -> core.callback_handler.workflow_tool_callback_handler
dify_graph.nodes.tool.tool_node -> core.tools.tool_engine
dify_graph.nodes.tool.tool_node -> core.tools.tool_manager
dify_graph.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.app_config.entities
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.advanced_prompt_transform
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.simple_prompt_transform
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> dify_graph.model_runtime.model_providers.__base.large_language_model
@@ -104,7 +103,6 @@ ignore_imports =
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.model_manager
dify_graph.nodes.question_classifier.question_classifier_node -> core.model_manager
dify_graph.nodes.tool.tool_node -> core.tools.utils.message_transformer
dify_graph.nodes.llm.node -> core.helper.code_executor
dify_graph.nodes.llm.node -> core.llm_generator.output_parser.errors
dify_graph.nodes.llm.node -> core.llm_generator.output_parser.structured_output
dify_graph.nodes.llm.node -> core.model_manager
@@ -116,7 +114,6 @@ ignore_imports =
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.utils.prompt_message_util
dify_graph.nodes.question_classifier.entities -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.question_classifier.question_classifier_node -> core.prompt.utils.prompt_message_util
dify_graph.nodes.knowledge_index.entities -> core.rag.retrieval.retrieval_methods
dify_graph.nodes.llm.node -> models.dataset
dify_graph.nodes.llm.file_saver -> core.tools.signature
dify_graph.nodes.llm.file_saver -> core.tools.tool_file_manager

View File

@@ -97,7 +97,7 @@ ENV PATH="${VIRTUAL_ENV}/bin:${PATH}"
# Download nltk data
RUN mkdir -p /usr/local/share/nltk_data \
&& NLTK_DATA=/usr/local/share/nltk_data python -c "import nltk; from unstructured.nlp.tokenize import download_nltk_packages; nltk.download('punkt'); nltk.download('averaged_perceptron_tagger'); nltk.download('stopwords'); download_nltk_packages()" \
&& NLTK_DATA=/usr/local/share/nltk_data python -c "import nltk; nltk.download('punkt'); nltk.download('averaged_perceptron_tagger'); nltk.download('stopwords')" \
&& chmod -R 755 /usr/local/share/nltk_data
ENV TIKTOKEN_CACHE_DIR=/app/api/.tiktoken_cache

View File

@@ -160,6 +160,7 @@ def migrate_knowledge_vector_database():
}
lower_collection_vector_types = {
VectorType.ANALYTICDB,
VectorType.HOLOGRES,
VectorType.CHROMA,
VectorType.MYSCALE,
VectorType.PGVECTO_RS,

View File

@@ -26,6 +26,7 @@ from .vdb.chroma_config import ChromaConfig
from .vdb.clickzetta_config import ClickzettaConfig
from .vdb.couchbase_config import CouchbaseConfig
from .vdb.elasticsearch_config import ElasticsearchConfig
from .vdb.hologres_config import HologresConfig
from .vdb.huawei_cloud_config import HuaweiCloudConfig
from .vdb.iris_config import IrisVectorConfig
from .vdb.lindorm_config import LindormConfig
@@ -347,6 +348,7 @@ class MiddlewareConfig(
AnalyticdbConfig,
ChromaConfig,
ClickzettaConfig,
HologresConfig,
HuaweiCloudConfig,
IrisVectorConfig,
MilvusConfig,

View File

@@ -0,0 +1,68 @@
from holo_search_sdk.types import BaseQuantizationType, DistanceType, TokenizerType
from pydantic import Field
from pydantic_settings import BaseSettings
class HologresConfig(BaseSettings):
"""
Configuration settings for Hologres vector database.
Hologres is compatible with PostgreSQL protocol.
access_key_id is used as the PostgreSQL username,
and access_key_secret is used as the PostgreSQL password.
"""
HOLOGRES_HOST: str | None = Field(
description="Hostname or IP address of the Hologres instance.",
default=None,
)
HOLOGRES_PORT: int = Field(
description="Port number for connecting to the Hologres instance.",
default=80,
)
HOLOGRES_DATABASE: str | None = Field(
description="Name of the Hologres database to connect to.",
default=None,
)
HOLOGRES_ACCESS_KEY_ID: str | None = Field(
description="Alibaba Cloud AccessKey ID, also used as the PostgreSQL username.",
default=None,
)
HOLOGRES_ACCESS_KEY_SECRET: str | None = Field(
description="Alibaba Cloud AccessKey Secret, also used as the PostgreSQL password.",
default=None,
)
HOLOGRES_SCHEMA: str = Field(
description="Schema name in the Hologres database.",
default="public",
)
HOLOGRES_TOKENIZER: TokenizerType = Field(
description="Tokenizer for full-text search index (e.g., 'jieba', 'ik', 'standard', 'simple').",
default="jieba",
)
HOLOGRES_DISTANCE_METHOD: DistanceType = Field(
description="Distance method for vector index (e.g., 'Cosine', 'Euclidean', 'InnerProduct').",
default="Cosine",
)
HOLOGRES_BASE_QUANTIZATION_TYPE: BaseQuantizationType = Field(
description="Base quantization type for vector index (e.g., 'rabitq', 'sq8', 'fp16', 'fp32').",
default="rabitq",
)
HOLOGRES_MAX_DEGREE: int = Field(
description="Max degree (M) parameter for HNSW vector index.",
default=64,
)
HOLOGRES_EF_CONSTRUCTION: int = Field(
description="ef_construction parameter for HNSW vector index.",
default=400,
)

View File

@@ -25,7 +25,8 @@ from controllers.console.wraps import (
)
from core.ops.ops_trace_manager import OpsTraceManager
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from dify_graph.enums import NodeType, WorkflowExecutionStatus
from core.trigger.constants import TRIGGER_NODE_TYPES
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.file import helpers as file_helpers
from extensions.ext_database import db
from libs.login import current_account_with_tenant, login_required
@@ -508,11 +509,7 @@ class AppListApi(Resource):
.scalars()
.all()
)
trigger_node_types = {
NodeType.TRIGGER_WEBHOOK,
NodeType.TRIGGER_SCHEDULE,
NodeType.TRIGGER_PLUGIN,
}
trigger_node_types = TRIGGER_NODE_TYPES
for workflow in draft_workflows:
node_id = None
try:

View File

@@ -22,6 +22,7 @@ from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY
from core.app.entities.app_invoke_entities import InvokeFrom
from core.helper.trace_id_helper import get_external_trace_id
from core.plugin.impl.exc import PluginInvokeError
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from core.trigger.debug.event_selectors import (
TriggerDebugEvent,
TriggerDebugEventPoller,
@@ -1209,7 +1210,7 @@ class DraftWorkflowTriggerNodeApi(Resource):
node_type: NodeType = draft_workflow.get_node_type_from_node_config(node_config)
event: TriggerDebugEvent | None = None
# for schedule trigger, when run single node, just execute directly
if node_type == NodeType.TRIGGER_SCHEDULE:
if node_type == TRIGGER_SCHEDULE_NODE_TYPE:
event = TriggerDebugEvent(
workflow_args={},
node_id=node_id,

View File

@@ -263,6 +263,7 @@ def _get_retrieval_methods_by_vector_type(vector_type: str | None, is_mock: bool
VectorType.BAIDU,
VectorType.ALIBABACLOUD_MYSQL,
VectorType.IRIS,
VectorType.HOLOGRES,
}
semantic_methods = {"retrieval_method": [RetrievalMethod.SEMANTIC_SEARCH.value]}

View File

@@ -3,7 +3,7 @@ import time
from collections.abc import Callable
from enum import StrEnum, auto
from functools import wraps
from typing import Concatenate, ParamSpec, TypeVar, cast
from typing import Concatenate, ParamSpec, TypeVar, cast, overload
from flask import current_app, request
from flask_login import user_logged_in
@@ -44,10 +44,22 @@ class FetchUserArg(BaseModel):
required: bool = False
def validate_app_token(view: Callable[P, R] | None = None, *, fetch_user_arg: FetchUserArg | None = None):
def decorator(view_func: Callable[P, R]):
@overload
def validate_app_token(view: Callable[P, R]) -> Callable[P, R]: ...
@overload
def validate_app_token(
view: None = None, *, fetch_user_arg: FetchUserArg | None = None
) -> Callable[[Callable[P, R]], Callable[P, R]]: ...
def validate_app_token(
view: Callable[P, R] | None = None, *, fetch_user_arg: FetchUserArg | None = None
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
def decorator(view_func: Callable[P, R]) -> Callable[P, R]:
@wraps(view_func)
def decorated_view(*args: P.args, **kwargs: P.kwargs):
def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R:
api_token = validate_and_get_api_token("app")
app_model = db.session.query(App).where(App.id == api_token.app_id).first()
@@ -213,10 +225,20 @@ def cloud_edition_billing_rate_limit_check(resource: str, api_token_type: str):
return interceptor
def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
def decorator(view: Callable[Concatenate[T, P], R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
@overload
def validate_dataset_token(view: Callable[Concatenate[T, P], R]) -> Callable[P, R]: ...
@overload
def validate_dataset_token(view: None = None) -> Callable[[Callable[Concatenate[T, P], R]], Callable[P, R]]: ...
def validate_dataset_token(
view: Callable[Concatenate[T, P], R] | None = None,
) -> Callable[P, R] | Callable[[Callable[Concatenate[T, P], R]], Callable[P, R]]:
def decorator(view_func: Callable[Concatenate[T, P], R]) -> Callable[P, R]:
@wraps(view_func)
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
api_token = validate_and_get_api_token("dataset")
# get url path dataset_id from positional args or kwargs
@@ -287,7 +309,7 @@ def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
raise Unauthorized("Tenant owner account does not exist.")
else:
raise Unauthorized("Tenant does not exist.")
return view(api_token.tenant_id, *args, **kwargs)
return view_func(api_token.tenant_id, *args, **kwargs) # type: ignore[arg-type]
return decorated
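
The hunk above gives `validate_app_token` and `validate_dataset_token` explicit `@overload` signatures so type checkers can resolve both the bare form and the parameterized form of the decorator. A minimal, self-contained sketch of that typing pattern, using illustrative names rather than the actual Dify helpers:

```python
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar, overload

P = ParamSpec("P")
R = TypeVar("R")


@overload
def require_token(view: Callable[P, R]) -> Callable[P, R]: ...
@overload
def require_token(view: None = None, *, scope: str = "app") -> Callable[[Callable[P, R]], Callable[P, R]]: ...


def require_token(
    view: Callable[P, R] | None = None, *, scope: str = "app"
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
    """Decorator usable either as @require_token or as @require_token(scope=...)."""

    def decorator(view_func: Callable[P, R]) -> Callable[P, R]:
        @wraps(view_func)
        def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
            # stand-in for validate_and_get_api_token(scope) in the real code
            print(f"validating {scope!r} token")
            return view_func(*args, **kwargs)

        return decorated

    # bare form: @require_token
    if view is not None:
        return decorator(view)
    # parameterized form: @require_token(scope="dataset")
    return decorator


@require_token
def list_apps() -> list[str]:
    return ["app-1"]


@require_token(scope="dataset")
def list_datasets() -> list[str]:
    return ["dataset-1"]
```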

View File

@@ -69,7 +69,7 @@ from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.model_runtime.entities.llm_entities import LLMUsage
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.nodes import NodeType
from dify_graph.nodes import BuiltinNodeTypes
from dify_graph.repositories.draft_variable_repository import DraftVariableSaverFactory
from dify_graph.runtime import GraphRuntimeState
from dify_graph.system_variable import SystemVariable
@@ -357,7 +357,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
) -> Generator[StreamResponse, None, None]:
"""Handle node succeeded events."""
# Record files if it's an answer node or end node
if event.node_type in [NodeType.ANSWER, NodeType.END, NodeType.LLM]:
if event.node_type in [BuiltinNodeTypes.ANSWER, BuiltinNodeTypes.END, BuiltinNodeTypes.LLM]:
self._recorded_files.extend(
self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
)

View File

@@ -48,12 +48,13 @@ from core.app.entities.task_entities import (
from core.plugin.impl.datasource import PluginDatasourceManager
from core.tools.entities.tool_entities import ToolProviderType
from core.tools.tool_manager import ToolManager
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.trigger_manager import TriggerManager
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
from dify_graph.enums import (
NodeType,
BuiltinNodeTypes,
SystemVariableKey,
WorkflowExecutionStatus,
WorkflowNodeExecutionMetadataKey,
@@ -442,7 +443,7 @@ class WorkflowResponseConverter:
event: QueueNodeStartedEvent,
task_id: str,
) -> NodeStartStreamResponse | None:
if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
return None
run_id = self._ensure_workflow_run_id()
snapshot = self._store_snapshot(event)
@@ -464,13 +465,13 @@ class WorkflowResponseConverter:
)
try:
if event.node_type == NodeType.TOOL:
if event.node_type == BuiltinNodeTypes.TOOL:
response.data.extras["icon"] = ToolManager.get_tool_icon(
tenant_id=self._application_generate_entity.app_config.tenant_id,
provider_type=ToolProviderType(event.provider_type),
provider_id=event.provider_id,
)
elif event.node_type == NodeType.DATASOURCE:
elif event.node_type == BuiltinNodeTypes.DATASOURCE:
manager = PluginDatasourceManager()
provider_entity = manager.fetch_datasource_provider(
self._application_generate_entity.app_config.tenant_id,
@@ -479,7 +480,7 @@ class WorkflowResponseConverter:
response.data.extras["icon"] = provider_entity.declaration.identity.generate_datasource_icon_url(
self._application_generate_entity.app_config.tenant_id
)
elif event.node_type == NodeType.TRIGGER_PLUGIN:
elif event.node_type == TRIGGER_PLUGIN_NODE_TYPE:
response.data.extras["icon"] = TriggerManager.get_trigger_plugin_icon(
self._application_generate_entity.app_config.tenant_id,
event.provider_id,
@@ -496,7 +497,7 @@ class WorkflowResponseConverter:
event: QueueNodeSucceededEvent | QueueNodeFailedEvent | QueueNodeExceptionEvent,
task_id: str,
) -> NodeFinishStreamResponse | None:
if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
return None
run_id = self._ensure_workflow_run_id()
snapshot = self._pop_snapshot(event.node_execution_id)
@@ -554,7 +555,7 @@ class WorkflowResponseConverter:
event: QueueNodeRetryEvent,
task_id: str,
) -> NodeRetryStreamResponse | None:
if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
return None
run_id = self._ensure_workflow_run_id()
@@ -612,7 +613,7 @@ class WorkflowResponseConverter:
data=IterationNodeStartStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
created_at=int(time.time()),
extras={},
@@ -635,7 +636,7 @@ class WorkflowResponseConverter:
data=IterationNodeNextStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
index=event.index,
created_at=int(time.time()),
@@ -662,7 +663,7 @@ class WorkflowResponseConverter:
data=IterationNodeCompletedStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
outputs=new_outputs,
outputs_truncated=outputs_truncated,
@@ -692,7 +693,7 @@ class WorkflowResponseConverter:
data=LoopNodeStartStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
created_at=int(time.time()),
extras={},
@@ -715,7 +716,7 @@ class WorkflowResponseConverter:
data=LoopNodeNextStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
index=event.index,
# The `pre_loop_output` field is not utilized by the frontend.
@@ -744,7 +745,7 @@ class WorkflowResponseConverter:
data=LoopNodeCompletedStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
outputs=new_outputs,
outputs_truncated=outputs_truncated,

View File

@@ -12,7 +12,7 @@ from core.app.entities.app_invoke_entities import (
build_dify_run_context,
)
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.node_factory import DifyNodeFactory
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities.graph_init_params import GraphInitParams
from dify_graph.enums import WorkflowType
@@ -274,6 +274,8 @@ class PipelineRunner(WorkflowBasedAppRunner):
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
if start_node_id is None:
start_node_id = get_default_root_node_id(graph_config)
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=start_node_id)
if not graph:

View File

@@ -32,8 +32,8 @@ from core.app.entities.queue_entities import (
QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
)
from core.workflow.node_factory import DifyNodeFactory
from core.workflow.node_resolution import resolve_workflow_node_class
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id, resolve_workflow_node_class
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDictAdapter
@@ -140,6 +140,9 @@ class WorkflowBasedAppRunner:
graph_runtime_state=graph_runtime_state,
)
if root_node_id is None:
root_node_id = get_default_root_node_id(graph_config)
# init graph
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=root_node_id)
@@ -505,7 +508,9 @@ class WorkflowBasedAppRunner:
elif isinstance(event, NodeRunRetrieverResourceEvent):
self._publish_event(
QueueRetrieverResourcesEvent(
retriever_resources=event.retriever_resources,
retriever_resources=[
RetrievalSourceMetadata.model_validate(resource) for resource in event.retriever_resources
],
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
)

View File

@@ -9,9 +9,8 @@ from core.app.entities.agent_strategy import AgentStrategyInfo
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from dify_graph.entities.pause_reason import PauseReason
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
from dify_graph.enums import WorkflowNodeExecutionMetadataKey
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from dify_graph.nodes import NodeType
class QueueEvent(StrEnum):

View File

@@ -2,7 +2,7 @@ import logging
from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID
from dify_graph.conversation_variable_updater import ConversationVariableUpdater
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphEngineEvent, NodeRunSucceededEvent
from dify_graph.nodes.variable_assigner.common import helpers as common_helpers
@@ -22,7 +22,7 @@ class ConversationVariablePersistenceLayer(GraphEngineLayer):
def on_event(self, event: GraphEngineEvent) -> None:
if not isinstance(event, NodeRunSucceededEvent):
return
if event.node_type != NodeType.VARIABLE_ASSIGNER:
if event.node_type != BuiltinNodeTypes.VARIABLE_ASSIGNER:
return
if self.graph_runtime_state is None:
return

View File

@@ -12,7 +12,7 @@ from typing_extensions import override
from core.app.llm import deduct_llm_quota, ensure_llm_quota_available
from core.errors.error import QuotaExceededError
from core.model_manager import ModelInstance
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from dify_graph.graph_engine.entities.commands import AbortCommand, CommandType
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase
@@ -113,11 +113,11 @@ class LLMQuotaLayer(GraphEngineLayer):
def _extract_model_instance(node: Node) -> ModelInstance | None:
try:
match node.node_type:
case NodeType.LLM:
case BuiltinNodeTypes.LLM:
return cast("LLMNode", node).model_instance
case NodeType.PARAMETER_EXTRACTOR:
case BuiltinNodeTypes.PARAMETER_EXTRACTOR:
return cast("ParameterExtractorNode", node).model_instance
case NodeType.QUESTION_CLASSIFIER:
case BuiltinNodeTypes.QUESTION_CLASSIFIER:
return cast("QuestionClassifierNode", node).model_instance
case _:
return None

View File

@@ -16,7 +16,7 @@ from opentelemetry.trace import Span, SpanKind, Tracer, get_tracer, set_span_in_
from typing_extensions import override
from configs import dify_config
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphNodeEventBase
from dify_graph.nodes.base.node import Node
@@ -74,16 +74,13 @@ class ObservabilityLayer(GraphEngineLayer):
def _build_parser_registry(self) -> None:
"""Initialize parser registry for node types."""
self._parsers = {
NodeType.TOOL: ToolNodeOTelParser(),
NodeType.LLM: LLMNodeOTelParser(),
NodeType.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
BuiltinNodeTypes.TOOL: ToolNodeOTelParser(),
BuiltinNodeTypes.LLM: LLMNodeOTelParser(),
BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
}
def _get_parser(self, node: Node) -> NodeOTelParser:
node_type = getattr(node, "node_type", None)
if isinstance(node_type, NodeType):
return self._parsers.get(node_type, self._default_parser)
return self._default_parser
return self._parsers.get(node.node_type, self._default_parser)
@override
def on_graph_start(self) -> None:

View File

@@ -24,12 +24,12 @@ from core.datasource.utils.message_transformer import DatasourceFileMessageTrans
from core.datasource.website_crawl.website_crawl_provider import WebsiteCrawlDatasourcePluginProviderController
from core.db.session_factory import session_factory
from core.plugin.impl.datasource import PluginDatasourceManager
from core.workflow.nodes.datasource.entities import DatasourceParameter, OnlineDriveDownloadFileParam
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import WorkflowNodeExecutionMetadataKey
from dify_graph.file import File
from dify_graph.file.enums import FileTransferMethod, FileType
from dify_graph.node_events import NodeRunResult, StreamChunkEvent, StreamCompletedEvent
from dify_graph.repositories.datasource_manager_protocol import DatasourceParameter, OnlineDriveDownloadFileParam
from factories import file_factory
from models.model import UploadFile
from models.tools import ToolFile

View File

@@ -58,7 +58,7 @@ from core.ops.entities.trace_entity import (
)
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.entities import WorkflowNodeExecution
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import WorkflowNodeExecutionTriggeredFrom
@@ -302,11 +302,11 @@ class AliyunDataTrace(BaseTraceInstance):
self, node_execution: WorkflowNodeExecution, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata
):
try:
if node_execution.node_type == NodeType.LLM:
if node_execution.node_type == BuiltinNodeTypes.LLM:
node_span = self.build_workflow_llm_span(trace_info, node_execution, trace_metadata)
elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
elif node_execution.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
node_span = self.build_workflow_retrieval_span(trace_info, node_execution, trace_metadata)
elif node_execution.node_type == NodeType.TOOL:
elif node_execution.node_type == BuiltinNodeTypes.TOOL:
node_span = self.build_workflow_tool_span(trace_info, node_execution, trace_metadata)
else:
node_span = self.build_workflow_task_span(trace_info, node_execution, trace_metadata)

View File

@@ -155,8 +155,8 @@ def wrap_span_metadata(metadata, **kwargs):
return metadata
# Mapping from NodeType string values to OpenInference span kinds.
# NodeType values not listed here default to CHAIN.
# Mapping from built-in node type strings to OpenInference span kinds.
# Node types not listed here default to CHAIN.
_NODE_TYPE_TO_SPAN_KIND: dict[str, OpenInferenceSpanKindValues] = {
"llm": OpenInferenceSpanKindValues.LLM,
"knowledge-retrieval": OpenInferenceSpanKindValues.RETRIEVER,
@@ -168,7 +168,7 @@ _NODE_TYPE_TO_SPAN_KIND: dict[str, OpenInferenceSpanKindValues] = {
def _get_node_span_kind(node_type: str) -> OpenInferenceSpanKindValues:
"""Return the OpenInference span kind for a given workflow node type.
Covers every ``NodeType`` enum value. Nodes that do not have a
Covers every built-in node type string. Nodes that do not have a
specialised span kind (e.g. ``start``, ``end``, ``if-else``,
``code``, ``loop``, ``iteration``, etc.) are mapped to ``CHAIN``.
"""

View File

@@ -28,7 +28,7 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
)
from core.ops.utils import filter_none_values
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from extensions.ext_database import db
from models import EndUser, WorkflowNodeExecutionTriggeredFrom
from models.enums import MessageStatus
@@ -141,7 +141,7 @@ class LangFuseDataTrace(BaseTraceInstance):
node_name = node_execution.title
node_type = node_execution.node_type
status = node_execution.status
if node_type == NodeType.LLM:
if node_type == BuiltinNodeTypes.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
inputs = node_execution.inputs or {}

View File

@@ -28,7 +28,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import (
)
from core.ops.utils import filter_none_values, generate_dotted_order
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
@@ -163,7 +163,7 @@ class LangSmithDataTrace(BaseTraceInstance):
node_name = node_execution.title
node_type = node_execution.node_type
status = node_execution.status
if node_type == NodeType.LLM:
if node_type == BuiltinNodeTypes.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
inputs = node_execution.inputs or {}
@@ -197,7 +197,7 @@ class LangSmithDataTrace(BaseTraceInstance):
"ls_model_name": process_data.get("model_name", ""),
}
)
elif node_type == NodeType.KNOWLEDGE_RETRIEVAL:
elif node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
run_type = LangSmithRunType.retriever
else:
run_type = LangSmithRunType.tool

View File

@@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import (
TraceTaskName,
WorkflowTraceInfo,
)
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from extensions.ext_database import db
from models import EndUser
from models.workflow import WorkflowNodeExecutionModel
@@ -145,10 +145,10 @@ class MLflowDataTrace(BaseTraceInstance):
"app_name": node.title,
}
if node.node_type in (NodeType.LLM, NodeType.QUESTION_CLASSIFIER):
if node.node_type in (BuiltinNodeTypes.LLM, BuiltinNodeTypes.QUESTION_CLASSIFIER):
inputs, llm_attributes = self._parse_llm_inputs_and_attributes(node)
attributes.update(llm_attributes)
elif node.node_type == NodeType.HTTP_REQUEST:
elif node.node_type == BuiltinNodeTypes.HTTP_REQUEST:
inputs = node.process_data # contains request URL
if not inputs:
@@ -180,9 +180,9 @@ class MLflowDataTrace(BaseTraceInstance):
# End node span
finished_at = node.created_at + timedelta(seconds=node.elapsed_time)
outputs = json.loads(node.outputs) if node.outputs else {}
if node.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
if node.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
outputs = self._parse_knowledge_retrieval_outputs(outputs)
elif node.node_type == NodeType.LLM:
elif node.node_type == BuiltinNodeTypes.LLM:
outputs = outputs.get("text", outputs)
node_span.end(
outputs=outputs,
@@ -471,13 +471,13 @@ class MLflowDataTrace(BaseTraceInstance):
def _get_node_span_type(self, node_type: str) -> str:
"""Map Dify node types to MLflow span types"""
node_type_mapping = {
NodeType.LLM: SpanType.LLM,
NodeType.QUESTION_CLASSIFIER: SpanType.LLM,
NodeType.KNOWLEDGE_RETRIEVAL: SpanType.RETRIEVER,
NodeType.TOOL: SpanType.TOOL,
NodeType.CODE: SpanType.TOOL,
NodeType.HTTP_REQUEST: SpanType.TOOL,
NodeType.AGENT: SpanType.AGENT,
BuiltinNodeTypes.LLM: SpanType.LLM,
BuiltinNodeTypes.QUESTION_CLASSIFIER: SpanType.LLM,
BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL: SpanType.RETRIEVER,
BuiltinNodeTypes.TOOL: SpanType.TOOL,
BuiltinNodeTypes.CODE: SpanType.TOOL,
BuiltinNodeTypes.HTTP_REQUEST: SpanType.TOOL,
BuiltinNodeTypes.AGENT: SpanType.AGENT,
}
return node_type_mapping.get(node_type, "CHAIN") # type: ignore[arg-type,call-overload]

View File

@@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import (
WorkflowTraceInfo,
)
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
@@ -187,7 +187,7 @@ class OpikDataTrace(BaseTraceInstance):
node_name = node_execution.title
node_type = node_execution.node_type
status = node_execution.status
if node_type == NodeType.LLM:
if node_type == BuiltinNodeTypes.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
inputs = node_execution.inputs or {}

View File

@@ -27,7 +27,7 @@ from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from dify_graph.entities.workflow_node_execution import (
WorkflowNodeExecution,
)
from dify_graph.nodes import NodeType
from dify_graph.nodes import BuiltinNodeTypes
from extensions.ext_database import db
from models import Account, App, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom
@@ -179,7 +179,7 @@ class TencentDataTrace(BaseTraceInstance):
if node_span:
self.trace_client.add_span(node_span)
if node_execution.node_type == NodeType.LLM:
if node_execution.node_type == BuiltinNodeTypes.LLM:
self._record_llm_metrics(node_execution)
except Exception:
logger.exception("[Tencent APM] Failed to process node execution: %s", node_execution.id)
@@ -192,15 +192,15 @@ class TencentDataTrace(BaseTraceInstance):
) -> SpanData | None:
"""Build span for different node types"""
try:
if node_execution.node_type == NodeType.LLM:
if node_execution.node_type == BuiltinNodeTypes.LLM:
return TencentSpanBuilder.build_workflow_llm_span(
trace_id, workflow_span_id, trace_info, node_execution
)
elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
elif node_execution.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
return TencentSpanBuilder.build_workflow_retrieval_span(
trace_id, workflow_span_id, trace_info, node_execution
)
elif node_execution.node_type == NodeType.TOOL:
elif node_execution.node_type == BuiltinNodeTypes.TOOL:
return TencentSpanBuilder.build_workflow_tool_span(
trace_id, workflow_span_id, trace_info, node_execution
)

View File

@@ -31,7 +31,7 @@ from core.ops.entities.trace_entity import (
)
from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
@@ -175,7 +175,7 @@ class WeaveDataTrace(BaseTraceInstance):
node_name = node_execution.title
node_type = node_execution.node_type
status = node_execution.status
if node_type == NodeType.LLM:
if node_type == BuiltinNodeTypes.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
inputs = node_execution.inputs or {}

View File

@@ -1,5 +1,5 @@
from core.plugin.backwards_invocation.base import BaseBackwardsInvocation
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from dify_graph.nodes.parameter_extractor.entities import (
ModelConfig as ParameterExtractorModelConfig,
)
@@ -52,7 +52,7 @@ class PluginNodeBackwardsInvocation(BaseBackwardsInvocation):
instruction=instruction, # instruct with variables are not supported
)
node_data_dict = node_data.model_dump()
node_data_dict["type"] = NodeType.PARAMETER_EXTRACTOR
node_data_dict["type"] = BuiltinNodeTypes.PARAMETER_EXTRACTOR
execution = workflow_service.run_free_workflow_node(
node_data_dict,
tenant_id=tenant_id,

View File

@@ -305,9 +305,7 @@ class ProviderManager:
available_models = provider_configurations.get_models(model_type=model_type, only_active=True)
if available_models:
available_model = next(
(model for model in available_models if model.model == "gpt-4"), available_models[0]
)
available_model = available_models[0]
default_model = TenantDefaultModel(
tenant_id=tenant_id,

View File

@@ -0,0 +1,361 @@
import json
import logging
import time
from typing import Any
import holo_search_sdk as holo # type: ignore
from holo_search_sdk.types import BaseQuantizationType, DistanceType, TokenizerType
from psycopg import sql as psql
from pydantic import BaseModel, model_validator
from configs import dify_config
from core.rag.datasource.vdb.vector_base import BaseVector
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.embedding.embedding_base import Embeddings
from core.rag.models.document import Document
from extensions.ext_redis import redis_client
from models.dataset import Dataset
logger = logging.getLogger(__name__)
class HologresVectorConfig(BaseModel):
"""
Configuration for Hologres vector database connection.
In Hologres, access_key_id is used as the PostgreSQL username,
and access_key_secret is used as the PostgreSQL password.
"""
host: str
port: int = 80
database: str
access_key_id: str
access_key_secret: str
schema_name: str = "public"
tokenizer: TokenizerType = "jieba"
distance_method: DistanceType = "Cosine"
base_quantization_type: BaseQuantizationType = "rabitq"
max_degree: int = 64
ef_construction: int = 400
@model_validator(mode="before")
@classmethod
def validate_config(cls, values: dict):
if not values.get("host"):
raise ValueError("config HOLOGRES_HOST is required")
if not values.get("database"):
raise ValueError("config HOLOGRES_DATABASE is required")
if not values.get("access_key_id"):
raise ValueError("config HOLOGRES_ACCESS_KEY_ID is required")
if not values.get("access_key_secret"):
raise ValueError("config HOLOGRES_ACCESS_KEY_SECRET is required")
return values
class HologresVector(BaseVector):
"""
Hologres vector storage implementation using holo-search-sdk.
Supports semantic search (vector), full-text search, and hybrid search.
"""
def __init__(self, collection_name: str, config: HologresVectorConfig):
super().__init__(collection_name)
self._config = config
self._client = self._init_client(config)
self.table_name = f"embedding_{collection_name}".lower()
def _init_client(self, config: HologresVectorConfig):
"""Initialize and return a holo-search-sdk client."""
client = holo.connect(
host=config.host,
port=config.port,
database=config.database,
access_key_id=config.access_key_id,
access_key_secret=config.access_key_secret,
schema=config.schema_name,
)
client.connect()
return client
def get_type(self) -> str:
return VectorType.HOLOGRES
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
"""Create collection table with vector and full-text indexes, then add texts."""
dimension = len(embeddings[0])
self._create_collection(dimension)
self.add_texts(texts, embeddings)
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
"""Add texts with embeddings to the collection using batch upsert."""
if not documents:
return []
pks: list[str] = []
batch_size = 100
for i in range(0, len(documents), batch_size):
batch_docs = documents[i : i + batch_size]
batch_embeddings = embeddings[i : i + batch_size]
values = []
column_names = ["id", "text", "meta", "embedding"]
for j, doc in enumerate(batch_docs):
doc_id = doc.metadata.get("doc_id", "") if doc.metadata else ""
pks.append(doc_id)
values.append(
[
doc_id,
doc.page_content,
json.dumps(doc.metadata or {}),
batch_embeddings[j],
]
)
table = self._client.open_table(self.table_name)
table.upsert_multi(
index_column="id",
values=values,
column_names=column_names,
update=True,
update_columns=["text", "meta", "embedding"],
)
return pks
def text_exists(self, id: str) -> bool:
"""Check if a text with the given doc_id exists in the collection."""
if not self._client.check_table_exist(self.table_name):
return False
result = self._client.execute(
psql.SQL("SELECT 1 FROM {} WHERE id = {} LIMIT 1").format(
psql.Identifier(self.table_name), psql.Literal(id)
),
fetch_result=True,
)
return bool(result)
def get_ids_by_metadata_field(self, key: str, value: str) -> list[str] | None:
"""Get document IDs by metadata field key and value."""
result = self._client.execute(
psql.SQL("SELECT id FROM {} WHERE meta->>{} = {}").format(
psql.Identifier(self.table_name), psql.Literal(key), psql.Literal(value)
),
fetch_result=True,
)
if result:
return [row[0] for row in result]
return None
def delete_by_ids(self, ids: list[str]):
"""Delete documents by their doc_id list."""
if not ids:
return
if not self._client.check_table_exist(self.table_name):
return
self._client.execute(
psql.SQL("DELETE FROM {} WHERE id IN ({})").format(
psql.Identifier(self.table_name),
psql.SQL(", ").join(psql.Literal(id) for id in ids),
)
)
def delete_by_metadata_field(self, key: str, value: str):
"""Delete documents by metadata field key and value."""
if not self._client.check_table_exist(self.table_name):
return
self._client.execute(
psql.SQL("DELETE FROM {} WHERE meta->>{} = {}").format(
psql.Identifier(self.table_name), psql.Literal(key), psql.Literal(value)
)
)
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
"""Search for documents by vector similarity."""
if not self._client.check_table_exist(self.table_name):
return []
top_k = kwargs.get("top_k", 4)
score_threshold = float(kwargs.get("score_threshold") or 0.0)
table = self._client.open_table(self.table_name)
query = (
table.search_vector(
vector=query_vector,
column="embedding",
distance_method=self._config.distance_method,
output_name="distance",
)
.select(["id", "text", "meta"])
.limit(top_k)
)
# Apply document_ids_filter if provided
document_ids_filter = kwargs.get("document_ids_filter")
if document_ids_filter:
filter_sql = psql.SQL("meta->>'document_id' IN ({})").format(
psql.SQL(", ").join(psql.Literal(id) for id in document_ids_filter)
)
query = query.where(filter_sql)
results = query.fetchall()
return self._process_vector_results(results, score_threshold)
def _process_vector_results(self, results: list, score_threshold: float) -> list[Document]:
"""Process vector search results into Document objects."""
docs = []
for row in results:
# row format: (distance, id, text, meta)
# distance is first because search_vector() adds the computed column before selected columns
distance = row[0]
text = row[2]
meta = row[3]
if isinstance(meta, str):
meta = json.loads(meta)
# Convert distance to similarity score (consistent with pgvector)
score = 1 - distance
meta["score"] = score
if score >= score_threshold:
docs.append(Document(page_content=text, metadata=meta))
return docs
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
"""Search for documents by full-text search."""
if not self._client.check_table_exist(self.table_name):
return []
top_k = kwargs.get("top_k", 4)
table = self._client.open_table(self.table_name)
search_query = table.search_text(
column="text",
expression=query,
return_score=True,
return_score_name="score",
return_all_columns=True,
).limit(top_k)
# Apply document_ids_filter if provided
document_ids_filter = kwargs.get("document_ids_filter")
if document_ids_filter:
filter_sql = psql.SQL("meta->>'document_id' IN ({})").format(
psql.SQL(", ").join(psql.Literal(id) for id in document_ids_filter)
)
search_query = search_query.where(filter_sql)
results = search_query.fetchall()
return self._process_full_text_results(results)
def _process_full_text_results(self, results: list) -> list[Document]:
"""Process full-text search results into Document objects."""
docs = []
for row in results:
# row format: (id, text, meta, embedding, score)
text = row[1]
meta = row[2]
score = row[-1] # score is the last column from return_score
if isinstance(meta, str):
meta = json.loads(meta)
meta["score"] = score
docs.append(Document(page_content=text, metadata=meta))
return docs
def delete(self):
"""Delete the entire collection table."""
if self._client.check_table_exist(self.table_name):
self._client.drop_table(self.table_name)
def _create_collection(self, dimension: int):
"""Create the collection table with vector and full-text indexes."""
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
if not self._client.check_table_exist(self.table_name):
# Create table via SQL with CHECK constraint for vector dimension
create_table_sql = psql.SQL("""
CREATE TABLE IF NOT EXISTS {} (
id TEXT PRIMARY KEY,
text TEXT NOT NULL,
meta JSONB NOT NULL,
embedding float4[] NOT NULL
CHECK (array_ndims(embedding) = 1
AND array_length(embedding, 1) = {})
);
""").format(psql.Identifier(self.table_name), psql.Literal(dimension))
self._client.execute(create_table_sql)
# Wait for table to be fully ready before creating indexes
max_wait_seconds = 30
poll_interval = 2
for _ in range(max_wait_seconds // poll_interval):
if self._client.check_table_exist(self.table_name):
break
time.sleep(poll_interval)
else:
raise RuntimeError(f"Table {self.table_name} was not ready after {max_wait_seconds}s")
# Open table and set vector index
table = self._client.open_table(self.table_name)
table.set_vector_index(
column="embedding",
distance_method=self._config.distance_method,
base_quantization_type=self._config.base_quantization_type,
max_degree=self._config.max_degree,
ef_construction=self._config.ef_construction,
use_reorder=self._config.base_quantization_type == "rabitq",
)
# Create full-text search index
table.create_text_index(
index_name=f"ft_idx_{self._collection_name}",
column="text",
tokenizer=self._config.tokenizer,
)
redis_client.set(collection_exist_cache_key, 1, ex=3600)
class HologresVectorFactory(AbstractVectorFactory):
"""Factory class for creating HologresVector instances."""
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> HologresVector:
if dataset.index_struct_dict:
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
collection_name = class_prefix
else:
dataset_id = dataset.id
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.HOLOGRES, collection_name))
return HologresVector(
collection_name=collection_name,
config=HologresVectorConfig(
host=dify_config.HOLOGRES_HOST or "",
port=dify_config.HOLOGRES_PORT,
database=dify_config.HOLOGRES_DATABASE or "",
access_key_id=dify_config.HOLOGRES_ACCESS_KEY_ID or "",
access_key_secret=dify_config.HOLOGRES_ACCESS_KEY_SECRET or "",
schema_name=dify_config.HOLOGRES_SCHEMA,
tokenizer=dify_config.HOLOGRES_TOKENIZER,
distance_method=dify_config.HOLOGRES_DISTANCE_METHOD,
base_quantization_type=dify_config.HOLOGRES_BASE_QUANTIZATION_TYPE,
max_degree=dify_config.HOLOGRES_MAX_DEGREE,
ef_construction=dify_config.HOLOGRES_EF_CONSTRUCTION,
),
)
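For reference, a minimal usage sketch of the full-text path above, assuming `vector` is a `HologresVector` returned by `HologresVectorFactory.init_vector(...)` for a dataset that already has indexed segments (the query string and document IDs are illustrative):

```python
# Minimal sketch, assuming `vector` is a HologresVector built by the factory above.
docs = vector.search_by_full_text(
    "quarterly revenue guidance",              # full-text query over the "text" column
    top_k=4,                                   # applied via .limit(top_k)
    document_ids_filter=["doc-1", "doc-2"],    # becomes meta->>'document_id' IN (...)
)
for doc in docs:
    # _process_full_text_results copies the text score into metadata["score"]
    print(doc.metadata.get("document_id"), doc.metadata["score"], doc.page_content[:80])
```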

View File

@@ -135,8 +135,8 @@ class PGVectoRS(BaseVector):
def get_ids_by_metadata_field(self, key: str, value: str):
result = None
with Session(self._client) as session:
select_statement = sql_text(f"SELECT id FROM {self._collection_name} WHERE meta->>'{key}' = '{value}'; ")
result = session.execute(select_statement).fetchall()
select_statement = sql_text(f"SELECT id FROM {self._collection_name} WHERE meta->>:key = :value")
result = session.execute(select_statement, {"key": key, "value": value}).fetchall()
if result:
return [item[0] for item in result]
else:
@@ -172,9 +172,9 @@ class PGVectoRS(BaseVector):
def text_exists(self, id: str) -> bool:
with Session(self._client) as session:
select_statement = sql_text(
f"SELECT id FROM {self._collection_name} WHERE meta->>'doc_id' = '{id}' limit 1; "
f"SELECT id FROM {self._collection_name} WHERE meta->>'doc_id' = :doc_id limit 1"
)
result = session.execute(select_statement).fetchall()
result = session.execute(select_statement, {"doc_id": id}).fetchall()
return len(result) > 0
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
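The hunk above swaps f-string interpolation for bound parameters. A standalone sketch of the same pattern follows (the DSN, table name, and values are hypothetical); note that identifiers such as the collection name cannot be bound and therefore stay interpolated in the real code:

```python
from sqlalchemy import create_engine, text

# Hypothetical DSN and table name, for illustration only.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/dify")
with engine.connect() as conn:
    # Values travel as bind parameters, so quoting/escaping is handled by the
    # driver instead of Python string formatting; ->> accepts a bound text operand.
    rows = conn.execute(
        text("SELECT id FROM segments WHERE meta->>:key = :value"),
        {"key": "document_id", "value": "doc-1"},
    ).fetchall()
```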

View File

@@ -154,10 +154,8 @@ class RelytVector(BaseVector):
def get_ids_by_metadata_field(self, key: str, value: str):
result = None
with Session(self.client) as session:
select_statement = sql_text(
f"""SELECT id FROM "{self._collection_name}" WHERE metadata->>'{key}' = '{value}'; """
)
result = session.execute(select_statement).fetchall()
select_statement = sql_text(f"""SELECT id FROM "{self._collection_name}" WHERE metadata->>:key = :value""")
result = session.execute(select_statement, {"key": key, "value": value}).fetchall()
if result:
return [item[0] for item in result]
else:
@@ -201,11 +199,10 @@ class RelytVector(BaseVector):
def delete_by_ids(self, ids: list[str]):
with Session(self.client) as session:
ids_str = ",".join(f"'{doc_id}'" for doc_id in ids)
select_statement = sql_text(
f"""SELECT id FROM "{self._collection_name}" WHERE metadata->>'doc_id' in ({ids_str}); """
f"""SELECT id FROM "{self._collection_name}" WHERE metadata->>'doc_id' = ANY(:doc_ids)"""
)
result = session.execute(select_statement).fetchall()
result = session.execute(select_statement, {"doc_ids": ids}).fetchall()
if result:
ids = [item[0] for item in result]
self.delete_by_uuids(ids)
@@ -218,9 +215,9 @@ class RelytVector(BaseVector):
def text_exists(self, id: str) -> bool:
with Session(self.client) as session:
select_statement = sql_text(
f"""SELECT id FROM "{self._collection_name}" WHERE metadata->>'doc_id' = '{id}' limit 1; """
f"""SELECT id FROM "{self._collection_name}" WHERE metadata->>'doc_id' = :doc_id limit 1"""
)
result = session.execute(select_statement).fetchall()
result = session.execute(select_statement, {"doc_id": id}).fetchall()
return len(result) > 0
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
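The `delete_by_ids` change above leans on list-to-array binding: with psycopg2, a Python list passed as a parameter adapts to a PostgreSQL array, so `= ANY(:doc_ids)` needs no per-id quoting. A short sketch under the same hypothetical connection assumptions:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

engine = create_engine("postgresql+psycopg2://user:pass@localhost/dify")  # hypothetical DSN
with Session(engine) as session:
    stmt = text("""SELECT id FROM "collection_demo" WHERE metadata->>'doc_id' = ANY(:doc_ids)""")
    rows = session.execute(stmt, {"doc_ids": ["doc-1", "doc-2", "doc-3"]}).fetchall()
```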

View File

@@ -38,7 +38,7 @@ class AbstractVectorFactory(ABC):
class Vector:
def __init__(self, dataset: Dataset, attributes: list | None = None):
if attributes is None:
attributes = ["doc_id", "dataset_id", "document_id", "doc_hash"]
attributes = ["doc_id", "dataset_id", "document_id", "doc_hash", "doc_type"]
self._dataset = dataset
self._embeddings = self._get_embeddings()
self._attributes = attributes
@@ -191,6 +191,10 @@ class Vector:
from core.rag.datasource.vdb.iris.iris_vector import IrisVectorFactory
return IrisVectorFactory
case VectorType.HOLOGRES:
from core.rag.datasource.vdb.hologres.hologres_vector import HologresVectorFactory
return HologresVectorFactory
case _:
raise ValueError(f"Vector store {vector_type} is not supported.")

View File

@@ -34,3 +34,4 @@ class VectorType(StrEnum):
MATRIXONE = "matrixone"
CLICKZETTA = "clickzetta"
IRIS = "iris"
HOLOGRES = "hologres"

View File

@@ -196,6 +196,7 @@ class WeaviateVector(BaseVector):
),
wc.Property(name="document_id", data_type=wc.DataType.TEXT),
wc.Property(name="doc_id", data_type=wc.DataType.TEXT),
wc.Property(name="doc_type", data_type=wc.DataType.TEXT),
wc.Property(name="chunk_index", data_type=wc.DataType.INT),
],
vector_config=wc.Configure.Vectors.self_provided(),
@@ -225,6 +226,8 @@ class WeaviateVector(BaseVector):
to_add.append(wc.Property(name="document_id", data_type=wc.DataType.TEXT))
if "doc_id" not in existing:
to_add.append(wc.Property(name="doc_id", data_type=wc.DataType.TEXT))
if "doc_type" not in existing:
to_add.append(wc.Property(name="doc_type", data_type=wc.DataType.TEXT))
if "chunk_index" not in existing:
to_add.append(wc.Property(name="chunk_index", data_type=wc.DataType.INT))

View File

@@ -9,8 +9,8 @@ from flask import current_app
from sqlalchemy import delete, func, select
from core.db.session_factory import session_factory
from dify_graph.nodes.knowledge_index.exc import KnowledgeIndexNodeError
from dify_graph.repositories.index_processor_protocol import Preview, PreviewItem, QaPreview
from core.workflow.nodes.knowledge_index.exc import KnowledgeIndexNodeError
from core.workflow.nodes.knowledge_index.protocols import Preview, PreviewItem, QaPreview
from models.dataset import Dataset, Document, DocumentSegment
from .index_processor_factory import IndexProcessorFactory

View File

@@ -294,7 +294,7 @@ class BaseIndexProcessor(ABC):
logging.warning("Error downloading image from %s: %s", image_url, str(e))
return None
except Exception:
logging.exception("Unexpected error downloading image from %s", image_url)
logging.warning("Unexpected error downloading image from %s", image_url, exc_info=True)
return None
def _download_tool_file(self, tool_file_id: str, current_user: Account) -> str | None:

View File

@@ -56,18 +56,18 @@ from core.rag.retrieval.template_prompts import (
)
from core.tools.signature import sign_upload_file
from core.tools.utils.dataset_retriever.dataset_retriever_base_tool import DatasetRetrieverBaseTool
from dify_graph.file import File, FileTransferMethod, FileType
from dify_graph.model_runtime.entities.llm_entities import LLMMode, LLMResult, LLMUsage
from dify_graph.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool
from dify_graph.model_runtime.entities.model_entities import ModelFeature, ModelType
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from dify_graph.nodes.knowledge_retrieval import exc
from dify_graph.repositories.rag_retrieval_protocol import (
from core.workflow.nodes.knowledge_retrieval import exc
from core.workflow.nodes.knowledge_retrieval.retrieval import (
KnowledgeRetrievalRequest,
Source,
SourceChildChunk,
SourceMetadata,
)
from dify_graph.file import File, FileTransferMethod, FileType
from dify_graph.model_runtime.entities.llm_entities import LLMMode, LLMResult, LLMUsage
from dify_graph.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool
from dify_graph.model_runtime.entities.model_entities import ModelFeature, ModelType
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.json_in_md_parser import parse_and_check_json_markdown

View File

@@ -18,7 +18,7 @@ from tenacity import before_sleep_log, retry, retry_if_exception, stop_after_att
from configs import dify_config
from dify_graph.entities import WorkflowNodeExecution
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.repositories.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
from dify_graph.workflow_type_encoder import WorkflowRuntimeTypeConverter
@@ -146,7 +146,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
index=db_model.index,
predecessor_node_id=db_model.predecessor_node_id,
node_id=db_model.node_id,
node_type=NodeType(db_model.node_type),
node_type=db_model.node_type,
title=db_model.title,
inputs=inputs,
process_data=process_data,

View File

@@ -116,6 +116,7 @@ class ToolParameterConfigurationManager:
return a deep copy of parameters with decrypted values
"""
parameters = self._deep_copy(parameters)
cache = ToolParameterCache(
tenant_id=self.tenant_id,

View File

@@ -3,7 +3,7 @@ from typing import Any
from core.tools.entities.tool_entities import WorkflowToolParameterConfiguration
from core.tools.errors import WorkflowToolHumanInputNotSupportedError
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from dify_graph.nodes.base.entities import OutputVariableEntity
from dify_graph.variables.input_entities import VariableEntity
@@ -51,7 +51,7 @@ class WorkflowToolConfigurationUtils:
def ensure_no_human_input_nodes(cls, graph: Mapping[str, Any]) -> None:
nodes = graph.get("nodes", [])
for node in nodes:
if node.get("data", {}).get("type") == NodeType.HUMAN_INPUT:
if node.get("data", {}).get("type") == BuiltinNodeTypes.HUMAN_INPUT:
raise WorkflowToolHumanInputNotSupportedError()
@classmethod

View File

@@ -0,0 +1,18 @@
from typing import Final
TRIGGER_WEBHOOK_NODE_TYPE: Final[str] = "trigger-webhook"
TRIGGER_SCHEDULE_NODE_TYPE: Final[str] = "trigger-schedule"
TRIGGER_PLUGIN_NODE_TYPE: Final[str] = "trigger-plugin"
TRIGGER_INFO_METADATA_KEY: Final[str] = "trigger_info"
TRIGGER_NODE_TYPES: Final[frozenset[str]] = frozenset(
{
TRIGGER_WEBHOOK_NODE_TYPE,
TRIGGER_SCHEDULE_NODE_TYPE,
TRIGGER_PLUGIN_NODE_TYPE,
}
)
def is_trigger_node_type(node_type: str) -> bool:
return node_type in TRIGGER_NODE_TYPES
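A short usage sketch of the helpers above; node types arrive as plain strings from persisted graph configs:

```python
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE, is_trigger_node_type

assert is_trigger_node_type(TRIGGER_WEBHOOK_NODE_TYPE)
assert is_trigger_node_type("trigger-schedule")
assert not is_trigger_node_type("start")
```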

View File

@@ -11,6 +11,11 @@ from typing import Any
from pydantic import BaseModel
from core.plugin.entities.request import TriggerInvokeEventResponse
from core.trigger.constants import (
TRIGGER_PLUGIN_NODE_TYPE,
TRIGGER_SCHEDULE_NODE_TYPE,
TRIGGER_WEBHOOK_NODE_TYPE,
)
from core.trigger.debug.event_bus import TriggerDebugEventBus
from core.trigger.debug.events import (
PluginTriggerDebugEvent,
@@ -19,10 +24,9 @@ from core.trigger.debug.events import (
build_plugin_pool_key,
build_webhook_pool_key,
)
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import NodeType
from dify_graph.nodes.trigger_plugin.entities import TriggerEventNodeData
from dify_graph.nodes.trigger_schedule.entities import ScheduleConfig
from extensions.ext_redis import redis_client
from libs.datetime_utils import ensure_naive_utc, naive_utc_now
from libs.schedule_utils import calculate_next_run_at
@@ -206,21 +210,19 @@ def create_event_poller(
if not node_config:
raise ValueError("Node data not found for node %s", node_id)
node_type = draft_workflow.get_node_type_from_node_config(node_config)
match node_type:
case NodeType.TRIGGER_PLUGIN:
return PluginTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
case NodeType.TRIGGER_WEBHOOK:
return WebhookTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
case NodeType.TRIGGER_SCHEDULE:
return ScheduleTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
case _:
raise ValueError("unable to create event poller for node type %s", node_type)
if node_type == TRIGGER_PLUGIN_NODE_TYPE:
return PluginTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
if node_type == TRIGGER_WEBHOOK_NODE_TYPE:
return WebhookTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
if node_type == TRIGGER_SCHEDULE_NODE_TYPE:
return ScheduleTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
raise ValueError("unable to create event poller for node type %s", node_type)
def select_trigger_debug_events(

View File

@@ -1,4 +1 @@
from .node_factory import DifyNodeFactory
from .workflow_entry import WorkflowEntry
__all__ = ["DifyNodeFactory", "WorkflowEntry"]
"""Core workflow package."""

View File

@@ -1,4 +1,7 @@
from collections.abc import Callable, Mapping
import importlib
import pkgutil
from collections.abc import Callable, Iterator, Mapping, MutableMapping
from functools import lru_cache
from typing import TYPE_CHECKING, Any, TypeAlias, cast, final
from sqlalchemy import select
@@ -8,7 +11,6 @@ from typing_extensions import override
from configs import dify_config
from core.app.entities.app_invoke_entities import DifyRunContext
from core.app.llm.model_access import build_dify_model_access
from core.datasource.datasource_manager import DatasourceManager
from core.helper.code_executor.code_executor import (
CodeExecutionError,
CodeExecutor,
@@ -17,12 +19,9 @@ from core.helper.ssrf_proxy import ssrf_proxy
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.prompt.entities.advanced_prompt_entities import MemoryConfig
from core.rag.index_processor.index_processor import IndexProcessor
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
from core.rag.summary_index.summary_index import SummaryIndex
from core.repositories.human_input_repository import HumanInputFormRepositoryImpl
from core.tools.tool_file_manager import ToolFileManager
from core.workflow.node_resolution import resolve_workflow_node_class
from core.trigger.constants import TRIGGER_NODE_TYPES
from core.workflow.nodes.agent.message_transformer import AgentMessageTransformer
from core.workflow.nodes.agent.plugin_strategy_adapter import (
PluginAgentStrategyPresentationProvider,
@@ -32,7 +31,7 @@ from core.workflow.nodes.agent.runtime_support import AgentRuntimeSupport
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY
from dify_graph.enums import NodeType, SystemVariableKey
from dify_graph.enums import BuiltinNodeTypes, NodeType, SystemVariableKey
from dify_graph.file.file_manager import file_manager
from dify_graph.graph.graph import NodeFactory
from dify_graph.model_runtime.entities.model_entities import ModelType
@@ -46,6 +45,7 @@ from dify_graph.nodes.document_extractor import UnstructuredApiConfig
from dify_graph.nodes.http_request import build_http_request_config
from dify_graph.nodes.llm.entities import LLMNodeData
from dify_graph.nodes.llm.exc import LLMModeRequiredError, ModelNotExistError
from dify_graph.nodes.llm.protocols import TemplateRenderer
from dify_graph.nodes.parameter_extractor.entities import ParameterExtractorNodeData
from dify_graph.nodes.question_classifier.entities import QuestionClassifierNodeData
from dify_graph.nodes.template_transform.template_renderer import (
@@ -59,6 +59,135 @@ if TYPE_CHECKING:
from dify_graph.entities import GraphInitParams
from dify_graph.runtime import GraphRuntimeState
LATEST_VERSION = "latest"
_START_NODE_TYPES: frozenset[NodeType] = frozenset(
(BuiltinNodeTypes.START, BuiltinNodeTypes.DATASOURCE, *TRIGGER_NODE_TYPES)
)
def _import_node_package(package_name: str, *, excluded_modules: frozenset[str] = frozenset()) -> None:
package = importlib.import_module(package_name)
for _, module_name, _ in pkgutil.walk_packages(package.__path__, package.__name__ + "."):
if module_name in excluded_modules:
continue
importlib.import_module(module_name)
@lru_cache(maxsize=1)
def register_nodes() -> None:
"""Import production node modules so they self-register with ``Node``."""
_import_node_package("dify_graph.nodes")
_import_node_package("core.workflow.nodes")
def get_node_type_classes_mapping() -> Mapping[NodeType, Mapping[str, type[Node]]]:
"""Return a read-only snapshot of the current production node registry.
The workflow layer owns node bootstrap because it must compose built-in
`dify_graph.nodes.*` implementations with workflow-local nodes under
`core.workflow.nodes.*`. Keeping this import side effect here avoids
reintroducing registry bootstrapping into lower-level graph primitives.
"""
register_nodes()
return Node.get_node_type_classes_mapping()
def resolve_workflow_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
node_mapping = get_node_type_classes_mapping().get(node_type)
if not node_mapping:
raise ValueError(f"No class mapping found for node type: {node_type}")
latest_node_class = node_mapping.get(LATEST_VERSION)
matched_node_class = node_mapping.get(node_version)
node_class = matched_node_class or latest_node_class
if not node_class:
raise ValueError(f"No latest version class found for node type: {node_type}")
return node_class
def is_start_node_type(node_type: NodeType) -> bool:
"""Return True when the node type can serve as a workflow entry point."""
return node_type in _START_NODE_TYPES
def get_default_root_node_id(graph_config: Mapping[str, Any]) -> str:
"""Resolve the default entry node for a persisted top-level workflow graph.
This workflow-layer helper depends on start-node semantics defined by
`is_start_node_type`, so it intentionally lives next to the node registry
instead of in the raw `dify_graph.entities.graph_config` schema module.
"""
nodes = graph_config.get("nodes")
if not isinstance(nodes, list):
raise ValueError("nodes in workflow graph must be a list")
for node in nodes:
if not isinstance(node, Mapping):
continue
if node.get("type") == "custom-note":
continue
node_id = node.get("id")
data = node.get("data")
if not isinstance(node_id, str) or not isinstance(data, Mapping):
continue
node_type = data.get("type")
if isinstance(node_type, str) and is_start_node_type(node_type):
return node_id
raise ValueError("Unable to determine default root node ID from workflow graph")
class _LazyNodeTypeClassesMapping(MutableMapping[NodeType, Mapping[str, type[Node]]]):
"""Mutable dict-like view over the current node registry."""
def __init__(self) -> None:
self._cached_snapshot: dict[NodeType, Mapping[str, type[Node]]] = {}
self._cached_version = -1
self._deleted: set[NodeType] = set()
self._overrides: dict[NodeType, Mapping[str, type[Node]]] = {}
def _snapshot(self) -> dict[NodeType, Mapping[str, type[Node]]]:
current_version = Node.get_registry_version()
if self._cached_version != current_version:
self._cached_snapshot = dict(get_node_type_classes_mapping())
self._cached_version = current_version
if not self._deleted and not self._overrides:
return self._cached_snapshot
snapshot = {key: value for key, value in self._cached_snapshot.items() if key not in self._deleted}
snapshot.update(self._overrides)
return snapshot
def __getitem__(self, key: NodeType) -> Mapping[str, type[Node]]:
return self._snapshot()[key]
def __setitem__(self, key: NodeType, value: Mapping[str, type[Node]]) -> None:
self._deleted.discard(key)
self._overrides[key] = value
def __delitem__(self, key: NodeType) -> None:
if key in self._overrides:
del self._overrides[key]
return
if key in self._cached_snapshot:
self._deleted.add(key)
return
raise KeyError(key)
def __iter__(self) -> Iterator[NodeType]:
return iter(self._snapshot())
def __len__(self) -> int:
return len(self._snapshot())
# Keep the canonical node-class mapping in the workflow layer that also bootstraps
# legacy `core.workflow.nodes.*` registrations.
NODE_TYPE_CLASSES_MAPPING: MutableMapping[NodeType, Mapping[str, type[Node]]] = _LazyNodeTypeClassesMapping()
LLMCompatibleNodeData: TypeAlias = LLMNodeData | QuestionClassifierNodeData | ParameterExtractorNodeData
@@ -100,6 +229,16 @@ class DefaultWorkflowCodeExecutor:
return isinstance(error, CodeExecutionError)
class DefaultLLMTemplateRenderer(TemplateRenderer):
def render_jinja2(self, *, template: str, inputs: Mapping[str, Any]) -> str:
result = CodeExecutor.execute_workflow_code_template(
language=CodeLanguage.JINJA2,
code=template,
inputs=inputs,
)
return str(result.get("result", ""))
@final
class DifyNodeFactory(NodeFactory):
"""
@@ -126,11 +265,11 @@ class DifyNodeFactory(NodeFactory):
max_object_array_length=dify_config.CODE_MAX_OBJECT_ARRAY_LENGTH,
)
self._template_renderer = CodeExecutorJinja2TemplateRenderer(code_executor=self._code_executor)
self._llm_template_renderer: TemplateRenderer = DefaultLLMTemplateRenderer()
self._template_transform_max_output_length = dify_config.TEMPLATE_TRANSFORM_MAX_LENGTH
self._http_request_http_client = ssrf_proxy
self._http_request_tool_file_manager_factory = ToolFileManager
self._http_request_file_manager = file_manager
self._rag_retrieval = DatasetRetrieval()
self._document_extractor_unstructured_api_config = UnstructuredApiConfig(
api_url=dify_config.UNSTRUCTURED_API_URL,
api_key=dify_config.UNSTRUCTURED_API_KEY or "",
@@ -177,56 +316,46 @@ class DifyNodeFactory(NodeFactory):
node_class = self._resolve_node_class(node_type=node_data.type, node_version=str(node_data.version))
node_type = node_data.type
node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = {
NodeType.CODE: lambda: {
BuiltinNodeTypes.CODE: lambda: {
"code_executor": self._code_executor,
"code_limits": self._code_limits,
},
NodeType.TEMPLATE_TRANSFORM: lambda: {
BuiltinNodeTypes.TEMPLATE_TRANSFORM: lambda: {
"template_renderer": self._template_renderer,
"max_output_length": self._template_transform_max_output_length,
},
NodeType.HTTP_REQUEST: lambda: {
BuiltinNodeTypes.HTTP_REQUEST: lambda: {
"http_request_config": self._http_request_config,
"http_client": self._http_request_http_client,
"tool_file_manager_factory": self._http_request_tool_file_manager_factory,
"file_manager": self._http_request_file_manager,
},
NodeType.HUMAN_INPUT: lambda: {
BuiltinNodeTypes.HUMAN_INPUT: lambda: {
"form_repository": HumanInputFormRepositoryImpl(tenant_id=self._dify_context.tenant_id),
},
NodeType.KNOWLEDGE_INDEX: lambda: {
"index_processor": IndexProcessor(),
"summary_index_service": SummaryIndex(),
},
NodeType.LLM: lambda: self._build_llm_compatible_node_init_kwargs(
BuiltinNodeTypes.LLM: lambda: self._build_llm_compatible_node_init_kwargs(
node_class=node_class,
node_data=node_data,
include_http_client=True,
),
NodeType.DATASOURCE: lambda: {
"datasource_manager": DatasourceManager,
},
NodeType.KNOWLEDGE_RETRIEVAL: lambda: {
"rag_retrieval": self._rag_retrieval,
},
NodeType.DOCUMENT_EXTRACTOR: lambda: {
BuiltinNodeTypes.DOCUMENT_EXTRACTOR: lambda: {
"unstructured_api_config": self._document_extractor_unstructured_api_config,
"http_client": self._http_request_http_client,
},
NodeType.QUESTION_CLASSIFIER: lambda: self._build_llm_compatible_node_init_kwargs(
BuiltinNodeTypes.QUESTION_CLASSIFIER: lambda: self._build_llm_compatible_node_init_kwargs(
node_class=node_class,
node_data=node_data,
include_http_client=True,
),
NodeType.PARAMETER_EXTRACTOR: lambda: self._build_llm_compatible_node_init_kwargs(
BuiltinNodeTypes.PARAMETER_EXTRACTOR: lambda: self._build_llm_compatible_node_init_kwargs(
node_class=node_class,
node_data=node_data,
include_http_client=False,
),
NodeType.TOOL: lambda: {
BuiltinNodeTypes.TOOL: lambda: {
"tool_file_manager_factory": self._http_request_tool_file_manager_factory(),
},
NodeType.AGENT: lambda: {
BuiltinNodeTypes.AGENT: lambda: {
"strategy_resolver": self._agent_strategy_resolver,
"presentation_provider": self._agent_strategy_presentation_provider,
"runtime_support": self._agent_runtime_support,
@@ -274,6 +403,8 @@ class DifyNodeFactory(NodeFactory):
model_instance=model_instance,
),
}
if validated_node_data.type in {BuiltinNodeTypes.LLM, BuiltinNodeTypes.QUESTION_CLASSIFIER}:
node_init_kwargs["template_renderer"] = self._llm_template_renderer
if include_http_client:
node_init_kwargs["http_client"] = self._http_request_http_client
return node_init_kwargs
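To illustrate how the registry helpers introduced above fit together, a sketch that resolves the entry node and a node class from a persisted graph config; the config below is made up, and real configs carry many more fields per node:

```python
from core.workflow.node_factory import (
    get_default_root_node_id,
    is_start_node_type,
    resolve_workflow_node_class,
)

# Made-up graph config for illustration only.
graph_config = {
    "nodes": [
        {"id": "start_1", "data": {"type": "start", "title": "Start"}},
        {"id": "llm_1", "data": {"type": "llm", "title": "LLM"}},
    ],
    "edges": [{"source": "start_1", "target": "llm_1"}],
}

assert is_start_node_type("start")
root_id = get_default_root_node_id(graph_config)  # -> "start_1"

# register_nodes() runs lazily inside get_node_type_classes_mapping(), so the
# first resolution triggers the package walk and Node self-registration.
llm_cls = resolve_workflow_node_class(node_type="llm", node_version="1")
```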

View File

@@ -1,42 +0,0 @@
from __future__ import annotations
from collections.abc import Mapping
from importlib import import_module
from dify_graph.enums import NodeType
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.node_mapping import LATEST_VERSION, get_node_type_classes_mapping
_WORKFLOW_NODE_MODULES = ("core.workflow.nodes.agent",)
_workflow_nodes_registered = False
def ensure_workflow_nodes_registered() -> None:
"""Import workflow-local node modules so they can register with `Node.__init_subclass__`."""
global _workflow_nodes_registered
if _workflow_nodes_registered:
return
for module_name in _WORKFLOW_NODE_MODULES:
import_module(module_name)
_workflow_nodes_registered = True
def get_workflow_node_type_classes_mapping() -> Mapping[NodeType, Mapping[str, type[Node]]]:
ensure_workflow_nodes_registered()
return get_node_type_classes_mapping()
def resolve_workflow_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
node_mapping = get_workflow_node_type_classes_mapping().get(node_type)
if not node_mapping:
raise ValueError(f"No class mapping found for node type: {node_type}")
latest_node_class = node_mapping.get(LATEST_VERSION)
matched_node_class = node_mapping.get(node_version)
node_class = matched_node_class or latest_node_class
if not node_class:
raise ValueError(f"No latest version class found for node type: {node_type}")
return node_class

View File

@@ -0,0 +1 @@
"""Workflow node implementations that remain under the legacy core.workflow namespace."""

View File

@@ -4,7 +4,7 @@ from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import NodeType, SystemVariableKey, WorkflowNodeExecutionStatus
from dify_graph.enums import BuiltinNodeTypes, SystemVariableKey, WorkflowNodeExecutionStatus
from dify_graph.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser
@@ -24,7 +24,7 @@ if TYPE_CHECKING:
class AgentNode(Node[AgentNodeData]):
node_type = NodeType.AGENT
node_type = BuiltinNodeTypes.AGENT
_strategy_resolver: AgentStrategyResolver
_presentation_provider: AgentStrategyPresentationProvider

View File

@@ -6,11 +6,11 @@ from pydantic import BaseModel
from core.prompt.entities.advanced_prompt_entities import MemoryConfig
from core.tools.entities.tool_entities import ToolSelector
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes, NodeType
class AgentNodeData(BaseNodeData):
type: NodeType = NodeType.AGENT
type: NodeType = BuiltinNodeTypes.AGENT
agent_strategy_provider_name: str
agent_strategy_name: str
agent_strategy_label: str

View File

@@ -8,7 +8,7 @@ from sqlalchemy.orm import Session
from core.tools.entities.tool_entities import ToolInvokeMessage
from core.tools.utils.message_transformer import ToolFileMessageTransformer
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.enums import BuiltinNodeTypes, NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.file import File, FileTransferMethod
from dify_graph.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
@@ -123,7 +123,7 @@ class AgentMessageTransformer:
)
elif message.type == ToolInvokeMessage.MessageType.JSON:
assert isinstance(message.message, ToolInvokeMessage.JsonMessage)
if node_type == NodeType.AGENT:
if node_type == BuiltinNodeTypes.AGENT:
if isinstance(message.message.json_object, dict):
msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {})
llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata))

View File

@@ -0,0 +1 @@
"""Datasource workflow node package."""

View File

@@ -1,22 +1,17 @@
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any
from core.datasource.datasource_manager import DatasourceManager
from core.datasource.entities.datasource_entities import DatasourceProviderType
from core.plugin.impl.exc import PluginDaemonClientSideError
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType, SystemVariableKey
from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, SystemVariableKey, WorkflowNodeExecutionMetadataKey
from dify_graph.node_events import NodeRunResult, StreamCompletedEvent
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser
from dify_graph.repositories.datasource_manager_protocol import (
DatasourceManagerProtocol,
DatasourceParameter,
OnlineDriveDownloadFileParam,
)
from ...entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from .entities import DatasourceNodeData
from .entities import DatasourceNodeData, DatasourceParameter, OnlineDriveDownloadFileParam
from .exc import DatasourceNodeError
if TYPE_CHECKING:
@@ -29,7 +24,7 @@ class DatasourceNode(Node[DatasourceNodeData]):
Datasource Node
"""
node_type = NodeType.DATASOURCE
node_type = BuiltinNodeTypes.DATASOURCE
execution_type = NodeExecutionType.ROOT
def __init__(
@@ -38,7 +33,6 @@ class DatasourceNode(Node[DatasourceNodeData]):
config: NodeConfigDict,
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
datasource_manager: DatasourceManagerProtocol,
):
super().__init__(
id=id,
@@ -46,7 +40,7 @@ class DatasourceNode(Node[DatasourceNodeData]):
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
self.datasource_manager = datasource_manager
self.datasource_manager = DatasourceManager
def populate_start_event(self, event) -> None:
event.provider_id = f"{self.node_data.plugin_id}/{self.node_data.provider_name}"

View File

@@ -4,7 +4,7 @@ from pydantic import BaseModel, field_validator
from pydantic_core.core_schema import ValidationInfo
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes, NodeType
class DatasourceEntity(BaseModel):
@@ -17,7 +17,7 @@ class DatasourceEntity(BaseModel):
class DatasourceNodeData(BaseNodeData, DatasourceEntity):
type: NodeType = NodeType.DATASOURCE
type: NodeType = BuiltinNodeTypes.DATASOURCE
class DatasourceInput(BaseModel):
# TODO: check this type
@@ -42,3 +42,14 @@ class DatasourceNodeData(BaseNodeData, DatasourceEntity):
return typ
datasource_parameters: dict[str, DatasourceInput] | None = None
class DatasourceParameter(BaseModel):
workspace_id: str
page_id: str
type: str
class OnlineDriveDownloadFileParam(BaseModel):
id: str
bucket: str

View File

@@ -1,25 +1,10 @@
from collections.abc import Generator
from typing import Any, Protocol
from pydantic import BaseModel
from dify_graph.file import File
from dify_graph.node_events import StreamChunkEvent, StreamCompletedEvent
class DatasourceParameter(BaseModel):
workspace_id: str
page_id: str
type: str
class OnlineDriveDownloadFileParam(BaseModel):
id: str
bucket: str
class DatasourceFinal(BaseModel):
data: dict[str, Any] | None = None
from .entities import DatasourceParameter, OnlineDriveDownloadFileParam
class DatasourceManagerProtocol(Protocol):

View File

@@ -0,0 +1,5 @@
"""Knowledge index workflow node package."""
KNOWLEDGE_INDEX_NODE_TYPE = "knowledge-index"
__all__ = ["KNOWLEDGE_INDEX_NODE_TYPE"]

View File

@@ -3,6 +3,7 @@ from typing import Literal, Union
from pydantic import BaseModel
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
@@ -156,7 +157,7 @@ class KnowledgeIndexNodeData(BaseNodeData):
Knowledge index Node Data.
"""
type: NodeType = NodeType.KNOWLEDGE_INDEX
type: NodeType = KNOWLEDGE_INDEX_NODE_TYPE
chunk_structure: str
index_chunk_variable_selector: list[str]
indexing_technique: str | None = None

View File

@@ -2,14 +2,15 @@ import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any
from core.rag.index_processor.index_processor import IndexProcessor
from core.rag.summary_index.summary_index import SummaryIndex
from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType, SystemVariableKey
from dify_graph.enums import NodeExecutionType, SystemVariableKey
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.template import Template
from dify_graph.repositories.index_processor_protocol import IndexProcessorProtocol
from dify_graph.repositories.summary_index_service_protocol import SummaryIndexServiceProtocol
from .entities import KnowledgeIndexNodeData
from .exc import (
@@ -25,7 +26,7 @@ _INVOKE_FROM_DEBUGGER = "debugger"
class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
node_type = NodeType.KNOWLEDGE_INDEX
node_type = KNOWLEDGE_INDEX_NODE_TYPE
execution_type = NodeExecutionType.RESPONSE
def __init__(
@@ -34,12 +35,10 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
config: NodeConfigDict,
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
index_processor: IndexProcessorProtocol,
summary_index_service: SummaryIndexServiceProtocol,
) -> None:
super().__init__(id, config, graph_init_params, graph_runtime_state)
self.index_processor = index_processor
self.summary_index_service = summary_index_service
self.index_processor = IndexProcessor()
self.summary_index_service = SummaryIndex()
def _run(self) -> NodeRunResult: # type: ignore
node_data = self.node_data

View File

@@ -5,21 +5,21 @@ from pydantic import BaseModel, Field
class PreviewItem(BaseModel):
content: str | None = Field(None)
child_chunks: list[str] | None = Field(None)
summary: str | None = Field(None)
content: str | None = Field(default=None)
child_chunks: list[str] | None = Field(default=None)
summary: str | None = Field(default=None)
class QaPreview(BaseModel):
answer: str | None = Field(None)
question: str | None = Field(None)
answer: str | None = Field(default=None)
question: str | None = Field(default=None)
class Preview(BaseModel):
chunk_structure: str
parent_mode: str | None = Field(None)
preview: list[PreviewItem] = Field([])
qa_preview: list[QaPreview] = Field([])
parent_mode: str | None = Field(default=None)
preview: list[PreviewItem] = Field(default_factory=list)
qa_preview: list[QaPreview] = Field(default_factory=list)
total_segments: int
@@ -39,3 +39,9 @@ class IndexProcessorProtocol(Protocol):
def get_preview_output(
self, chunks: Any, dataset_id: str, document_id: str, chunk_structure: str, summary_index_setting: dict | None
) -> Preview: ...
class SummaryIndexServiceProtocol(Protocol):
def generate_and_vectorize_summary(
self, dataset_id: str, document_id: str, is_preview: bool, summary_index_setting: dict | None = None
) -> None: ...
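Because `SummaryIndexServiceProtocol` is a structural `Protocol`, a test double only has to match the method shape; a hypothetical no-op stub would satisfy it without inheriting from anything:

```python
class NoOpSummaryIndexService:
    """Hypothetical stand-in that satisfies SummaryIndexServiceProtocol by shape alone."""

    def generate_and_vectorize_summary(
        self, dataset_id: str, document_id: str, is_preview: bool, summary_index_setting: dict | None = None
    ) -> None:
        return None
```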

View File

@@ -0,0 +1 @@
"""Knowledge retrieval workflow node package."""

View File

@@ -4,7 +4,7 @@ from typing import Literal
from pydantic import BaseModel, Field
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.llm.entities import ModelConfig, VisionConfig
@@ -114,7 +114,7 @@ class KnowledgeRetrievalNodeData(BaseNodeData):
Knowledge retrieval Node Data.
"""
type: NodeType = NodeType.KNOWLEDGE_RETRIEVAL
type: NodeType = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL
query_variable_selector: list[str] | None | str = None
query_attachment_selector: list[str] | None | str = None
dataset_ids: list[str]

View File

@@ -1,12 +1,19 @@
"""Knowledge retrieval workflow node implementation.
This node now lives under ``core.workflow.nodes`` and is discovered directly by
the workflow node registry.
"""
import logging
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal
from core.app.app_config.entities import DatasetRetrieveConfigEntity
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import (
NodeType,
BuiltinNodeTypes,
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus,
)
@@ -15,7 +22,6 @@ from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base import LLMUsageTrackingMixin
from dify_graph.nodes.base.node import Node
from dify_graph.repositories.rag_retrieval_protocol import KnowledgeRetrievalRequest, RAGRetrievalProtocol, Source
from dify_graph.variables import (
ArrayFileSegment,
FileSegment,
@@ -32,6 +38,7 @@ from .exc import (
KnowledgeRetrievalNodeError,
RateLimitExceededError,
)
from .retrieval import KnowledgeRetrievalRequest, Source
if TYPE_CHECKING:
from dify_graph.file.models import File
@@ -41,7 +48,7 @@ logger = logging.getLogger(__name__)
class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeData]):
node_type = NodeType.KNOWLEDGE_RETRIEVAL
node_type = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL
# Instance attributes specific to LLMNode.
# Output variable for file
@@ -53,7 +60,6 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
config: NodeConfigDict,
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
rag_retrieval: RAGRetrievalProtocol,
):
super().__init__(
id=id,
@@ -63,7 +69,7 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
)
# LLM file outputs, used for MultiModal outputs.
self._file_outputs = []
self._rag_retrieval = rag_retrieval
self._rag_retrieval = DatasetRetrieval()
@classmethod
def version(cls):

View File

@@ -3,9 +3,10 @@ from typing import Any, Literal, Protocol
from pydantic import BaseModel, Field
from dify_graph.model_runtime.entities import LLMUsage
from dify_graph.nodes.knowledge_retrieval.entities import MetadataFilteringCondition
from dify_graph.nodes.llm.entities import ModelConfig
from .entities import MetadataFilteringCondition
class SourceChildChunk(BaseModel):
id: str = Field(default="", description="Child chunk ID")
@@ -28,7 +29,7 @@ class SourceMetadata(BaseModel):
segment_id: str | None = Field(default=None, description="Segment unique identifier")
retriever_from: str = Field(default="workflow", description="Retriever source context")
score: float = Field(default=0.0, description="Retrieval relevance score")
child_chunks: list[SourceChildChunk] = Field(default=[], description="List of child chunks")
child_chunks: list[SourceChildChunk] = Field(default_factory=list, description="List of child chunks")
segment_hit_count: int | None = Field(default=0, description="Number of times segment was retrieved")
segment_word_count: int | None = Field(default=0, description="Word count of the segment")
segment_position: int | None = Field(default=0, description="Position of segment in document")
@@ -81,28 +82,7 @@ class KnowledgeRetrievalRequest(BaseModel):
class RAGRetrievalProtocol(Protocol):
"""Protocol for RAG-based knowledge retrieval implementations.
Implementations of this protocol handle knowledge retrieval from datasets
including rate limiting, dataset filtering, and document retrieval.
"""
@property
def llm_usage(self) -> LLMUsage:
"""Return accumulated LLM usage for retrieval operations."""
...
def llm_usage(self) -> LLMUsage: ...
def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]:
"""Retrieve knowledge from datasets based on the provided request.
Args:
request: Knowledge retrieval request with search parameters
Returns:
List of sources matching the search criteria
Raises:
RateLimitExceededError: If rate limit is exceeded
ModelNotExistError: If specified model doesn't exist
"""
...
def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]: ...

View File

@@ -3,16 +3,18 @@ from typing import Any, Literal, Union
from pydantic import BaseModel, Field, ValidationInfo, field_validator
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.entities.entities import EventParameter
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.nodes.trigger_plugin.exc import TriggerEventParameterError
from .exc import TriggerEventParameterError
class TriggerEventNodeData(BaseNodeData):
"""Plugin trigger node data"""
type: NodeType = NodeType.TRIGGER_PLUGIN
type: NodeType = TRIGGER_PLUGIN_NODE_TYPE
class TriggerEventInput(BaseModel):
value: Union[Any, list[str]]

View File

@@ -1,8 +1,10 @@
from collections.abc import Mapping
from typing import Any, cast
from core.trigger.constants import TRIGGER_INFO_METADATA_KEY, TRIGGER_PLUGIN_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, WorkflowNodeExecutionMetadataKey
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node
@@ -10,7 +12,7 @@ from .entities import TriggerEventNodeData
class TriggerEventNode(Node[TriggerEventNodeData]):
node_type = NodeType.TRIGGER_PLUGIN
node_type = TRIGGER_PLUGIN_NODE_TYPE
execution_type = NodeExecutionType.ROOT
@classmethod
@@ -44,8 +46,8 @@ class TriggerEventNode(Node[TriggerEventNodeData]):
"""
# Get trigger data passed when workflow was triggered
metadata = {
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
metadata: dict[WorkflowNodeExecutionMetadataKey, Any] = {
cast(WorkflowNodeExecutionMetadataKey, TRIGGER_INFO_METADATA_KEY): {
"provider_id": self.node_data.provider_id,
"event_name": self.node_data.event_name,
"plugin_unique_identifier": self.node_data.plugin_unique_identifier,

View File

@@ -0,0 +1,3 @@
from .trigger_schedule_node import TriggerScheduleNode
__all__ = ["TriggerScheduleNode"]

View File

@@ -2,6 +2,7 @@ from typing import Literal, Union
from pydantic import BaseModel, Field
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
@@ -11,7 +12,7 @@ class TriggerScheduleNodeData(BaseNodeData):
Trigger Schedule Node Data
"""
type: NodeType = NodeType.TRIGGER_SCHEDULE
type: NodeType = TRIGGER_SCHEDULE_NODE_TYPE
mode: str = Field(default="visual", description="Schedule mode: visual or cron")
frequency: str | None = Field(default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly")
cron_expression: str | None = Field(default=None, description="Cron expression for cron mode")

View File

@@ -1,15 +1,17 @@
from collections.abc import Mapping
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType
from dify_graph.enums import NodeExecutionType
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.trigger_schedule.entities import TriggerScheduleNodeData
from .entities import TriggerScheduleNodeData
class TriggerScheduleNode(Node[TriggerScheduleNodeData]):
node_type = NodeType.TRIGGER_SCHEDULE
node_type = TRIGGER_SCHEDULE_NODE_TYPE
execution_type = NodeExecutionType.ROOT
@classmethod
@@ -19,7 +21,7 @@ class TriggerScheduleNode(Node[TriggerScheduleNodeData]):
@classmethod
def get_default_config(cls, filters: Mapping[str, object] | None = None) -> Mapping[str, object]:
return {
"type": "trigger-schedule",
"type": TRIGGER_SCHEDULE_NODE_TYPE,
"config": {
"mode": "visual",
"frequency": "daily",

View File

@@ -3,6 +3,7 @@ from enum import StrEnum
from pydantic import BaseModel, Field, field_validator
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.variables.types import SegmentType
@@ -93,7 +94,7 @@ class WebhookData(BaseNodeData):
class SyncMode(StrEnum):
SYNC = "async" # only support
type: NodeType = NodeType.TRIGGER_WEBHOOK
type: NodeType = TRIGGER_WEBHOOK_NODE_TYPE
method: Method = Method.GET
content_type: ContentType = Field(default=ContentType.JSON)
headers: Sequence[WebhookParameter] = Field(default_factory=list)

View File

@@ -2,9 +2,10 @@ import logging
from collections.abc import Mapping
from typing import Any
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType
from dify_graph.enums import NodeExecutionType
from dify_graph.file import FileTransferMethod
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node
@@ -19,7 +20,7 @@ logger = logging.getLogger(__name__)
class TriggerWebhookNode(Node[WebhookData]):
node_type = NodeType.TRIGGER_WEBHOOK
node_type = TRIGGER_WEBHOOK_NODE_TYPE
execution_type = NodeExecutionType.ROOT
@classmethod

View File

@@ -8,8 +8,7 @@ from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context
from core.app.workflow.layers.llm_quota import LLMQuotaLayer
from core.app.workflow.layers.observability import ObservabilityLayer
from core.workflow.node_factory import DifyNodeFactory
from core.workflow.node_resolution import resolve_workflow_node_class
from core.workflow.node_factory import DifyNodeFactory, resolve_workflow_node_class
from dify_graph.constants import ENVIRONMENT_VARIABLE_NODE_ID
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDictAdapter
@@ -22,7 +21,7 @@ from dify_graph.graph_engine.layers import DebugLoggingLayer, ExecutionLimitsLay
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_engine.protocols.command_channel import CommandChannel
from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent
from dify_graph.nodes import NodeType
from dify_graph.nodes import BuiltinNodeTypes
from dify_graph.nodes.base.node import Node
from dify_graph.runtime import ChildGraphNotFoundError, GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
@@ -253,7 +252,7 @@ class WorkflowEntry:
variable_mapping=variable_mapping,
user_inputs=user_inputs,
)
if node_type != NodeType.DATASOURCE:
if node_type != BuiltinNodeTypes.DATASOURCE:
cls.mapping_user_inputs_to_variable_pool(
variable_mapping=variable_mapping,
user_inputs=user_inputs,
@@ -303,7 +302,7 @@ class WorkflowEntry:
"height": node_height,
"type": "custom",
"data": {
"type": NodeType.START,
"type": BuiltinNodeTypes.START,
"title": "Start",
"desc": "Start",
},
@@ -339,8 +338,8 @@ class WorkflowEntry:
# Create a minimal graph for single node execution
graph_dict = cls._create_single_node_graph(node_id, node_data)
node_type = NodeType(node_data.get("type", ""))
if node_type not in {NodeType.PARAMETER_EXTRACTOR, NodeType.QUESTION_CLASSIFIER}:
node_type = node_data.get("type", "")
if node_type not in {BuiltinNodeTypes.PARAMETER_EXTRACTOR, BuiltinNodeTypes.QUESTION_CLASSIFIER}:
raise ValueError(f"Node type {node_type} not supported")
node_cls = resolve_workflow_node_class(node_type=node_type, node_version="1")

View File

@@ -113,7 +113,7 @@ The codebase enforces strict layering via import-linter:
1. Create node class in `nodes/<node_type>/`
1. Inherit from `BaseNode` or appropriate base class
1. Implement `_run()` method
1. Register in `nodes/node_mapping.py`
1. Ensure the node module is importable under `nodes/<node_type>/`
1. Add tests in `tests/unit_tests/dify_graph/nodes/`
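A hypothetical skeleton that follows the steps above; placing the module under `nodes/<node_type>/` is what lets the package walk in `register_nodes()` import it, so `Node.__init_subclass__` can register the class. Names below are illustrative, not part of the codebase:

```python
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node

from .entities import EchoNodeData  # hypothetical BaseNodeData subclass for this node


class EchoNode(Node[EchoNodeData]):
    node_type = "echo"  # plain string; not restricted to BuiltinNodeTypes

    def _run(self) -> NodeRunResult:
        ...  # build and return a NodeRunResult from self.node_data
```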
### Implementing a Custom Layer

View File

@@ -121,6 +121,8 @@ class DefaultValue(BaseModel):
class BaseNodeData(ABC, BaseModel):
# Raw graph payloads are first validated through `NodeConfigDictAdapter`, where
# `node["data"]` is typed as `BaseNodeData` before the concrete node class is known.
# `type` therefore accepts downstream string node kinds; unknown node implementations
# are rejected later when the node factory resolves the node registry.
# At that boundary, node-specific fields are still "extra" relative to this shared DTO,
# and persisted templates/workflows also carry undeclared compatibility keys such as
# `selected`, `params`, `paramSchemas`, and `datasource_label`. Keep extras permissive

View File

@@ -48,7 +48,7 @@ class WorkflowNodeExecution(BaseModel):
index: int # Sequence number for ordering in trace visualization
predecessor_node_id: str | None = None # ID of the node that executed before this one
node_id: str # ID of the node being executed
node_type: NodeType # Type of node (e.g., start, llm, knowledge)
node_type: NodeType # Type of node (e.g., start, llm, downstream response node)
title: str # Display title of the node
# Execution data

View File

@@ -1,4 +1,5 @@
from enum import StrEnum
from typing import ClassVar, TypeAlias
class NodeState(StrEnum):
@@ -33,56 +34,71 @@ class SystemVariableKey(StrEnum):
INVOKE_FROM = "invoke_from"
class NodeType(StrEnum):
START = "start"
END = "end"
ANSWER = "answer"
LLM = "llm"
KNOWLEDGE_RETRIEVAL = "knowledge-retrieval"
KNOWLEDGE_INDEX = "knowledge-index"
IF_ELSE = "if-else"
CODE = "code"
TEMPLATE_TRANSFORM = "template-transform"
QUESTION_CLASSIFIER = "question-classifier"
HTTP_REQUEST = "http-request"
TOOL = "tool"
DATASOURCE = "datasource"
VARIABLE_AGGREGATOR = "variable-aggregator"
LEGACY_VARIABLE_AGGREGATOR = "variable-assigner" # TODO: Merge this into VARIABLE_AGGREGATOR in the database.
LOOP = "loop"
LOOP_START = "loop-start"
LOOP_END = "loop-end"
ITERATION = "iteration"
ITERATION_START = "iteration-start" # Fake start node for iteration.
PARAMETER_EXTRACTOR = "parameter-extractor"
VARIABLE_ASSIGNER = "assigner"
DOCUMENT_EXTRACTOR = "document-extractor"
LIST_OPERATOR = "list-operator"
AGENT = "agent"
TRIGGER_WEBHOOK = "trigger-webhook"
TRIGGER_SCHEDULE = "trigger-schedule"
TRIGGER_PLUGIN = "trigger-plugin"
HUMAN_INPUT = "human-input"
NodeType: TypeAlias = str
@property
def is_trigger_node(self) -> bool:
"""Check if this node type is a trigger node."""
return self in [
NodeType.TRIGGER_WEBHOOK,
NodeType.TRIGGER_SCHEDULE,
NodeType.TRIGGER_PLUGIN,
]
@property
def is_start_node(self) -> bool:
"""Check if this node type can serve as a workflow entry point."""
return self in [
NodeType.START,
NodeType.DATASOURCE,
NodeType.TRIGGER_WEBHOOK,
NodeType.TRIGGER_SCHEDULE,
NodeType.TRIGGER_PLUGIN,
]
class BuiltinNodeTypes:
"""Built-in node type string constants.
`node_type` values are plain strings throughout the graph runtime. This namespace
only exposes the built-in values shipped by `dify_graph`; downstream packages can
use additional strings without extending this class.
"""
START: ClassVar[NodeType] = "start"
END: ClassVar[NodeType] = "end"
ANSWER: ClassVar[NodeType] = "answer"
LLM: ClassVar[NodeType] = "llm"
KNOWLEDGE_RETRIEVAL: ClassVar[NodeType] = "knowledge-retrieval"
IF_ELSE: ClassVar[NodeType] = "if-else"
CODE: ClassVar[NodeType] = "code"
TEMPLATE_TRANSFORM: ClassVar[NodeType] = "template-transform"
QUESTION_CLASSIFIER: ClassVar[NodeType] = "question-classifier"
HTTP_REQUEST: ClassVar[NodeType] = "http-request"
TOOL: ClassVar[NodeType] = "tool"
DATASOURCE: ClassVar[NodeType] = "datasource"
VARIABLE_AGGREGATOR: ClassVar[NodeType] = "variable-aggregator"
LEGACY_VARIABLE_AGGREGATOR: ClassVar[NodeType] = "variable-assigner"
LOOP: ClassVar[NodeType] = "loop"
LOOP_START: ClassVar[NodeType] = "loop-start"
LOOP_END: ClassVar[NodeType] = "loop-end"
ITERATION: ClassVar[NodeType] = "iteration"
ITERATION_START: ClassVar[NodeType] = "iteration-start"
PARAMETER_EXTRACTOR: ClassVar[NodeType] = "parameter-extractor"
VARIABLE_ASSIGNER: ClassVar[NodeType] = "assigner"
DOCUMENT_EXTRACTOR: ClassVar[NodeType] = "document-extractor"
LIST_OPERATOR: ClassVar[NodeType] = "list-operator"
AGENT: ClassVar[NodeType] = "agent"
HUMAN_INPUT: ClassVar[NodeType] = "human-input"
BUILT_IN_NODE_TYPES: tuple[NodeType, ...] = (
BuiltinNodeTypes.START,
BuiltinNodeTypes.END,
BuiltinNodeTypes.ANSWER,
BuiltinNodeTypes.LLM,
BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL,
BuiltinNodeTypes.IF_ELSE,
BuiltinNodeTypes.CODE,
BuiltinNodeTypes.TEMPLATE_TRANSFORM,
BuiltinNodeTypes.QUESTION_CLASSIFIER,
BuiltinNodeTypes.HTTP_REQUEST,
BuiltinNodeTypes.TOOL,
BuiltinNodeTypes.DATASOURCE,
BuiltinNodeTypes.VARIABLE_AGGREGATOR,
BuiltinNodeTypes.LEGACY_VARIABLE_AGGREGATOR,
BuiltinNodeTypes.LOOP,
BuiltinNodeTypes.LOOP_START,
BuiltinNodeTypes.LOOP_END,
BuiltinNodeTypes.ITERATION,
BuiltinNodeTypes.ITERATION_START,
BuiltinNodeTypes.PARAMETER_EXTRACTOR,
BuiltinNodeTypes.VARIABLE_ASSIGNER,
BuiltinNodeTypes.DOCUMENT_EXTRACTOR,
BuiltinNodeTypes.LIST_OPERATOR,
BuiltinNodeTypes.AGENT,
BuiltinNodeTypes.HUMAN_INPUT,
)
class NodeExecutionType(StrEnum):
@@ -236,7 +252,6 @@ class WorkflowNodeExecutionMetadataKey(StrEnum):
CURRENCY = "currency"
TOOL_INFO = "tool_info"
AGENT_LOG = "agent_log"
TRIGGER_INFO = "trigger_info"
ITERATION_ID = "iteration_id"
ITERATION_INDEX = "iteration_index"
LOOP_ID = "loop_id"
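A sketch of the loosened typing in this file: `NodeType` is now a plain `str` alias, so built-in constants and downstream-defined node kinds compare the same way:

```python
from dify_graph.enums import BUILT_IN_NODE_TYPES, BuiltinNodeTypes, NodeType

node_type: NodeType = "trigger-webhook"       # defined downstream, not in BuiltinNodeTypes
assert node_type not in BUILT_IN_NODE_TYPES   # still a perfectly valid NodeType value
assert BuiltinNodeTypes.LLM == "llm"          # built-ins are ordinary strings
```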

View File

@@ -83,50 +83,6 @@ class Graph:
return node_configs_map
@classmethod
def _find_root_node_id(
cls,
node_configs_map: Mapping[str, NodeConfigDict],
edge_configs: Sequence[Mapping[str, object]],
root_node_id: str | None = None,
) -> str:
"""
Find the root node ID if not specified.
:param node_configs_map: mapping of node ID to node config
:param edge_configs: list of edge configurations
:param root_node_id: explicitly specified root node ID
:return: determined root node ID
"""
if root_node_id:
if root_node_id not in node_configs_map:
raise ValueError(f"Root node id {root_node_id} not found in the graph")
return root_node_id
# Find nodes with no incoming edges
nodes_with_incoming: set[str] = set()
for edge_config in edge_configs:
target = edge_config.get("target")
if isinstance(target, str):
nodes_with_incoming.add(target)
root_candidates = [nid for nid in node_configs_map if nid not in nodes_with_incoming]
# Prefer START node if available
start_node_id = None
for nid in root_candidates:
node_data = node_configs_map[nid]["data"]
if node_data.type.is_start_node:
start_node_id = nid
break
root_node_id = start_node_id or (root_candidates[0] if root_candidates else None)
if not root_node_id:
raise ValueError("Unable to determine root node ID")
return root_node_id
@classmethod
def _build_edges(
cls, edge_configs: list[dict[str, object]]
@@ -301,15 +257,15 @@ class Graph:
*,
graph_config: Mapping[str, object],
node_factory: NodeFactory,
root_node_id: str | None = None,
root_node_id: str,
skip_validation: bool = False,
) -> Graph:
"""
Initialize graph
Initialize a graph with an explicit execution entry point.
:param graph_config: graph config containing nodes and edges
:param node_factory: factory for creating node instances from config data
:param root_node_id: root node id
:param root_node_id: active root node id
:return: graph instance
"""
# Parse configs
@@ -327,8 +283,8 @@ class Graph:
# Parse node configurations
node_configs_map = cls._parse_node_configs(node_configs)
# Find root node
root_node_id = cls._find_root_node_id(node_configs_map, edge_configs, root_node_id)
if root_node_id not in node_configs_map:
raise ValueError(f"Root node id {root_node_id} not found in the graph")
# Build edges
edges, in_edges, out_edges = cls._build_edges(edge_configs)
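With `_find_root_node_id` removed, callers resolve the entry node before constructing the graph. A sketch of the new calling convention, assuming `graph_config` and `node_factory` already exist; the `Graph` import path is inferred from the node_factory diff above:

```python
from core.workflow.node_factory import get_default_root_node_id
from dify_graph.graph.graph import Graph  # import path inferred, not confirmed by this diff

# `graph_config` is a persisted mapping with "nodes"/"edges"; `node_factory` is a
# DifyNodeFactory instance; both are assumed to exist already.
root_node_id = get_default_root_node_id(graph_config)
graph = Graph.init(
    graph_config=graph_config,
    node_factory=node_factory,
    root_node_id=root_node_id,  # now a required, explicit argument
)
```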

View File
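A hand-sketched call site for the tightened constructor above. The classmethod name `init`, the import path, and the arguments are placeholders; only the keyword-only parameters and the ValueError on an unknown root id come from the diff:

from dify_graph.graph import Graph  # import path assumed


def build_graph(graph_config, node_factory):
    # node_factory: any application-provided NodeFactory implementation.
    # The entry point is now named explicitly by the caller; the old fallback
    # that scanned for a node without incoming edges has been removed.
    return Graph.init(
        graph_config=graph_config,
        node_factory=node_factory,
        root_node_id="start",  # must exist in graph_config, else ValueError
    )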

@@ -4,7 +4,7 @@ from collections.abc import Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Protocol
from dify_graph.enums import NodeExecutionType, NodeType
from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, NodeType
if TYPE_CHECKING:
from .graph import Graph
@@ -71,7 +71,7 @@ class _RootNodeValidator:
"""Validates root node invariants."""
invalid_root_code: str = "INVALID_ROOT"
container_entry_types: tuple[NodeType, ...] = (NodeType.ITERATION_START, NodeType.LOOP_START)
container_entry_types: tuple[NodeType, ...] = (BuiltinNodeTypes.ITERATION_START, BuiltinNodeTypes.LOOP_START)
def validate(self, graph: Graph) -> Sequence[GraphValidationIssue]:
root_node = graph.root_node
@@ -86,7 +86,7 @@ class _RootNodeValidator:
)
return issues
node_type = getattr(root_node, "node_type", None)
node_type = root_node.node_type
if root_node.execution_type != NodeExecutionType.ROOT and node_type not in self.container_entry_types:
issues.append(
GraphValidationIssue(
@@ -114,45 +114,9 @@ class GraphValidator:
raise GraphValidationError(issues)
@dataclass(frozen=True, slots=True)
class _TriggerStartExclusivityValidator:
"""Ensures trigger nodes do not coexist with UserInput (start) nodes."""
conflict_code: str = "TRIGGER_START_NODE_CONFLICT"
def validate(self, graph: Graph) -> Sequence[GraphValidationIssue]:
start_node_id: str | None = None
trigger_node_ids: list[str] = []
for node in graph.nodes.values():
node_type = getattr(node, "node_type", None)
if not isinstance(node_type, NodeType):
continue
if node_type == NodeType.START:
start_node_id = node.id
elif node_type.is_trigger_node:
trigger_node_ids.append(node.id)
if start_node_id and trigger_node_ids:
trigger_list = ", ".join(trigger_node_ids)
return [
GraphValidationIssue(
code=self.conflict_code,
message=(
f"UserInput (start) node '{start_node_id}' cannot coexist with trigger nodes: {trigger_list}."
),
node_id=start_node_id,
)
]
return []
_DEFAULT_RULES: tuple[GraphValidationRule, ...] = (
_EdgeEndpointValidator(),
_RootNodeValidator(),
_TriggerStartExclusivityValidator(),
)

View File

@@ -6,5 +6,6 @@ of responses based on upstream node outputs and constants.
"""
from .coordinator import ResponseStreamCoordinator
from .session import RESPONSE_SESSION_NODE_TYPES
__all__ = ["ResponseStreamCoordinator"]
__all__ = ["RESPONSE_SESSION_NODE_TYPES", "ResponseStreamCoordinator"]

View File

@@ -3,19 +3,34 @@ Internal response session management for response coordinator.
This module contains the private ResponseSession class used internally
by ResponseStreamCoordinator to manage streaming sessions.
`RESPONSE_SESSION_NODE_TYPES` is intentionally mutable so downstream applications
can opt additional response-capable node types into session creation without
patching the coordinator.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol, cast
from dify_graph.nodes.answer.answer_node import AnswerNode
from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.base.template import Template
from dify_graph.nodes.end.end_node import EndNode
from dify_graph.nodes.knowledge_index import KnowledgeIndexNode
from dify_graph.runtime.graph_runtime_state import NodeProtocol
class _ResponseSessionNodeProtocol(NodeProtocol, Protocol):
"""Structural contract required from nodes that can open a response session."""
def get_streaming_template(self) -> Template: ...
RESPONSE_SESSION_NODE_TYPES: list[NodeType] = [
BuiltinNodeTypes.ANSWER,
BuiltinNodeTypes.END,
]
@dataclass
class ResponseSession:
"""
@@ -33,10 +48,9 @@ class ResponseSession:
"""
Create a ResponseSession from a response-capable node.
The parameter is typed as `NodeProtocol` because the graph is exposed behind a protocol at the runtime layer,
but at runtime this must be an `AnswerNode`, `EndNode`, or `KnowledgeIndexNode` that provides:
- `id: str`
- `get_streaming_template() -> Template`
The parameter is typed as `NodeProtocol` because the graph is exposed behind a protocol at the runtime layer.
At runtime this must be a node whose `node_type` is listed in `RESPONSE_SESSION_NODE_TYPES`
and which implements `get_streaming_template()`.
Args:
node: Node from the materialized workflow graph.
@@ -47,11 +61,22 @@ class ResponseSession:
Raises:
TypeError: If node is not a supported response node type.
"""
if not isinstance(node, AnswerNode | EndNode | KnowledgeIndexNode):
raise TypeError("ResponseSession.from_node only supports AnswerNode, EndNode, or KnowledgeIndexNode")
if node.node_type not in RESPONSE_SESSION_NODE_TYPES:
supported_node_types = ", ".join(RESPONSE_SESSION_NODE_TYPES)
raise TypeError(
"ResponseSession.from_node only supports node types in "
f"RESPONSE_SESSION_NODE_TYPES: {supported_node_types}"
)
response_node = cast(_ResponseSessionNodeProtocol, node)
try:
template = response_node.get_streaming_template()
except AttributeError as exc:
raise TypeError("ResponseSession.from_node requires get_streaming_template() on response nodes") from exc
return cls(
node_id=node.id,
template=node.get_streaming_template(),
template=template,
)
def is_complete(self) -> bool:

View File
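A minimal sketch of the opt-in path the session module docstring describes. The import path and the "notification" type value are illustrative assumptions; the mutable list and the get_streaming_template() contract come from the diff above:

from dify_graph.response_coordinator import RESPONSE_SESSION_NODE_TYPES  # path assumed

# Opt an additional response-capable node type into session creation without
# patching ResponseStreamCoordinator. The node class behind this type must
# implement get_streaming_template() -> Template, otherwise
# ResponseSession.from_node() raises TypeError when the coordinator reaches it.
RESPONSE_SESSION_NODE_TYPES.append("notification")  # hypothetical NodeType value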

@@ -1,9 +1,9 @@
from collections.abc import Sequence
from collections.abc import Mapping, Sequence
from datetime import datetime
from typing import Any
from pydantic import Field
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from dify_graph.entities.pause_reason import PauseReason
from dify_graph.file import File
from dify_graph.model_runtime.entities.llm_entities import LLMUsage
@@ -13,7 +13,7 @@ from .base import NodeEventBase
class RunRetrieverResourceEvent(NodeEventBase):
retriever_resources: Sequence[RetrievalSourceMetadata] = Field(..., description="retriever resources")
retriever_resources: Sequence[Mapping[str, Any]] = Field(..., description="retriever resources")
context: str = Field(..., description="context")
context_files: list[File] | None = Field(default=None, description="context files")

Some files were not shown because too many files have changed in this diff