Compare commits

...

31 Commits

Author SHA1 Message Date
yyh
b5c5cb72f0 update 2026-03-15 22:54:36 +08:00
yyh
1863cdbff0 update 2026-03-15 22:48:45 +08:00
yyh
cc769b9dd8 update 2026-03-15 22:47:21 +08:00
yyh
6f8ff490a8 update 2026-03-15 22:42:13 +08:00
yyh
a9ff96113a update 2026-03-15 22:40:47 +08:00
yyh
0d549af0a9 docs(web): clarify query and mutation guidance 2026-03-15 22:37:58 +08:00
yyh
2a04f59984 update 2026-03-15 22:32:51 +08:00
yyh
a9f507cd2e update 2026-03-15 22:30:42 +08:00
yyh
96fd3bde1b update 2026-03-15 22:27:32 +08:00
yyh
f9ded4960a chore: update agents guidance in web/ 2026-03-15 22:21:05 +08:00
yyh
c1e8735f7a Merge remote-tracking branch 'origin/main' into fix/use-base 2026-03-15 22:04:32 +08:00
Ye Ding
f795d24151 feat: Add Hologres as a VDB & FullText DB choice (#32830)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Crazywoola <100913391+crazywoola@users.noreply.github.com>
2026-03-15 20:55:02 +08:00
-LAN-
0fa7548346 fix: update INTERNAL_FILES_URL example default for Docker Desktop (#33447)
2026-03-15 19:27:46 +08:00
RickDamon
ac8021fe27 fix: add doc_type to Weaviate properties and default Vector attributes (#33398)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-15 19:25:24 +08:00
lif
f21288df5a fix: preserve custom icons in exported DSL (#33424)
Signed-off-by: majiayu000 <1835304752@qq.com>
2026-03-15 18:52:41 +08:00
yyh
8f5d65a7fc Merge branch 'main' into fix/use-base 2026-03-15 17:19:38 +08:00
yyh
8f8477a19f fix notes 2026-03-15 17:06:40 +08:00
-LAN-
fb41b215c8 refactor(api): move workflow knowledge nodes and trigger nodes (#33445) 2026-03-15 15:24:59 +08:00
yyh
214f24bcd3 trigger ci
Signed-off-by: yyh <yuanyouhuilyz@gmail.com>
2026-03-15 14:41:43 +08:00
yyh
1677a52900 fix 2026-03-15 13:56:43 +08:00
yyh
e054d5d8b3 fix: stabilize useInvalid and useReset with useCallback and mark as deprecated
Both hooks returned a new function reference on every render, causing
downstream useCallback dependencies to be invalidated unnecessarily.
Wrap return values with useCallback and mark both as @deprecated in
favor of using useQueryClient() directly.
2026-03-15 13:42:52 +08:00
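A minimal sketch of the stabilization this commit describes, assuming the hooks wrap `queryClient.invalidateQueries`; the actual bodies in `web/service/use-base.ts` may differ:

```typescript
import { useCallback } from 'react'
import { useQueryClient } from '@tanstack/react-query'

/** @deprecated Prefer calling useQueryClient() directly at the call site. */
export const useInvalid = (key: readonly unknown[]) => {
  const queryClient = useQueryClient()
  // useCallback keeps the returned function referentially stable across
  // renders (assuming callers pass a stable `key` reference), so downstream
  // useCallback/useMemo dependency arrays stop being invalidated.
  return useCallback(
    () => queryClient.invalidateQueries({ queryKey: key }),
    [queryClient, key],
  )
}
```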
99
1b6e695520 refactor(workflow): move agent node back to core workflow (#33431)
2026-03-14 22:33:13 +08:00
tmimmanuel
98df8e1d6c refactor: replace remaining sa.String with EnumText 2 (#33448)
2026-03-14 20:20:30 +09:00
Niels Kaspers
79a8747c1b refactor(tests): use enum values instead of string literals for status fields (#33441) 2026-03-14 16:14:34 +09:00
tmimmanuel
e64f4d6039 refactor: replace sa.String with EnumText in mapped_column for type s… (#33332)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 13:38:27 +09:00
dependabot[bot]
6043ec4423 chore(deps): bump fickling from 0.1.9 to 0.1.10 in /api (#33436)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-14 13:33:24 +09:00
dependabot[bot]
a600794370 chore(deps): bump pyjwt from 2.11.0 to 2.12.0 in /api (#33437)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-14 13:32:55 +09:00
非法操作
573b4e41cb fix: correctly detect required columns in archived workflow run restore (#33403)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-13 23:24:40 +08:00
yevanmore
194c205ed3 fix(api): allow punctuation in uploaded filenames (#33364)
Co-authored-by: Chen Yefan <cyefan2@gmail.com>
2026-03-13 21:33:09 +08:00
Coding On Star
7e1dc3c122 refactor(web): split share text-generation and add high-coverage tests (#33408)
Co-authored-by: CodingOnStar <hanxujiang@dify.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-13 19:10:24 +08:00
Stephen Zhou
4203647c32 chore: use vite plus (#33407) 2026-03-13 18:18:44 +08:00
343 changed files with 7714 additions and 3744 deletions


@@ -187,53 +187,13 @@ const Template = useMemo(() => {
**When**: Component directly handles API calls, data transformation, or complex async operations.
**Dify Convention**: Use `@tanstack/react-query` hooks from `web/service/use-*.ts` or create custom data hooks.
```typescript
// ❌ Before: API logic in component
const MCPServiceCard = () => {
  const [basicAppConfig, setBasicAppConfig] = useState({})
  useEffect(() => {
    if (isBasicApp && appId) {
      (async () => {
        const res = await fetchAppDetail({ url: '/apps', id: appId })
        setBasicAppConfig(res?.model_config || {})
      })()
    }
  }, [appId, isBasicApp])
  // More API-related logic...
}

// ✅ After: Extract to data hook using React Query
// use-app-config.ts
import { useQuery } from '@tanstack/react-query'
import { get } from '@/service/base'

const NAME_SPACE = 'appConfig'

export const useAppConfig = (appId: string, isBasicApp: boolean) => {
  return useQuery({
    enabled: isBasicApp && !!appId,
    queryKey: [NAME_SPACE, 'detail', appId],
    queryFn: () => get<AppDetailResponse>(`/apps/${appId}`),
    select: data => data?.model_config || {},
  })
}

// Component becomes cleaner
const MCPServiceCard = () => {
  const { data: config, isLoading } = useAppConfig(appId, isBasicApp)
  // UI only
}
```
**React Query Best Practices in Dify**:
- Define `NAME_SPACE` for query key organization
- Use `enabled` option for conditional fetching
- Use `select` for data transformation
- Export invalidation hooks: `useInvalidXxx`
**Dify Convention**:
- This skill is for component decomposition, not query/mutation design.
- When refactoring data fetching, follow `web/AGENTS.md`.
- Use `orpc-contract-first` for contracts, query shape, data-fetching wrappers, and query/mutation call-site patterns.
- Use `web/docs/query-mutation.md` for Dify-specific conditional query, invalidation, and mutation error-handling rules.
- Do not introduce deprecated `useInvalid` / `useReset`.
- Do not add thin passthrough `useQuery` wrappers during refactoring; only extract a custom hook when it truly orchestrates multiple queries/mutations or shared derived state (a qualifying sketch follows the examples below).
**Dify Examples**:
- `web/service/use-workflow.ts`
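A hedged sketch of what the conventions above would count as a qualifying extraction: a hook that orchestrates two queries plus shared derived state. The hook name, endpoints, and response types are illustrative assumptions, not actual Dify service code:

```typescript
import { useQuery } from '@tanstack/react-query'
import { get } from '@/service/base'

const NAME_SPACE = 'appOverview'

// ✅ Worth extracting: orchestrates two queries plus shared derived state.
export const useAppOverview = (appId: string) => {
  const detail = useQuery({
    enabled: !!appId,
    queryKey: [NAME_SPACE, 'detail', appId],
    queryFn: () => get<{ name: string }>(`/apps/${appId}`),
  })
  const stats = useQuery({
    enabled: !!appId,
    queryKey: [NAME_SPACE, 'stats', appId],
    queryFn: () => get<{ runs: number }>(`/apps/${appId}/statistics`),
  })
  // The derived values below are what justify a custom hook; a single
  // passthrough useQuery would not.
  return {
    isLoading: detail.isLoading || stats.isLoading,
    title: detail.data ? `${detail.data.name} (${stats.data?.runs ?? 0} runs)` : '',
  }
}
```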


@@ -155,48 +155,15 @@ const Configuration: FC = () => {
## Common Hook Patterns in Dify
### 1. Data Fetching Hook (React Query)
### 1. Data Fetching / Mutation Hooks
```typescript
// Pattern: Use @tanstack/react-query for data fetching
import { useQuery, useQueryClient } from '@tanstack/react-query'
import { get } from '@/service/base'
import { useInvalid } from '@/service/use-base'
When hook extraction touches query or mutation code, do not use this reference as the source of truth for data-layer patterns.
const NAME_SPACE = 'appConfig'
// Query keys for cache management
export const appConfigQueryKeys = {
detail: (appId: string) => [NAME_SPACE, 'detail', appId] as const,
}
// Main data hook
export const useAppConfig = (appId: string) => {
return useQuery({
enabled: !!appId,
queryKey: appConfigQueryKeys.detail(appId),
queryFn: () => get<AppDetailResponse>(`/apps/${appId}`),
select: data => data?.model_config || null,
})
}
// Invalidation hook for refreshing data
export const useInvalidAppConfig = () => {
return useInvalid([NAME_SPACE])
}
// Usage in component
const Component = () => {
const { data: config, isLoading, error, refetch } = useAppConfig(appId)
const invalidAppConfig = useInvalidAppConfig()
const handleRefresh = () => {
invalidAppConfig() // Invalidates cache and triggers refetch
}
return <div>...</div>
}
```
- Follow `web/AGENTS.md` first.
- Use `orpc-contract-first` for contracts, query shape, data-fetching wrappers, and query/mutation call-site patterns.
- Use `web/docs/query-mutation.md` for conditional query, invalidation, and mutation error-handling rules.
- Do not introduce deprecated `useInvalid` / `useReset`.
- Do not extract thin passthrough `useQuery` hooks; only extract orchestration hooks (a contrasting anti-example is sketched below).
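For contrast, a hedged anti-example of the thin passthrough the rule above forbids (endpoint and types are illustrative assumptions):

```typescript
import { useQuery } from '@tanstack/react-query'
import { get } from '@/service/base'

// ❌ Not worth extracting: no orchestration and no derived state, just a
// one-to-one wrapper; call useQuery directly at the call site instead.
export const useAppDetail = (appId: string) =>
  useQuery({
    enabled: !!appId,
    queryKey: ['appDetail', appId],
    queryFn: () => get<{ name: string }>(`/apps/${appId}`),
  })
```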
### 2. Form State Hook


@@ -1,33 +1,13 @@
name: Setup Web Environment
description: Setup pnpm, Node.js, and install web dependencies.
inputs:
node-version:
description: Node.js version to use
required: false
default: "22"
install-dependencies:
description: Whether to install web dependencies after setting up Node.js
required: false
default: "true"
runs:
using: composite
steps:
- name: Install pnpm
uses: pnpm/action-setup@41ff72655975bd51cab0327fa583b6e92b6d3061 # v4.2.0
- name: Setup Vite+
uses: voidzero-dev/setup-vp@b5d848f5a62488f3d3d920f8aa6ac318a60c5f07 # v1
with:
package_json_file: web/package.json
run_install: false
- name: Setup Node.js
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # v6.3.0
with:
node-version: ${{ inputs.node-version }}
cache: pnpm
cache-dependency-path: ./web/pnpm-lock.yaml
- name: Install dependencies
if: ${{ inputs.install-dependencies == 'true' }}
shell: bash
run: pnpm --dir web install --frozen-lockfile
node-version-file: "./web/.nvmrc"
cache: true
run-install: |
- cwd: ./web
args: ['--frozen-lockfile']


@@ -102,13 +102,11 @@ jobs:
- name: Setup web environment
if: steps.web-changes.outputs.any_changed == 'true'
uses: ./.github/actions/setup-web
with:
node-version: "24"
- name: ESLint autofix
if: steps.web-changes.outputs.any_changed == 'true'
run: |
cd web
pnpm eslint --concurrency=2 --prune-suppressions --quiet || true
vp exec eslint --concurrency=2 --prune-suppressions --quiet || true
- uses: autofix-ci/action@7a166d7532b277f34e16238930461bf77f9d7ed8 # v1.3.3


@@ -88,7 +88,7 @@ jobs:
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: |
pnpm run lint:ci
vp run lint:ci
# pnpm run lint:report
# continue-on-error: true
@@ -102,17 +102,17 @@ jobs:
- name: Web tsslint
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm run lint:tss
run: vp run lint:tss
- name: Web type check
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm run type-check
run: vp run type-check
- name: Web dead code check
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm run knip
run: vp run knip
superlinter:
name: SuperLinter


@@ -50,8 +50,6 @@ jobs:
- name: Setup web environment
uses: ./.github/actions/setup-web
with:
install-dependencies: "false"
- name: Detect changed files and generate diff
id: detect_changes


@@ -43,7 +43,7 @@ jobs:
uses: ./.github/actions/setup-web
- name: Run tests
run: pnpm vitest run --reporter=blob --shard=${{ matrix.shardIndex }}/${{ matrix.shardTotal }} --coverage
run: vp test run --reporter=blob --shard=${{ matrix.shardIndex }}/${{ matrix.shardTotal }} --coverage
- name: Upload blob report
if: ${{ !cancelled() }}
@@ -84,7 +84,7 @@ jobs:
merge-multiple: true
- name: Merge reports
run: pnpm vitest --merge-reports --reporter=json --reporter=agent --coverage
run: vp test --merge-reports --reporter=json --reporter=agent --coverage
- name: Check app/components diff coverage
env:
@@ -447,4 +447,4 @@ jobs:
- name: Web build check
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm run build
run: vp run build

.gitignore

@@ -237,3 +237,6 @@ scripts/stress-test/reports/
# settings
*.local.json
*.local.md
# Code Agent Folder
.qoder/*


@@ -22,10 +22,10 @@ APP_WEB_URL=http://localhost:3000
# Files URL
FILES_URL=http://localhost:5001
# INTERNAL_FILES_URL is used for plugin daemon communication within Docker network.
# Set this to the internal Docker service URL for proper plugin file access.
# Example: INTERNAL_FILES_URL=http://api:5001
INTERNAL_FILES_URL=http://127.0.0.1:5001
# INTERNAL_FILES_URL is used by services running in Docker to reach the API file endpoints.
# For Docker Desktop (Mac/Windows), use http://host.docker.internal:5001 when the API runs on the host.
# For Docker Compose on Linux, use http://api:5001 when the API runs inside the Docker network.
INTERNAL_FILES_URL=http://host.docker.internal:5001
# TRIGGER URL
TRIGGER_URL=http://localhost:5001
@@ -180,7 +180,7 @@ CONSOLE_CORS_ALLOW_ORIGINS=http://localhost:3000,*
COOKIE_DOMAIN=
# Vector database configuration
# Supported values are `weaviate`, `oceanbase`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`.
# Supported values are `weaviate`, `oceanbase`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`, `hologres`.
VECTOR_STORE=weaviate
# Prefix used to create collection name in vector database
VECTOR_INDEX_NAME_PREFIX=Vector_index
@@ -217,6 +217,20 @@ COUCHBASE_PASSWORD=password
COUCHBASE_BUCKET_NAME=Embeddings
COUCHBASE_SCOPE_NAME=_default
# Hologres configuration
# access_key_id is used as the PG username, access_key_secret is used as the PG password
HOLOGRES_HOST=
HOLOGRES_PORT=80
HOLOGRES_DATABASE=
HOLOGRES_ACCESS_KEY_ID=
HOLOGRES_ACCESS_KEY_SECRET=
HOLOGRES_SCHEMA=public
HOLOGRES_TOKENIZER=jieba
HOLOGRES_DISTANCE_METHOD=Cosine
HOLOGRES_BASE_QUANTIZATION_TYPE=rabitq
HOLOGRES_MAX_DEGREE=64
HOLOGRES_EF_CONSTRUCTION=400
# Milvus configuration
MILVUS_URI=http://127.0.0.1:19530
MILVUS_TOKEN=


@@ -43,7 +43,6 @@ forbidden_modules =
extensions.ext_redis
allow_indirect_imports = True
ignore_imports =
dify_graph.nodes.agent.agent_node -> extensions.ext_database
dify_graph.nodes.llm.node -> extensions.ext_database
dify_graph.model_runtime.model_providers.__base.ai_model -> extensions.ext_redis
dify_graph.model_runtime.model_providers.model_provider_factory -> extensions.ext_redis
@@ -90,9 +89,6 @@ forbidden_modules =
core.trigger
core.variables
ignore_imports =
dify_graph.nodes.agent.agent_node -> core.model_manager
dify_graph.nodes.agent.agent_node -> core.provider_manager
dify_graph.nodes.agent.agent_node -> core.tools.tool_manager
dify_graph.nodes.llm.llm_utils -> core.model_manager
dify_graph.nodes.llm.protocols -> core.model_manager
dify_graph.nodes.llm.llm_utils -> dify_graph.model_runtime.model_providers.__base.large_language_model
@@ -100,9 +96,6 @@ ignore_imports =
dify_graph.nodes.tool.tool_node -> core.callback_handler.workflow_tool_callback_handler
dify_graph.nodes.tool.tool_node -> core.tools.tool_engine
dify_graph.nodes.tool.tool_node -> core.tools.tool_manager
dify_graph.nodes.agent.agent_node -> core.agent.entities
dify_graph.nodes.agent.agent_node -> core.agent.plugin_entities
dify_graph.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.app_config.entities
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.advanced_prompt_transform
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.simple_prompt_transform
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> dify_graph.model_runtime.model_providers.__base.large_language_model
@@ -110,12 +103,10 @@ ignore_imports =
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.model_manager
dify_graph.nodes.question_classifier.question_classifier_node -> core.model_manager
dify_graph.nodes.tool.tool_node -> core.tools.utils.message_transformer
dify_graph.nodes.agent.agent_node -> models.model
dify_graph.nodes.llm.node -> core.helper.code_executor
dify_graph.nodes.llm.node -> core.llm_generator.output_parser.errors
dify_graph.nodes.llm.node -> core.llm_generator.output_parser.structured_output
dify_graph.nodes.llm.node -> core.model_manager
dify_graph.nodes.agent.entities -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.llm.entities -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.llm.node -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.llm.node -> core.prompt.utils.prompt_message_util
@@ -124,17 +115,12 @@ ignore_imports =
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.utils.prompt_message_util
dify_graph.nodes.question_classifier.entities -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.question_classifier.question_classifier_node -> core.prompt.utils.prompt_message_util
dify_graph.nodes.knowledge_index.entities -> core.rag.retrieval.retrieval_methods
dify_graph.nodes.llm.node -> models.dataset
dify_graph.nodes.agent.agent_node -> core.tools.utils.message_transformer
dify_graph.nodes.llm.file_saver -> core.tools.signature
dify_graph.nodes.llm.file_saver -> core.tools.tool_file_manager
dify_graph.nodes.tool.tool_node -> core.tools.errors
dify_graph.nodes.agent.agent_node -> extensions.ext_database
dify_graph.nodes.llm.node -> extensions.ext_database
dify_graph.nodes.agent.agent_node -> models
dify_graph.nodes.llm.node -> models.model
dify_graph.nodes.agent.agent_node -> services
dify_graph.nodes.tool.tool_node -> services
dify_graph.model_runtime.model_providers.__base.ai_model -> configs
dify_graph.model_runtime.model_providers.__base.ai_model -> extensions.ext_redis


@@ -160,6 +160,7 @@ def migrate_knowledge_vector_database():
}
lower_collection_vector_types = {
VectorType.ANALYTICDB,
VectorType.HOLOGRES,
VectorType.CHROMA,
VectorType.MYSCALE,
VectorType.PGVECTO_RS,


@@ -26,6 +26,7 @@ from .vdb.chroma_config import ChromaConfig
from .vdb.clickzetta_config import ClickzettaConfig
from .vdb.couchbase_config import CouchbaseConfig
from .vdb.elasticsearch_config import ElasticsearchConfig
from .vdb.hologres_config import HologresConfig
from .vdb.huawei_cloud_config import HuaweiCloudConfig
from .vdb.iris_config import IrisVectorConfig
from .vdb.lindorm_config import LindormConfig
@@ -347,6 +348,7 @@ class MiddlewareConfig(
AnalyticdbConfig,
ChromaConfig,
ClickzettaConfig,
HologresConfig,
HuaweiCloudConfig,
IrisVectorConfig,
MilvusConfig,


@@ -0,0 +1,68 @@
from holo_search_sdk.types import BaseQuantizationType, DistanceType, TokenizerType
from pydantic import Field
from pydantic_settings import BaseSettings


class HologresConfig(BaseSettings):
    """
    Configuration settings for Hologres vector database.

    Hologres is compatible with PostgreSQL protocol.
    access_key_id is used as the PostgreSQL username,
    and access_key_secret is used as the PostgreSQL password.
    """

    HOLOGRES_HOST: str | None = Field(
        description="Hostname or IP address of the Hologres instance.",
        default=None,
    )
    HOLOGRES_PORT: int = Field(
        description="Port number for connecting to the Hologres instance.",
        default=80,
    )
    HOLOGRES_DATABASE: str | None = Field(
        description="Name of the Hologres database to connect to.",
        default=None,
    )
    HOLOGRES_ACCESS_KEY_ID: str | None = Field(
        description="Alibaba Cloud AccessKey ID, also used as the PostgreSQL username.",
        default=None,
    )
    HOLOGRES_ACCESS_KEY_SECRET: str | None = Field(
        description="Alibaba Cloud AccessKey Secret, also used as the PostgreSQL password.",
        default=None,
    )
    HOLOGRES_SCHEMA: str = Field(
        description="Schema name in the Hologres database.",
        default="public",
    )
    HOLOGRES_TOKENIZER: TokenizerType = Field(
        description="Tokenizer for full-text search index (e.g., 'jieba', 'ik', 'standard', 'simple').",
        default="jieba",
    )
    HOLOGRES_DISTANCE_METHOD: DistanceType = Field(
        description="Distance method for vector index (e.g., 'Cosine', 'Euclidean', 'InnerProduct').",
        default="Cosine",
    )
    HOLOGRES_BASE_QUANTIZATION_TYPE: BaseQuantizationType = Field(
        description="Base quantization type for vector index (e.g., 'rabitq', 'sq8', 'fp16', 'fp32').",
        default="rabitq",
    )
    HOLOGRES_MAX_DEGREE: int = Field(
        description="Max degree (M) parameter for HNSW vector index.",
        default=64,
    )
    HOLOGRES_EF_CONSTRUCTION: int = Field(
        description="ef_construction parameter for HNSW vector index.",
        default=400,
    )


@@ -25,7 +25,8 @@ from controllers.console.wraps import (
)
from core.ops.ops_trace_manager import OpsTraceManager
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from dify_graph.enums import NodeType, WorkflowExecutionStatus
from core.trigger.constants import TRIGGER_NODE_TYPES
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.file import helpers as file_helpers
from extensions.ext_database import db
from libs.login import current_account_with_tenant, login_required
@@ -508,11 +509,7 @@ class AppListApi(Resource):
.scalars()
.all()
)
trigger_node_types = {
NodeType.TRIGGER_WEBHOOK,
NodeType.TRIGGER_SCHEDULE,
NodeType.TRIGGER_PLUGIN,
}
trigger_node_types = TRIGGER_NODE_TYPES
for workflow in draft_workflows:
node_id = None
try:


@@ -1,5 +1,4 @@
import json
from enum import StrEnum
from flask_restx import Resource, marshal_with
from pydantic import BaseModel, Field
@@ -11,6 +10,7 @@ from controllers.console.wraps import account_initialization_required, edit_perm
from extensions.ext_database import db
from fields.app_fields import app_server_fields
from libs.login import current_account_with_tenant, login_required
from models.enums import AppMCPServerStatus
from models.model import AppMCPServer
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
@@ -19,11 +19,6 @@ DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
app_server_model = console_ns.model("AppServer", app_server_fields)
class AppMCPServerStatus(StrEnum):
ACTIVE = "active"
INACTIVE = "inactive"
class MCPServerCreatePayload(BaseModel):
description: str | None = Field(default=None, description="Server description")
parameters: dict = Field(..., description="Server parameters configuration")
@@ -117,9 +112,10 @@ class AppMCPServerController(Resource):
server.parameters = json.dumps(payload.parameters, ensure_ascii=False)
if payload.status:
if payload.status not in [status.value for status in AppMCPServerStatus]:
try:
server.status = AppMCPServerStatus(payload.status)
except ValueError:
raise ValueError("Invalid status")
server.status = payload.status
db.session.commit()
return server


@@ -22,6 +22,7 @@ from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY
from core.app.entities.app_invoke_entities import InvokeFrom
from core.helper.trace_id_helper import get_external_trace_id
from core.plugin.impl.exc import PluginInvokeError
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from core.trigger.debug.event_selectors import (
TriggerDebugEvent,
TriggerDebugEventPoller,
@@ -1209,7 +1210,7 @@ class DraftWorkflowTriggerNodeApi(Resource):
node_type: NodeType = draft_workflow.get_node_type_from_node_config(node_config)
event: TriggerDebugEvent | None = None
# for schedule trigger, when run single node, just execute directly
if node_type == NodeType.TRIGGER_SCHEDULE:
if node_type == TRIGGER_SCHEDULE_NODE_TYPE:
event = TriggerDebugEvent(
workflow_args={},
node_id=node_id,


@@ -263,6 +263,7 @@ def _get_retrieval_methods_by_vector_type(vector_type: str | None, is_mock: bool
VectorType.BAIDU,
VectorType.ALIBABACLOUD_MYSQL,
VectorType.IRIS,
VectorType.HOLOGRES,
}
semantic_methods = {"retrieval_method": [RetrievalMethod.SEMANTIC_SEARCH.value]}


@@ -43,6 +43,7 @@ from libs.datetime_utils import naive_utc_now
from libs.helper import EmailStr, TimestampField, extract_remote_ip, timezone
from libs.login import current_account_with_tenant, login_required
from models import AccountIntegrate, InvitationCode
from models.account import AccountStatus, InvitationCodeStatus
from services.account_service import AccountService
from services.billing_service import BillingService
from services.errors.account import CurrentPasswordIncorrectError as ServiceCurrentPasswordIncorrectError
@@ -215,7 +216,7 @@ class AccountInitApi(Resource):
db.session.query(InvitationCode)
.where(
InvitationCode.code == args.invitation_code,
InvitationCode.status == "unused",
InvitationCode.status == InvitationCodeStatus.UNUSED,
)
.first()
)
@@ -223,7 +224,7 @@ class AccountInitApi(Resource):
if not invitation_code:
raise InvalidInvitationCodeError()
invitation_code.status = "used"
invitation_code.status = InvitationCodeStatus.USED
invitation_code.used_at = naive_utc_now()
invitation_code.used_by_tenant_id = account.current_tenant_id
invitation_code.used_by_account_id = account.id
@@ -231,7 +232,7 @@ class AccountInitApi(Resource):
account.interface_language = args.interface_language
account.timezone = args.timezone
account.interface_theme = "light"
account.status = "active"
account.status = AccountStatus.ACTIVE
account.initialized_at = naive_utc_now()
db.session.commit()


@@ -5,6 +5,7 @@ from typing import Any, Literal
from flask import request, send_file
from flask_restx import Resource
from pydantic import BaseModel, Field
from werkzeug.datastructures import FileStorage
from werkzeug.exceptions import Forbidden
from configs import dify_config
@@ -169,6 +170,20 @@ register_enum_models(
)
def _read_upload_content(file: FileStorage, max_size: int) -> bytes:
    """
    Read the uploaded file and validate its actual size before delegating to the plugin service.

    FileStorage.content_length is not reliable for multipart test uploads and may be zero even when
    content exists, so the controllers validate against the loaded bytes instead.
    """
    content = file.read()
    if len(content) > max_size:
        raise ValueError("File size exceeds the maximum allowed size")
    return content
@console_ns.route("/workspaces/current/plugin/debugging-key")
class PluginDebuggingKeyApi(Resource):
@setup_required
@@ -284,12 +299,7 @@ class PluginUploadFromPkgApi(Resource):
_, tenant_id = current_account_with_tenant()
file = request.files["pkg"]
# check file size
if file.content_length > dify_config.PLUGIN_MAX_PACKAGE_SIZE:
raise ValueError("File size exceeds the maximum allowed size")
content = file.read()
content = _read_upload_content(file, dify_config.PLUGIN_MAX_PACKAGE_SIZE)
try:
response = PluginService.upload_pkg(tenant_id, content)
except PluginDaemonClientSideError as e:
@@ -328,12 +338,7 @@ class PluginUploadFromBundleApi(Resource):
_, tenant_id = current_account_with_tenant()
file = request.files["bundle"]
# check file size
if file.content_length > dify_config.PLUGIN_MAX_BUNDLE_SIZE:
raise ValueError("File size exceeds the maximum allowed size")
content = file.read()
content = _read_upload_content(file, dify_config.PLUGIN_MAX_BUNDLE_SIZE)
try:
response = PluginService.upload_bundle(tenant_id, content)
except PluginDaemonClientSideError as e:


@@ -6,13 +6,13 @@ from pydantic import BaseModel, Field, ValidationError
from sqlalchemy.orm import Session
from controllers.common.schema import register_schema_model
from controllers.console.app.mcp_server import AppMCPServerStatus
from controllers.mcp import mcp_ns
from core.mcp import types as mcp_types
from core.mcp.server.streamable_http import handle_mcp_request
from dify_graph.variables.input_entities import VariableEntity
from extensions.ext_database import db
from libs import helper
from models.enums import AppMCPServerStatus
from models.model import App, AppMCPServer, AppMode, EndUser


@@ -6,6 +6,7 @@ from typing import Any
from core.agent.base_agent_runner import BaseAgentRunner
from core.agent.entities import AgentScratchpadUnit
from core.agent.errors import AgentMaxIterationError
from core.agent.output_parser.cot_output_parser import CotAgentOutputParser
from core.app.apps.base_app_queue_manager import PublishFrom
from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent
@@ -22,7 +23,6 @@ from dify_graph.model_runtime.entities.message_entities import (
ToolPromptMessage,
UserPromptMessage,
)
from dify_graph.nodes.agent.exc import AgentMaxIterationError
from models.model import Message
logger = logging.getLogger(__name__)

api/core/agent/errors.py (new file)

@@ -0,0 +1,9 @@
class AgentMaxIterationError(Exception):
    """Raised when an agent runner exceeds the configured max iteration count."""

    def __init__(self, max_iteration: int):
        self.max_iteration = max_iteration
        super().__init__(
            f"Agent exceeded the maximum iteration limit of {max_iteration}. "
            f"The agent was unable to complete the task within the allowed number of iterations."
        )


@@ -5,6 +5,7 @@ from copy import deepcopy
from typing import Any, Union
from core.agent.base_agent_runner import BaseAgentRunner
from core.agent.errors import AgentMaxIterationError
from core.app.apps.base_app_queue_manager import PublishFrom
from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent
from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform
@@ -25,7 +26,6 @@ from dify_graph.model_runtime.entities import (
UserPromptMessage,
)
from dify_graph.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from dify_graph.nodes.agent.exc import AgentMaxIterationError
from models.model import Message
logger = logging.getLogger(__name__)


@@ -69,7 +69,7 @@ from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.model_runtime.entities.llm_entities import LLMUsage
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.nodes import NodeType
from dify_graph.nodes import BuiltinNodeTypes
from dify_graph.repositories.draft_variable_repository import DraftVariableSaverFactory
from dify_graph.runtime import GraphRuntimeState
from dify_graph.system_variable import SystemVariable
@@ -357,7 +357,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
) -> Generator[StreamResponse, None, None]:
"""Handle node succeeded events."""
# Record files if it's an answer node or end node
if event.node_type in [NodeType.ANSWER, NodeType.END, NodeType.LLM]:
if event.node_type in [BuiltinNodeTypes.ANSWER, BuiltinNodeTypes.END, BuiltinNodeTypes.LLM]:
self._recorded_files.extend(
self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
)


@@ -48,12 +48,13 @@ from core.app.entities.task_entities import (
from core.plugin.impl.datasource import PluginDatasourceManager
from core.tools.entities.tool_entities import ToolProviderType
from core.tools.tool_manager import ToolManager
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.trigger_manager import TriggerManager
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
from dify_graph.enums import (
NodeType,
BuiltinNodeTypes,
SystemVariableKey,
WorkflowExecutionStatus,
WorkflowNodeExecutionMetadataKey,
@@ -442,7 +443,7 @@ class WorkflowResponseConverter:
event: QueueNodeStartedEvent,
task_id: str,
) -> NodeStartStreamResponse | None:
if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
return None
run_id = self._ensure_workflow_run_id()
snapshot = self._store_snapshot(event)
@@ -464,13 +465,13 @@ class WorkflowResponseConverter:
)
try:
if event.node_type == NodeType.TOOL:
if event.node_type == BuiltinNodeTypes.TOOL:
response.data.extras["icon"] = ToolManager.get_tool_icon(
tenant_id=self._application_generate_entity.app_config.tenant_id,
provider_type=ToolProviderType(event.provider_type),
provider_id=event.provider_id,
)
elif event.node_type == NodeType.DATASOURCE:
elif event.node_type == BuiltinNodeTypes.DATASOURCE:
manager = PluginDatasourceManager()
provider_entity = manager.fetch_datasource_provider(
self._application_generate_entity.app_config.tenant_id,
@@ -479,7 +480,7 @@ class WorkflowResponseConverter:
response.data.extras["icon"] = provider_entity.declaration.identity.generate_datasource_icon_url(
self._application_generate_entity.app_config.tenant_id
)
elif event.node_type == NodeType.TRIGGER_PLUGIN:
elif event.node_type == TRIGGER_PLUGIN_NODE_TYPE:
response.data.extras["icon"] = TriggerManager.get_trigger_plugin_icon(
self._application_generate_entity.app_config.tenant_id,
event.provider_id,
@@ -496,7 +497,7 @@ class WorkflowResponseConverter:
event: QueueNodeSucceededEvent | QueueNodeFailedEvent | QueueNodeExceptionEvent,
task_id: str,
) -> NodeFinishStreamResponse | None:
if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
return None
run_id = self._ensure_workflow_run_id()
snapshot = self._pop_snapshot(event.node_execution_id)
@@ -554,7 +555,7 @@ class WorkflowResponseConverter:
event: QueueNodeRetryEvent,
task_id: str,
) -> NodeRetryStreamResponse | None:
if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
return None
run_id = self._ensure_workflow_run_id()
@@ -612,7 +613,7 @@ class WorkflowResponseConverter:
data=IterationNodeStartStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
created_at=int(time.time()),
extras={},
@@ -635,7 +636,7 @@ class WorkflowResponseConverter:
data=IterationNodeNextStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
index=event.index,
created_at=int(time.time()),
@@ -662,7 +663,7 @@ class WorkflowResponseConverter:
data=IterationNodeCompletedStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
outputs=new_outputs,
outputs_truncated=outputs_truncated,
@@ -692,7 +693,7 @@ class WorkflowResponseConverter:
data=LoopNodeStartStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
created_at=int(time.time()),
extras={},
@@ -715,7 +716,7 @@ class WorkflowResponseConverter:
data=LoopNodeNextStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
index=event.index,
# The `pre_loop_output` field is not utilized by the frontend.
@@ -744,7 +745,7 @@ class WorkflowResponseConverter:
data=LoopNodeCompletedStreamResponse.Data(
id=event.node_id,
node_id=event.node_id,
node_type=event.node_type.value,
node_type=event.node_type,
title=event.node_title,
outputs=new_outputs,
outputs_truncated=outputs_truncated,


@@ -12,7 +12,7 @@ from core.app.entities.app_invoke_entities import (
build_dify_run_context,
)
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.node_factory import DifyNodeFactory
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities.graph_init_params import GraphInitParams
from dify_graph.enums import WorkflowType
@@ -274,6 +274,8 @@ class PipelineRunner(WorkflowBasedAppRunner):
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
if start_node_id is None:
    start_node_id = get_default_root_node_id(graph_config)
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=start_node_id)
if not graph:


@@ -3,7 +3,10 @@ import time
from collections.abc import Mapping, Sequence
from typing import Any, cast
from pydantic import ValidationError
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.agent_strategy import AgentStrategyInfo
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context
from core.app.entities.queue_entities import (
AppQueueEvent,
@@ -29,7 +32,8 @@ from core.app.entities.queue_entities import (
QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
)
from core.workflow.node_factory import DifyNodeFactory
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id, resolve_workflow_node_class
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDictAdapter
@@ -63,7 +67,6 @@ from dify_graph.graph_events import (
NodeRunSucceededEvent,
)
from dify_graph.graph_events.graph import GraphRunAbortedEvent
from dify_graph.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
from dify_graph.runtime import GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool
@@ -137,6 +140,9 @@ class WorkflowBasedAppRunner:
graph_runtime_state=graph_runtime_state,
)
if root_node_id is None:
    root_node_id = get_default_root_node_id(graph_config)
# init graph
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=root_node_id)
@@ -308,7 +314,7 @@ class WorkflowBasedAppRunner:
# Get node class
node_type = target_node_config["data"].type
node_version = str(target_node_config["data"].version)
node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version]
node_cls = resolve_workflow_node_class(node_type=node_type, node_version=node_version)
# Use the variable pool from graph_runtime_state instead of creating a new one
variable_pool = graph_runtime_state.variable_pool
@@ -336,6 +342,18 @@ class WorkflowBasedAppRunner:
return graph, variable_pool
@staticmethod
def _build_agent_strategy_info(event: NodeRunStartedEvent) -> AgentStrategyInfo | None:
    raw_agent_strategy = event.extras.get("agent_strategy")
    if raw_agent_strategy is None:
        return None
    try:
        return AgentStrategyInfo.model_validate(raw_agent_strategy)
    except ValidationError:
        logger.warning("Invalid agent strategy payload for node %s", event.node_id, exc_info=True)
        return None
def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent):
"""
Handle event
@@ -421,7 +439,7 @@ class WorkflowBasedAppRunner:
start_at=event.start_at,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
agent_strategy=event.agent_strategy,
agent_strategy=self._build_agent_strategy_info(event),
provider_type=event.provider_type,
provider_id=event.provider_id,
)
@@ -490,7 +508,9 @@ class WorkflowBasedAppRunner:
elif isinstance(event, NodeRunRetrieverResourceEvent):
self._publish_event(
QueueRetrieverResourcesEvent(
retriever_resources=event.retriever_resources,
retriever_resources=[
RetrievalSourceMetadata.model_validate(resource) for resource in event.retriever_resources
],
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
)


@@ -0,0 +1,3 @@
from .agent_strategy import AgentStrategyInfo

__all__ = ["AgentStrategyInfo"]


@@ -0,0 +1,8 @@
from pydantic import BaseModel, ConfigDict


class AgentStrategyInfo(BaseModel):
    name: str
    icon: str | None = None

    model_config = ConfigDict(extra="forbid")


@@ -5,13 +5,12 @@ from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from core.app.entities.agent_strategy import AgentStrategyInfo
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from dify_graph.entities import AgentNodeStrategyInit
from dify_graph.entities.pause_reason import PauseReason
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
from dify_graph.enums import WorkflowNodeExecutionMetadataKey
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from dify_graph.nodes import NodeType
class QueueEvent(StrEnum):
@@ -314,7 +313,7 @@ class QueueNodeStartedEvent(AppQueueEvent):
in_iteration_id: str | None = None
in_loop_id: str | None = None
start_at: datetime
agent_strategy: AgentNodeStrategyInit | None = None
agent_strategy: AgentStrategyInfo | None = None
# FIXME(-LAN-): only for ToolNode, need to refactor
provider_type: str # should be a core.tools.entities.tool_entities.ToolProviderType


@@ -4,8 +4,8 @@ from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from core.app.entities.agent_strategy import AgentStrategyInfo
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from dify_graph.entities import AgentNodeStrategyInit
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
from dify_graph.enums import WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMUsage
@@ -349,7 +349,7 @@ class NodeStartStreamResponse(StreamResponse):
extras: dict[str, object] = Field(default_factory=dict)
iteration_id: str | None = None
loop_id: str | None = None
agent_strategy: AgentNodeStrategyInit | None = None
agent_strategy: AgentStrategyInfo | None = None
event: StreamEvent = StreamEvent.NODE_STARTED
workflow_run_id: str


@@ -2,7 +2,7 @@ import logging
from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID
from dify_graph.conversation_variable_updater import ConversationVariableUpdater
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphEngineEvent, NodeRunSucceededEvent
from dify_graph.nodes.variable_assigner.common import helpers as common_helpers
@@ -22,7 +22,7 @@ class ConversationVariablePersistenceLayer(GraphEngineLayer):
def on_event(self, event: GraphEngineEvent) -> None:
if not isinstance(event, NodeRunSucceededEvent):
return
if event.node_type != NodeType.VARIABLE_ASSIGNER:
if event.node_type != BuiltinNodeTypes.VARIABLE_ASSIGNER:
return
if self.graph_runtime_state is None:
return

View File

@@ -12,7 +12,7 @@ from typing_extensions import override
from core.app.llm import deduct_llm_quota, ensure_llm_quota_available
from core.errors.error import QuotaExceededError
from core.model_manager import ModelInstance
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from dify_graph.graph_engine.entities.commands import AbortCommand, CommandType
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase
@@ -113,11 +113,11 @@ class LLMQuotaLayer(GraphEngineLayer):
def _extract_model_instance(node: Node) -> ModelInstance | None:
try:
match node.node_type:
case NodeType.LLM:
case BuiltinNodeTypes.LLM:
return cast("LLMNode", node).model_instance
case NodeType.PARAMETER_EXTRACTOR:
case BuiltinNodeTypes.PARAMETER_EXTRACTOR:
return cast("ParameterExtractorNode", node).model_instance
case NodeType.QUESTION_CLASSIFIER:
case BuiltinNodeTypes.QUESTION_CLASSIFIER:
return cast("QuestionClassifierNode", node).model_instance
case _:
return None


@@ -16,7 +16,7 @@ from opentelemetry.trace import Span, SpanKind, Tracer, get_tracer, set_span_in_
from typing_extensions import override
from configs import dify_config
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphNodeEventBase
from dify_graph.nodes.base.node import Node
@@ -74,16 +74,13 @@ class ObservabilityLayer(GraphEngineLayer):
def _build_parser_registry(self) -> None:
"""Initialize parser registry for node types."""
self._parsers = {
NodeType.TOOL: ToolNodeOTelParser(),
NodeType.LLM: LLMNodeOTelParser(),
NodeType.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
BuiltinNodeTypes.TOOL: ToolNodeOTelParser(),
BuiltinNodeTypes.LLM: LLMNodeOTelParser(),
BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
}
def _get_parser(self, node: Node) -> NodeOTelParser:
node_type = getattr(node, "node_type", None)
if isinstance(node_type, NodeType):
return self._parsers.get(node_type, self._default_parser)
return self._default_parser
return self._parsers.get(node.node_type, self._default_parser)
@override
def on_graph_start(self) -> None:


@@ -12,6 +12,7 @@ from core.rag.models.document import Document
from extensions.ext_database import db
from models.dataset import ChildChunk, DatasetQuery, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.enums import CreatorUserRole
_logger = logging.getLogger(__name__)
@@ -38,7 +39,9 @@ class DatasetIndexToolCallbackHandler:
source="app",
source_app_id=self._app_id,
created_by_role=(
"account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end_user"
CreatorUserRole.ACCOUNT
if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER}
else CreatorUserRole.END_USER
),
created_by=self._user_id,
)


@@ -24,12 +24,12 @@ from core.datasource.utils.message_transformer import DatasourceFileMessageTrans
from core.datasource.website_crawl.website_crawl_provider import WebsiteCrawlDatasourcePluginProviderController
from core.db.session_factory import session_factory
from core.plugin.impl.datasource import PluginDatasourceManager
from core.workflow.nodes.datasource.entities import DatasourceParameter, OnlineDriveDownloadFileParam
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import WorkflowNodeExecutionMetadataKey
from dify_graph.file import File
from dify_graph.file.enums import FileTransferMethod, FileType
from dify_graph.node_events import NodeRunResult, StreamChunkEvent, StreamCompletedEvent
from dify_graph.repositories.datasource_manager_protocol import DatasourceParameter, OnlineDriveDownloadFileParam
from factories import file_factory
from models.model import UploadFile
from models.tools import ToolFile


@@ -58,7 +58,7 @@ from core.ops.entities.trace_entity import (
)
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.entities import WorkflowNodeExecution
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import WorkflowNodeExecutionTriggeredFrom
@@ -302,11 +302,11 @@ class AliyunDataTrace(BaseTraceInstance):
self, node_execution: WorkflowNodeExecution, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata
):
try:
if node_execution.node_type == NodeType.LLM:
if node_execution.node_type == BuiltinNodeTypes.LLM:
node_span = self.build_workflow_llm_span(trace_info, node_execution, trace_metadata)
elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
elif node_execution.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
node_span = self.build_workflow_retrieval_span(trace_info, node_execution, trace_metadata)
elif node_execution.node_type == NodeType.TOOL:
elif node_execution.node_type == BuiltinNodeTypes.TOOL:
node_span = self.build_workflow_tool_span(trace_info, node_execution, trace_metadata)
else:
node_span = self.build_workflow_task_span(trace_info, node_execution, trace_metadata)


@@ -155,8 +155,8 @@ def wrap_span_metadata(metadata, **kwargs):
return metadata
# Mapping from NodeType string values to OpenInference span kinds.
# NodeType values not listed here default to CHAIN.
# Mapping from built-in node type strings to OpenInference span kinds.
# Node types not listed here default to CHAIN.
_NODE_TYPE_TO_SPAN_KIND: dict[str, OpenInferenceSpanKindValues] = {
"llm": OpenInferenceSpanKindValues.LLM,
"knowledge-retrieval": OpenInferenceSpanKindValues.RETRIEVER,
@@ -168,7 +168,7 @@ _NODE_TYPE_TO_SPAN_KIND: dict[str, OpenInferenceSpanKindValues] = {
def _get_node_span_kind(node_type: str) -> OpenInferenceSpanKindValues:
"""Return the OpenInference span kind for a given workflow node type.
Covers every ``NodeType`` enum value. Nodes that do not have a
Covers every built-in node type string. Nodes that do not have a
specialised span kind (e.g. ``start``, ``end``, ``if-else``,
``code``, ``loop``, ``iteration``, etc.) are mapped to ``CHAIN``.
"""


@@ -28,7 +28,7 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
)
from core.ops.utils import filter_none_values
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from extensions.ext_database import db
from models import EndUser, WorkflowNodeExecutionTriggeredFrom
from models.enums import MessageStatus
@@ -141,7 +141,7 @@ class LangFuseDataTrace(BaseTraceInstance):
node_name = node_execution.title
node_type = node_execution.node_type
status = node_execution.status
if node_type == NodeType.LLM:
if node_type == BuiltinNodeTypes.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
inputs = node_execution.inputs or {}


@@ -28,7 +28,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import (
)
from core.ops.utils import filter_none_values, generate_dotted_order
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
@@ -163,7 +163,7 @@ class LangSmithDataTrace(BaseTraceInstance):
node_name = node_execution.title
node_type = node_execution.node_type
status = node_execution.status
if node_type == NodeType.LLM:
if node_type == BuiltinNodeTypes.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
inputs = node_execution.inputs or {}
@@ -197,7 +197,7 @@ class LangSmithDataTrace(BaseTraceInstance):
"ls_model_name": process_data.get("model_name", ""),
}
)
elif node_type == NodeType.KNOWLEDGE_RETRIEVAL:
elif node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
run_type = LangSmithRunType.retriever
else:
run_type = LangSmithRunType.tool


@@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import (
TraceTaskName,
WorkflowTraceInfo,
)
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from extensions.ext_database import db
from models import EndUser
from models.workflow import WorkflowNodeExecutionModel
@@ -145,10 +145,10 @@ class MLflowDataTrace(BaseTraceInstance):
"app_name": node.title,
}
if node.node_type in (NodeType.LLM, NodeType.QUESTION_CLASSIFIER):
if node.node_type in (BuiltinNodeTypes.LLM, BuiltinNodeTypes.QUESTION_CLASSIFIER):
inputs, llm_attributes = self._parse_llm_inputs_and_attributes(node)
attributes.update(llm_attributes)
elif node.node_type == NodeType.HTTP_REQUEST:
elif node.node_type == BuiltinNodeTypes.HTTP_REQUEST:
inputs = node.process_data # contains request URL
if not inputs:
@@ -180,9 +180,9 @@ class MLflowDataTrace(BaseTraceInstance):
# End node span
finished_at = node.created_at + timedelta(seconds=node.elapsed_time)
outputs = json.loads(node.outputs) if node.outputs else {}
if node.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
if node.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
outputs = self._parse_knowledge_retrieval_outputs(outputs)
elif node.node_type == NodeType.LLM:
elif node.node_type == BuiltinNodeTypes.LLM:
outputs = outputs.get("text", outputs)
node_span.end(
outputs=outputs,
@@ -471,13 +471,13 @@ class MLflowDataTrace(BaseTraceInstance):
def _get_node_span_type(self, node_type: str) -> str:
"""Map Dify node types to MLflow span types"""
node_type_mapping = {
NodeType.LLM: SpanType.LLM,
NodeType.QUESTION_CLASSIFIER: SpanType.LLM,
NodeType.KNOWLEDGE_RETRIEVAL: SpanType.RETRIEVER,
NodeType.TOOL: SpanType.TOOL,
NodeType.CODE: SpanType.TOOL,
NodeType.HTTP_REQUEST: SpanType.TOOL,
NodeType.AGENT: SpanType.AGENT,
BuiltinNodeTypes.LLM: SpanType.LLM,
BuiltinNodeTypes.QUESTION_CLASSIFIER: SpanType.LLM,
BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL: SpanType.RETRIEVER,
BuiltinNodeTypes.TOOL: SpanType.TOOL,
BuiltinNodeTypes.CODE: SpanType.TOOL,
BuiltinNodeTypes.HTTP_REQUEST: SpanType.TOOL,
BuiltinNodeTypes.AGENT: SpanType.AGENT,
}
return node_type_mapping.get(node_type, "CHAIN") # type: ignore[arg-type,call-overload]

View File
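
_get_node_span_type goes one step further: its dict keys are BuiltinNodeTypes members while the incoming node_type is a str, so the .get relies on string-backed enums hashing like their values (hence the # type: ignore the hunk keeps). A quick check of that behaviour under the same StrEnum assumption:

from enum import StrEnum

class BuiltinNodeTypes(StrEnum):
    LLM = "llm"

mapping = {BuiltinNodeTypes.LLM: "SPAN_LLM"}
assert mapping.get("llm") == "SPAN_LLM"  # a plain str key finds the enum-keyed entry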

@@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import (
WorkflowTraceInfo,
)
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
@@ -187,7 +187,7 @@ class OpikDataTrace(BaseTraceInstance):
node_name = node_execution.title
node_type = node_execution.node_type
status = node_execution.status
if node_type == NodeType.LLM:
if node_type == BuiltinNodeTypes.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
inputs = node_execution.inputs or {}

View File

@@ -628,10 +628,10 @@ class TraceTask:
if not message_data:
return {}
conversation_mode_stmt = select(Conversation.mode).where(Conversation.id == message_data.conversation_id)
conversation_mode = db.session.scalars(conversation_mode_stmt).all()
if not conversation_mode or len(conversation_mode) == 0:
conversation_modes = db.session.scalars(conversation_mode_stmt).all()
if not conversation_modes or len(conversation_modes) == 0:
return {}
conversation_mode = conversation_mode[0]
conversation_mode = conversation_modes[0]
created_at = message_data.created_at
inputs = message_data.message

View File
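
The rename stops one variable from holding first the list of scalars and then its first element. For what it's worth, the same lookup collapses to a single call with Session.scalar; a sketch of that alternative under the file's SQLAlchemy 2.x style, not what the commit does:

from sqlalchemy import select

def fetch_conversation_mode(session, conversation_id):
    # Conversation as imported in the surrounding file; one round trip,
    # no intermediate list to index into. Returns None when no row matches.
    return session.scalar(
        select(Conversation.mode).where(Conversation.id == conversation_id)
    )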

@@ -27,7 +27,7 @@ from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from dify_graph.entities.workflow_node_execution import (
WorkflowNodeExecution,
)
from dify_graph.nodes import NodeType
from dify_graph.nodes import BuiltinNodeTypes
from extensions.ext_database import db
from models import Account, App, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom
@@ -179,7 +179,7 @@ class TencentDataTrace(BaseTraceInstance):
if node_span:
self.trace_client.add_span(node_span)
if node_execution.node_type == NodeType.LLM:
if node_execution.node_type == BuiltinNodeTypes.LLM:
self._record_llm_metrics(node_execution)
except Exception:
logger.exception("[Tencent APM] Failed to process node execution: %s", node_execution.id)
@@ -192,15 +192,15 @@ class TencentDataTrace(BaseTraceInstance):
) -> SpanData | None:
"""Build span for different node types"""
try:
if node_execution.node_type == NodeType.LLM:
if node_execution.node_type == BuiltinNodeTypes.LLM:
return TencentSpanBuilder.build_workflow_llm_span(
trace_id, workflow_span_id, trace_info, node_execution
)
elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
elif node_execution.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
return TencentSpanBuilder.build_workflow_retrieval_span(
trace_id, workflow_span_id, trace_info, node_execution
)
elif node_execution.node_type == NodeType.TOOL:
elif node_execution.node_type == BuiltinNodeTypes.TOOL:
return TencentSpanBuilder.build_workflow_tool_span(
trace_id, workflow_span_id, trace_info, node_execution
)

View File

@@ -31,7 +31,7 @@ from core.ops.entities.trace_entity import (
)
from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
@@ -175,7 +175,7 @@ class WeaveDataTrace(BaseTraceInstance):
node_name = node_execution.title
node_type = node_execution.node_type
status = node_execution.status
if node_type == NodeType.LLM:
if node_type == BuiltinNodeTypes.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
inputs = node_execution.inputs or {}

View File

@@ -1,5 +1,5 @@
from core.plugin.backwards_invocation.base import BaseBackwardsInvocation
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from dify_graph.nodes.parameter_extractor.entities import (
ModelConfig as ParameterExtractorModelConfig,
)
@@ -52,7 +52,7 @@ class PluginNodeBackwardsInvocation(BaseBackwardsInvocation):
instruction=instruction,  # instructions with variables are not supported
)
node_data_dict = node_data.model_dump()
node_data_dict["type"] = NodeType.PARAMETER_EXTRACTOR
node_data_dict["type"] = BuiltinNodeTypes.PARAMETER_EXTRACTOR
execution = workflow_service.run_free_workflow_node(
node_data_dict,
tenant_id=tenant_id,

View File

@@ -627,7 +627,7 @@ class ProviderManager:
tenant_id=tenant_id,
# TODO: Use provider name with prefix after the data migration.
provider_name=ModelProviderID(provider_name).provider_name,
provider_type=ProviderType.SYSTEM.value,
provider_type=ProviderType.SYSTEM,
quota_type=quota.quota_type,
quota_limit=0, # type: ignore
quota_used=0,

View File

@@ -0,0 +1,361 @@
import json
import logging
import time
from typing import Any
import holo_search_sdk as holo # type: ignore
from holo_search_sdk.types import BaseQuantizationType, DistanceType, TokenizerType
from psycopg import sql as psql
from pydantic import BaseModel, model_validator
from configs import dify_config
from core.rag.datasource.vdb.vector_base import BaseVector
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.embedding.embedding_base import Embeddings
from core.rag.models.document import Document
from extensions.ext_redis import redis_client
from models.dataset import Dataset
logger = logging.getLogger(__name__)
class HologresVectorConfig(BaseModel):
"""
Configuration for Hologres vector database connection.
In Hologres, access_key_id is used as the PostgreSQL username,
and access_key_secret is used as the PostgreSQL password.
"""
host: str
port: int = 80
database: str
access_key_id: str
access_key_secret: str
schema_name: str = "public"
tokenizer: TokenizerType = "jieba"
distance_method: DistanceType = "Cosine"
base_quantization_type: BaseQuantizationType = "rabitq"
max_degree: int = 64
ef_construction: int = 400
@model_validator(mode="before")
@classmethod
def validate_config(cls, values: dict):
if not values.get("host"):
raise ValueError("config HOLOGRES_HOST is required")
if not values.get("database"):
raise ValueError("config HOLOGRES_DATABASE is required")
if not values.get("access_key_id"):
raise ValueError("config HOLOGRES_ACCESS_KEY_ID is required")
if not values.get("access_key_secret"):
raise ValueError("config HOLOGRES_ACCESS_KEY_SECRET is required")
return values
class HologresVector(BaseVector):
"""
Hologres vector storage implementation using holo-search-sdk.
Supports semantic search (vector), full-text search, and hybrid search.
"""
def __init__(self, collection_name: str, config: HologresVectorConfig):
super().__init__(collection_name)
self._config = config
self._client = self._init_client(config)
self.table_name = f"embedding_{collection_name}".lower()
def _init_client(self, config: HologresVectorConfig):
"""Initialize and return a holo-search-sdk client."""
client = holo.connect(
host=config.host,
port=config.port,
database=config.database,
access_key_id=config.access_key_id,
access_key_secret=config.access_key_secret,
schema=config.schema_name,
)
client.connect()
return client
def get_type(self) -> str:
return VectorType.HOLOGRES
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
"""Create collection table with vector and full-text indexes, then add texts."""
dimension = len(embeddings[0])
self._create_collection(dimension)
self.add_texts(texts, embeddings)
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
"""Add texts with embeddings to the collection using batch upsert."""
if not documents:
return []
pks: list[str] = []
batch_size = 100
for i in range(0, len(documents), batch_size):
batch_docs = documents[i : i + batch_size]
batch_embeddings = embeddings[i : i + batch_size]
values = []
column_names = ["id", "text", "meta", "embedding"]
for j, doc in enumerate(batch_docs):
doc_id = doc.metadata.get("doc_id", "") if doc.metadata else ""
pks.append(doc_id)
values.append(
[
doc_id,
doc.page_content,
json.dumps(doc.metadata or {}),
batch_embeddings[j],
]
)
table = self._client.open_table(self.table_name)
table.upsert_multi(
index_column="id",
values=values,
column_names=column_names,
update=True,
update_columns=["text", "meta", "embedding"],
)
return pks
def text_exists(self, id: str) -> bool:
"""Check if a text with the given doc_id exists in the collection."""
if not self._client.check_table_exist(self.table_name):
return False
result = self._client.execute(
psql.SQL("SELECT 1 FROM {} WHERE id = {} LIMIT 1").format(
psql.Identifier(self.table_name), psql.Literal(id)
),
fetch_result=True,
)
return bool(result)
def get_ids_by_metadata_field(self, key: str, value: str) -> list[str] | None:
"""Get document IDs by metadata field key and value."""
result = self._client.execute(
psql.SQL("SELECT id FROM {} WHERE meta->>{} = {}").format(
psql.Identifier(self.table_name), psql.Literal(key), psql.Literal(value)
),
fetch_result=True,
)
if result:
return [row[0] for row in result]
return None
def delete_by_ids(self, ids: list[str]):
"""Delete documents by their doc_id list."""
if not ids:
return
if not self._client.check_table_exist(self.table_name):
return
self._client.execute(
psql.SQL("DELETE FROM {} WHERE id IN ({})").format(
psql.Identifier(self.table_name),
psql.SQL(", ").join(psql.Literal(id) for id in ids),
)
)
def delete_by_metadata_field(self, key: str, value: str):
"""Delete documents by metadata field key and value."""
if not self._client.check_table_exist(self.table_name):
return
self._client.execute(
psql.SQL("DELETE FROM {} WHERE meta->>{} = {}").format(
psql.Identifier(self.table_name), psql.Literal(key), psql.Literal(value)
)
)
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
"""Search for documents by vector similarity."""
if not self._client.check_table_exist(self.table_name):
return []
top_k = kwargs.get("top_k", 4)
score_threshold = float(kwargs.get("score_threshold") or 0.0)
table = self._client.open_table(self.table_name)
query = (
table.search_vector(
vector=query_vector,
column="embedding",
distance_method=self._config.distance_method,
output_name="distance",
)
.select(["id", "text", "meta"])
.limit(top_k)
)
# Apply document_ids_filter if provided
document_ids_filter = kwargs.get("document_ids_filter")
if document_ids_filter:
filter_sql = psql.SQL("meta->>'document_id' IN ({})").format(
psql.SQL(", ").join(psql.Literal(id) for id in document_ids_filter)
)
query = query.where(filter_sql)
results = query.fetchall()
return self._process_vector_results(results, score_threshold)
def _process_vector_results(self, results: list, score_threshold: float) -> list[Document]:
"""Process vector search results into Document objects."""
docs = []
for row in results:
# row format: (distance, id, text, meta)
# distance is first because search_vector() adds the computed column before selected columns
distance = row[0]
text = row[2]
meta = row[3]
if isinstance(meta, str):
meta = json.loads(meta)
# Convert distance to similarity score (consistent with pgvector)
score = 1 - distance
meta["score"] = score
if score >= score_threshold:
docs.append(Document(page_content=text, metadata=meta))
return docs
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
"""Search for documents by full-text search."""
if not self._client.check_table_exist(self.table_name):
return []
top_k = kwargs.get("top_k", 4)
table = self._client.open_table(self.table_name)
search_query = table.search_text(
column="text",
expression=query,
return_score=True,
return_score_name="score",
return_all_columns=True,
).limit(top_k)
# Apply document_ids_filter if provided
document_ids_filter = kwargs.get("document_ids_filter")
if document_ids_filter:
filter_sql = psql.SQL("meta->>'document_id' IN ({})").format(
psql.SQL(", ").join(psql.Literal(id) for id in document_ids_filter)
)
search_query = search_query.where(filter_sql)
results = search_query.fetchall()
return self._process_full_text_results(results)
def _process_full_text_results(self, results: list) -> list[Document]:
"""Process full-text search results into Document objects."""
docs = []
for row in results:
# row format: (id, text, meta, embedding, score)
text = row[1]
meta = row[2]
score = row[-1] # score is the last column from return_score
if isinstance(meta, str):
meta = json.loads(meta)
meta["score"] = score
docs.append(Document(page_content=text, metadata=meta))
return docs
def delete(self):
"""Delete the entire collection table."""
if self._client.check_table_exist(self.table_name):
self._client.drop_table(self.table_name)
def _create_collection(self, dimension: int):
"""Create the collection table with vector and full-text indexes."""
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
if not self._client.check_table_exist(self.table_name):
# Create table via SQL with CHECK constraint for vector dimension
create_table_sql = psql.SQL("""
CREATE TABLE IF NOT EXISTS {} (
id TEXT PRIMARY KEY,
text TEXT NOT NULL,
meta JSONB NOT NULL,
embedding float4[] NOT NULL
CHECK (array_ndims(embedding) = 1
AND array_length(embedding, 1) = {})
);
""").format(psql.Identifier(self.table_name), psql.Literal(dimension))
self._client.execute(create_table_sql)
# Wait for table to be fully ready before creating indexes
max_wait_seconds = 30
poll_interval = 2
for _ in range(max_wait_seconds // poll_interval):
if self._client.check_table_exist(self.table_name):
break
time.sleep(poll_interval)
else:
raise RuntimeError(f"Table {self.table_name} was not ready after {max_wait_seconds}s")
# Open table and set vector index
table = self._client.open_table(self.table_name)
table.set_vector_index(
column="embedding",
distance_method=self._config.distance_method,
base_quantization_type=self._config.base_quantization_type,
max_degree=self._config.max_degree,
ef_construction=self._config.ef_construction,
use_reorder=self._config.base_quantization_type == "rabitq",
)
# Create full-text search index
table.create_text_index(
index_name=f"ft_idx_{self._collection_name}",
column="text",
tokenizer=self._config.tokenizer,
)
redis_client.set(collection_exist_cache_key, 1, ex=3600)
class HologresVectorFactory(AbstractVectorFactory):
"""Factory class for creating HologresVector instances."""
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> HologresVector:
if dataset.index_struct_dict:
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
collection_name = class_prefix
else:
dataset_id = dataset.id
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.HOLOGRES, collection_name))
return HologresVector(
collection_name=collection_name,
config=HologresVectorConfig(
host=dify_config.HOLOGRES_HOST or "",
port=dify_config.HOLOGRES_PORT,
database=dify_config.HOLOGRES_DATABASE or "",
access_key_id=dify_config.HOLOGRES_ACCESS_KEY_ID or "",
access_key_secret=dify_config.HOLOGRES_ACCESS_KEY_SECRET or "",
schema_name=dify_config.HOLOGRES_SCHEMA,
tokenizer=dify_config.HOLOGRES_TOKENIZER,
distance_method=dify_config.HOLOGRES_DISTANCE_METHOD,
base_quantization_type=dify_config.HOLOGRES_BASE_QUANTIZATION_TYPE,
max_degree=dify_config.HOLOGRES_MAX_DEGREE,
ef_construction=dify_config.HOLOGRES_EF_CONSTRUCTION,
),
)

View File
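
One design note on the new Hologres module: every dynamic identifier and value above goes through psycopg's sql composition objects rather than Python string formatting, so table names and user-supplied values are quoted safely. A standalone sketch of that composition (psycopg 3; the names are illustrative):

from psycopg import sql

stmt = sql.SQL("SELECT id FROM {} WHERE meta->>{} = {}").format(
    sql.Identifier("embedding_demo_collection"),  # quoted as an identifier
    sql.Literal("document_id"),                   # quoted as a literal key
    sql.Literal("doc-123"),                       # quoted as a literal value
)
# stmt.as_string(conn) renders the safely quoted statement for a given connection.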

@@ -38,7 +38,7 @@ class AbstractVectorFactory(ABC):
class Vector:
def __init__(self, dataset: Dataset, attributes: list | None = None):
if attributes is None:
attributes = ["doc_id", "dataset_id", "document_id", "doc_hash"]
attributes = ["doc_id", "dataset_id", "document_id", "doc_hash", "doc_type"]
self._dataset = dataset
self._embeddings = self._get_embeddings()
self._attributes = attributes
@@ -191,6 +191,10 @@ class Vector:
from core.rag.datasource.vdb.iris.iris_vector import IrisVectorFactory
return IrisVectorFactory
case VectorType.HOLOGRES:
from core.rag.datasource.vdb.hologres.hologres_vector import HologresVectorFactory
return HologresVectorFactory
case _:
raise ValueError(f"Vector store {vector_type} is not supported.")

View File
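
The new match arm keeps the factory's lazy-import convention: each vendor SDK is imported only inside its own case, so deployments that never select Hologres never import holo_search_sdk. A generic sketch of the idiom:

def resolve_vector_factory(vector_type: str):
    match vector_type:
        case "hologres":
            # Deferred import: the optional SDK loads only when this store is configured.
            from core.rag.datasource.vdb.hologres.hologres_vector import HologresVectorFactory
            return HologresVectorFactory
        case _:
            raise ValueError(f"Vector store {vector_type} is not supported.")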

@@ -34,3 +34,4 @@ class VectorType(StrEnum):
MATRIXONE = "matrixone"
CLICKZETTA = "clickzetta"
IRIS = "iris"
HOLOGRES = "hologres"

View File

@@ -196,6 +196,7 @@ class WeaviateVector(BaseVector):
),
wc.Property(name="document_id", data_type=wc.DataType.TEXT),
wc.Property(name="doc_id", data_type=wc.DataType.TEXT),
wc.Property(name="doc_type", data_type=wc.DataType.TEXT),
wc.Property(name="chunk_index", data_type=wc.DataType.INT),
],
vector_config=wc.Configure.Vectors.self_provided(),
@@ -225,6 +226,8 @@ class WeaviateVector(BaseVector):
to_add.append(wc.Property(name="document_id", data_type=wc.DataType.TEXT))
if "doc_id" not in existing:
to_add.append(wc.Property(name="doc_id", data_type=wc.DataType.TEXT))
if "doc_type" not in existing:
to_add.append(wc.Property(name="doc_type", data_type=wc.DataType.TEXT))
if "chunk_index" not in existing:
to_add.append(wc.Property(name="chunk_index", data_type=wc.DataType.INT))

View File

@@ -9,8 +9,8 @@ from flask import current_app
from sqlalchemy import delete, func, select
from core.db.session_factory import session_factory
from dify_graph.nodes.knowledge_index.exc import KnowledgeIndexNodeError
from dify_graph.repositories.index_processor_protocol import Preview, PreviewItem, QaPreview
from core.workflow.nodes.knowledge_index.exc import KnowledgeIndexNodeError
from core.workflow.nodes.knowledge_index.protocols import Preview, PreviewItem, QaPreview
from models.dataset import Dataset, Document, DocumentSegment
from .index_processor_factory import IndexProcessorFactory

View File

@@ -56,18 +56,18 @@ from core.rag.retrieval.template_prompts import (
)
from core.tools.signature import sign_upload_file
from core.tools.utils.dataset_retriever.dataset_retriever_base_tool import DatasetRetrieverBaseTool
from dify_graph.file import File, FileTransferMethod, FileType
from dify_graph.model_runtime.entities.llm_entities import LLMMode, LLMResult, LLMUsage
from dify_graph.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool
from dify_graph.model_runtime.entities.model_entities import ModelFeature, ModelType
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from dify_graph.nodes.knowledge_retrieval import exc
from dify_graph.repositories.rag_retrieval_protocol import (
from core.workflow.nodes.knowledge_retrieval import exc
from core.workflow.nodes.knowledge_retrieval.retrieval import (
KnowledgeRetrievalRequest,
Source,
SourceChildChunk,
SourceMetadata,
)
from dify_graph.file import File, FileTransferMethod, FileType
from dify_graph.model_runtime.entities.llm_entities import LLMMode, LLMResult, LLMUsage
from dify_graph.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool
from dify_graph.model_runtime.entities.model_entities import ModelFeature, ModelType
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.json_in_md_parser import parse_and_check_json_markdown
@@ -83,6 +83,7 @@ from models.dataset import (
)
from models.dataset import Document as DatasetDocument
from models.dataset import Document as DocumentModel
from models.enums import CreatorUserRole
from services.external_knowledge_service import ExternalDatasetService
from services.feature_service import FeatureService
@@ -1009,7 +1010,7 @@ class DatasetRetrieval:
content=json.dumps(contents),
source="app",
source_app_id=app_id,
created_by_role=user_from,
created_by_role=CreatorUserRole(user_from),
created_by=user_id,
)
dataset_queries.append(dataset_query)

View File

@@ -146,7 +146,9 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
# No sequence number generation needed anymore
db_model.type = domain_model.workflow_type
from models.workflow import WorkflowType as ModelWorkflowType
db_model.type = ModelWorkflowType(domain_model.workflow_type.value)
db_model.version = domain_model.workflow_version
db_model.graph = json.dumps(domain_model.graph) if domain_model.graph else None
db_model.inputs = json.dumps(domain_model.inputs) if domain_model.inputs else None

View File
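
The fix converts through .value so the column ends up holding the model-layer enum rather than a foreign domain-layer enum instance. A minimal illustration with two hypothetical enums that share values:

from enum import StrEnum

class DomainWorkflowType(StrEnum):  # stand-in for the domain model's enum
    WORKFLOW = "workflow"

class ModelWorkflowType(StrEnum):   # stand-in for models.workflow.WorkflowType
    WORKFLOW = "workflow"

db_type = ModelWorkflowType(DomainWorkflowType.WORKFLOW.value)
assert db_type is ModelWorkflowType.WORKFLOW  # the correct type for the ORM column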

@@ -18,7 +18,7 @@ from tenacity import before_sleep_log, retry, retry_if_exception, stop_after_att
from configs import dify_config
from dify_graph.entities import WorkflowNodeExecution
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.repositories.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
from dify_graph.workflow_type_encoder import WorkflowRuntimeTypeConverter
@@ -146,7 +146,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
index=db_model.index,
predecessor_node_id=db_model.predecessor_node_id,
node_id=db_model.node_id,
node_type=NodeType(db_model.node_type),
node_type=db_model.node_type,
title=db_model.title,
inputs=inputs,
process_data=process_data,

View File

@@ -116,6 +116,7 @@ class ToolParameterConfigurationManager:
return a deep copy of parameters with decrypted values
"""
parameters = self._deep_copy(parameters)
cache = ToolParameterCache(
tenant_id=self.tenant_id,

View File

@@ -3,7 +3,7 @@ from typing import Any
from core.tools.entities.tool_entities import WorkflowToolParameterConfiguration
from core.tools.errors import WorkflowToolHumanInputNotSupportedError
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes
from dify_graph.nodes.base.entities import OutputVariableEntity
from dify_graph.variables.input_entities import VariableEntity
@@ -51,7 +51,7 @@ class WorkflowToolConfigurationUtils:
def ensure_no_human_input_nodes(cls, graph: Mapping[str, Any]) -> None:
nodes = graph.get("nodes", [])
for node in nodes:
if node.get("data", {}).get("type") == NodeType.HUMAN_INPUT:
if node.get("data", {}).get("type") == BuiltinNodeTypes.HUMAN_INPUT:
raise WorkflowToolHumanInputNotSupportedError()
@classmethod

View File

@@ -0,0 +1,18 @@
from typing import Final
TRIGGER_WEBHOOK_NODE_TYPE: Final[str] = "trigger-webhook"
TRIGGER_SCHEDULE_NODE_TYPE: Final[str] = "trigger-schedule"
TRIGGER_PLUGIN_NODE_TYPE: Final[str] = "trigger-plugin"
TRIGGER_INFO_METADATA_KEY: Final[str] = "trigger_info"
TRIGGER_NODE_TYPES: Final[frozenset[str]] = frozenset(
{
TRIGGER_WEBHOOK_NODE_TYPE,
TRIGGER_SCHEDULE_NODE_TYPE,
TRIGGER_PLUGIN_NODE_TYPE,
}
)
def is_trigger_node_type(node_type: str) -> bool:
return node_type in TRIGGER_NODE_TYPES

View File
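
A quick usage sketch of the new constants module:

from core.trigger.constants import TRIGGER_NODE_TYPES, is_trigger_node_type

assert is_trigger_node_type("trigger-webhook")
assert not is_trigger_node_type("llm")
# The frozenset composes cleanly into other type sets:
entry_node_types = frozenset({"start", "datasource"}) | TRIGGER_NODE_TYPES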

@@ -11,6 +11,11 @@ from typing import Any
from pydantic import BaseModel
from core.plugin.entities.request import TriggerInvokeEventResponse
from core.trigger.constants import (
TRIGGER_PLUGIN_NODE_TYPE,
TRIGGER_SCHEDULE_NODE_TYPE,
TRIGGER_WEBHOOK_NODE_TYPE,
)
from core.trigger.debug.event_bus import TriggerDebugEventBus
from core.trigger.debug.events import (
PluginTriggerDebugEvent,
@@ -19,10 +24,9 @@ from core.trigger.debug.events import (
build_plugin_pool_key,
build_webhook_pool_key,
)
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import NodeType
from dify_graph.nodes.trigger_plugin.entities import TriggerEventNodeData
from dify_graph.nodes.trigger_schedule.entities import ScheduleConfig
from extensions.ext_redis import redis_client
from libs.datetime_utils import ensure_naive_utc, naive_utc_now
from libs.schedule_utils import calculate_next_run_at
@@ -206,21 +210,19 @@ def create_event_poller(
if not node_config:
raise ValueError("Node data not found for node %s", node_id)
node_type = draft_workflow.get_node_type_from_node_config(node_config)
match node_type:
case NodeType.TRIGGER_PLUGIN:
return PluginTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
case NodeType.TRIGGER_WEBHOOK:
return WebhookTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
case NodeType.TRIGGER_SCHEDULE:
return ScheduleTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
case _:
raise ValueError("unable to create event poller for node type %s", node_type)
if node_type == TRIGGER_PLUGIN_NODE_TYPE:
return PluginTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
if node_type == TRIGGER_WEBHOOK_NODE_TYPE:
return WebhookTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
if node_type == TRIGGER_SCHEDULE_NODE_TYPE:
return ScheduleTriggerDebugEventPoller(
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
)
raise ValueError("unable to create event poller for node type %s", node_type)
def select_trigger_debug_events(

View File
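
Since node_type is now a plain string, the new if-chain could equally be table-driven; a hypothetical alternative sketch (the committed code keeps the explicit branches above):

_POLLER_CLASSES = {
    TRIGGER_PLUGIN_NODE_TYPE: PluginTriggerDebugEventPoller,
    TRIGGER_WEBHOOK_NODE_TYPE: WebhookTriggerDebugEventPoller,
    TRIGGER_SCHEDULE_NODE_TYPE: ScheduleTriggerDebugEventPoller,
}

def create_poller(node_type: str, **kwargs):
    poller_class = _POLLER_CLASSES.get(node_type)
    if poller_class is None:
        raise ValueError(f"unable to create event poller for node type {node_type}")
    return poller_class(**kwargs)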

@@ -1,4 +1 @@
from .node_factory import DifyNodeFactory
from .workflow_entry import WorkflowEntry
__all__ = ["DifyNodeFactory", "WorkflowEntry"]
"""Core workflow package."""

View File

@@ -1,4 +1,7 @@
from collections.abc import Callable, Mapping
import importlib
import pkgutil
from collections.abc import Callable, Iterator, Mapping, MutableMapping
from functools import lru_cache
from typing import TYPE_CHECKING, Any, TypeAlias, cast, final
from sqlalchemy import select
@@ -8,7 +11,6 @@ from typing_extensions import override
from configs import dify_config
from core.app.entities.app_invoke_entities import DifyRunContext
from core.app.llm.model_access import build_dify_model_access
from core.datasource.datasource_manager import DatasourceManager
from core.helper.code_executor.code_executor import (
CodeExecutionError,
CodeExecutor,
@@ -17,15 +19,19 @@ from core.helper.ssrf_proxy import ssrf_proxy
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.prompt.entities.advanced_prompt_entities import MemoryConfig
from core.rag.index_processor.index_processor import IndexProcessor
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
from core.rag.summary_index.summary_index import SummaryIndex
from core.repositories.human_input_repository import HumanInputFormRepositoryImpl
from core.tools.tool_file_manager import ToolFileManager
from core.trigger.constants import TRIGGER_NODE_TYPES
from core.workflow.nodes.agent.message_transformer import AgentMessageTransformer
from core.workflow.nodes.agent.plugin_strategy_adapter import (
PluginAgentStrategyPresentationProvider,
PluginAgentStrategyResolver,
)
from core.workflow.nodes.agent.runtime_support import AgentRuntimeSupport
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY
from dify_graph.enums import NodeType, SystemVariableKey
from dify_graph.enums import BuiltinNodeTypes, NodeType, SystemVariableKey
from dify_graph.file.file_manager import file_manager
from dify_graph.graph.graph import NodeFactory
from dify_graph.model_runtime.entities.model_entities import ModelType
@@ -39,7 +45,6 @@ from dify_graph.nodes.document_extractor import UnstructuredApiConfig
from dify_graph.nodes.http_request import build_http_request_config
from dify_graph.nodes.llm.entities import LLMNodeData
from dify_graph.nodes.llm.exc import LLMModeRequiredError, ModelNotExistError
from dify_graph.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from dify_graph.nodes.parameter_extractor.entities import ParameterExtractorNodeData
from dify_graph.nodes.question_classifier.entities import QuestionClassifierNodeData
from dify_graph.nodes.template_transform.template_renderer import (
@@ -53,6 +58,135 @@ if TYPE_CHECKING:
from dify_graph.entities import GraphInitParams
from dify_graph.runtime import GraphRuntimeState
LATEST_VERSION = "latest"
_START_NODE_TYPES: frozenset[NodeType] = frozenset(
(BuiltinNodeTypes.START, BuiltinNodeTypes.DATASOURCE, *TRIGGER_NODE_TYPES)
)
def _import_node_package(package_name: str, *, excluded_modules: frozenset[str] = frozenset()) -> None:
package = importlib.import_module(package_name)
for _, module_name, _ in pkgutil.walk_packages(package.__path__, package.__name__ + "."):
if module_name in excluded_modules:
continue
importlib.import_module(module_name)
@lru_cache(maxsize=1)
def register_nodes() -> None:
"""Import production node modules so they self-register with ``Node``."""
_import_node_package("dify_graph.nodes")
_import_node_package("core.workflow.nodes")
def get_node_type_classes_mapping() -> Mapping[NodeType, Mapping[str, type[Node]]]:
"""Return a read-only snapshot of the current production node registry.
The workflow layer owns node bootstrap because it must compose built-in
`dify_graph.nodes.*` implementations with workflow-local nodes under
`core.workflow.nodes.*`. Keeping this import side effect here avoids
reintroducing registry bootstrapping into lower-level graph primitives.
"""
register_nodes()
return Node.get_node_type_classes_mapping()
def resolve_workflow_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
node_mapping = get_node_type_classes_mapping().get(node_type)
if not node_mapping:
raise ValueError(f"No class mapping found for node type: {node_type}")
latest_node_class = node_mapping.get(LATEST_VERSION)
matched_node_class = node_mapping.get(node_version)
node_class = matched_node_class or latest_node_class
if not node_class:
raise ValueError(f"No latest version class found for node type: {node_type}")
return node_class
def is_start_node_type(node_type: NodeType) -> bool:
"""Return True when the node type can serve as a workflow entry point."""
return node_type in _START_NODE_TYPES
def get_default_root_node_id(graph_config: Mapping[str, Any]) -> str:
"""Resolve the default entry node for a persisted top-level workflow graph.
This workflow-layer helper depends on start-node semantics defined by
`is_start_node_type`, so it intentionally lives next to the node registry
instead of in the raw `dify_graph.entities.graph_config` schema module.
"""
nodes = graph_config.get("nodes")
if not isinstance(nodes, list):
raise ValueError("nodes in workflow graph must be a list")
for node in nodes:
if not isinstance(node, Mapping):
continue
if node.get("type") == "custom-note":
continue
node_id = node.get("id")
data = node.get("data")
if not isinstance(node_id, str) or not isinstance(data, Mapping):
continue
node_type = data.get("type")
if isinstance(node_type, str) and is_start_node_type(node_type):
return node_id
raise ValueError("Unable to determine default root node ID from workflow graph")
class _LazyNodeTypeClassesMapping(MutableMapping[NodeType, Mapping[str, type[Node]]]):
"""Mutable dict-like view over the current node registry."""
def __init__(self) -> None:
self._cached_snapshot: dict[NodeType, Mapping[str, type[Node]]] = {}
self._cached_version = -1
self._deleted: set[NodeType] = set()
self._overrides: dict[NodeType, Mapping[str, type[Node]]] = {}
def _snapshot(self) -> dict[NodeType, Mapping[str, type[Node]]]:
current_version = Node.get_registry_version()
if self._cached_version != current_version:
self._cached_snapshot = dict(get_node_type_classes_mapping())
self._cached_version = current_version
if not self._deleted and not self._overrides:
return self._cached_snapshot
snapshot = {key: value for key, value in self._cached_snapshot.items() if key not in self._deleted}
snapshot.update(self._overrides)
return snapshot
def __getitem__(self, key: NodeType) -> Mapping[str, type[Node]]:
return self._snapshot()[key]
def __setitem__(self, key: NodeType, value: Mapping[str, type[Node]]) -> None:
self._deleted.discard(key)
self._overrides[key] = value
def __delitem__(self, key: NodeType) -> None:
if key in self._overrides:
del self._overrides[key]
return
if key in self._cached_snapshot:
self._deleted.add(key)
return
raise KeyError(key)
def __iter__(self) -> Iterator[NodeType]:
return iter(self._snapshot())
def __len__(self) -> int:
return len(self._snapshot())
# Keep the canonical node-class mapping in the workflow layer that also bootstraps
# legacy `core.workflow.nodes.*` registrations.
NODE_TYPE_CLASSES_MAPPING: MutableMapping[NodeType, Mapping[str, type[Node]]] = _LazyNodeTypeClassesMapping()
LLMCompatibleNodeData: TypeAlias = LLMNodeData | QuestionClassifierNodeData | ParameterExtractorNodeData
@@ -97,10 +231,7 @@ class DefaultWorkflowCodeExecutor:
@final
class DifyNodeFactory(NodeFactory):
"""
Default implementation of NodeFactory that uses the traditional node mapping.
This factory creates nodes by looking up their types in NODE_TYPE_CLASSES_MAPPING
and instantiating the appropriate node class.
Default implementation of NodeFactory that resolves node classes from the live registry.
"""
def __init__(
@@ -127,7 +258,6 @@ class DifyNodeFactory(NodeFactory):
self._http_request_http_client = ssrf_proxy
self._http_request_tool_file_manager_factory = ToolFileManager
self._http_request_file_manager = file_manager
self._rag_retrieval = DatasetRetrieval()
self._document_extractor_unstructured_api_config = UnstructuredApiConfig(
api_url=dify_config.UNSTRUCTURED_API_URL,
api_key=dify_config.UNSTRUCTURED_API_KEY or "",
@@ -143,6 +273,10 @@ class DifyNodeFactory(NodeFactory):
)
self._llm_credentials_provider, self._llm_model_factory = build_dify_model_access(self._dify_context.tenant_id)
self._agent_strategy_resolver = PluginAgentStrategyResolver()
self._agent_strategy_presentation_provider = PluginAgentStrategyPresentationProvider()
self._agent_runtime_support = AgentRuntimeSupport()
self._agent_message_transformer = AgentMessageTransformer()
@staticmethod
def _resolve_dify_context(run_context: Mapping[str, Any]) -> DifyRunContext:
@@ -170,55 +304,51 @@ class DifyNodeFactory(NodeFactory):
node_class = self._resolve_node_class(node_type=node_data.type, node_version=str(node_data.version))
node_type = node_data.type
node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = {
NodeType.CODE: lambda: {
BuiltinNodeTypes.CODE: lambda: {
"code_executor": self._code_executor,
"code_limits": self._code_limits,
},
NodeType.TEMPLATE_TRANSFORM: lambda: {
BuiltinNodeTypes.TEMPLATE_TRANSFORM: lambda: {
"template_renderer": self._template_renderer,
"max_output_length": self._template_transform_max_output_length,
},
NodeType.HTTP_REQUEST: lambda: {
BuiltinNodeTypes.HTTP_REQUEST: lambda: {
"http_request_config": self._http_request_config,
"http_client": self._http_request_http_client,
"tool_file_manager_factory": self._http_request_tool_file_manager_factory,
"file_manager": self._http_request_file_manager,
},
NodeType.HUMAN_INPUT: lambda: {
BuiltinNodeTypes.HUMAN_INPUT: lambda: {
"form_repository": HumanInputFormRepositoryImpl(tenant_id=self._dify_context.tenant_id),
},
NodeType.KNOWLEDGE_INDEX: lambda: {
"index_processor": IndexProcessor(),
"summary_index_service": SummaryIndex(),
},
NodeType.LLM: lambda: self._build_llm_compatible_node_init_kwargs(
BuiltinNodeTypes.LLM: lambda: self._build_llm_compatible_node_init_kwargs(
node_class=node_class,
node_data=node_data,
include_http_client=True,
),
NodeType.DATASOURCE: lambda: {
"datasource_manager": DatasourceManager,
},
NodeType.KNOWLEDGE_RETRIEVAL: lambda: {
"rag_retrieval": self._rag_retrieval,
},
NodeType.DOCUMENT_EXTRACTOR: lambda: {
BuiltinNodeTypes.DOCUMENT_EXTRACTOR: lambda: {
"unstructured_api_config": self._document_extractor_unstructured_api_config,
"http_client": self._http_request_http_client,
},
NodeType.QUESTION_CLASSIFIER: lambda: self._build_llm_compatible_node_init_kwargs(
BuiltinNodeTypes.QUESTION_CLASSIFIER: lambda: self._build_llm_compatible_node_init_kwargs(
node_class=node_class,
node_data=node_data,
include_http_client=True,
),
NodeType.PARAMETER_EXTRACTOR: lambda: self._build_llm_compatible_node_init_kwargs(
BuiltinNodeTypes.PARAMETER_EXTRACTOR: lambda: self._build_llm_compatible_node_init_kwargs(
node_class=node_class,
node_data=node_data,
include_http_client=False,
),
NodeType.TOOL: lambda: {
BuiltinNodeTypes.TOOL: lambda: {
"tool_file_manager_factory": self._http_request_tool_file_manager_factory(),
},
BuiltinNodeTypes.AGENT: lambda: {
"strategy_resolver": self._agent_strategy_resolver,
"presentation_provider": self._agent_strategy_presentation_provider,
"runtime_support": self._agent_runtime_support,
"message_transformer": self._agent_message_transformer,
},
}
node_init_kwargs = node_init_kwargs_factories.get(node_type, lambda: {})()
return node_class(
@@ -238,16 +368,7 @@ class DifyNodeFactory(NodeFactory):
@staticmethod
def _resolve_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
node_mapping = NODE_TYPE_CLASSES_MAPPING.get(node_type)
if not node_mapping:
raise ValueError(f"No class mapping found for node type: {node_type}")
latest_node_class = node_mapping.get(LATEST_VERSION)
matched_node_class = node_mapping.get(node_version)
node_class = matched_node_class or latest_node_class
if not node_class:
raise ValueError(f"No latest version class found for node type: {node_type}")
return node_class
return resolve_workflow_node_class(node_type=node_type, node_version=node_version)
def _build_llm_compatible_node_init_kwargs(
self,

View File
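
register_nodes() above leans on a classic plugin idiom: walk a package with pkgutil and import every module so each class registers itself as an import side effect, with lru_cache(maxsize=1) making the sweep run once per process. A generic, self-contained sketch of that idiom (the registry and decorator are illustrative, not Dify's API):

import importlib
import pkgutil
from functools import lru_cache

REGISTRY: dict[str, type] = {}

def register(node_type: str):
    def decorator(cls: type) -> type:
        REGISTRY[node_type] = cls  # runs as a side effect of importing the module
        return cls
    return decorator

@lru_cache(maxsize=1)  # like register_nodes(): the import sweep happens once
def import_all(package_name: str) -> None:
    package = importlib.import_module(package_name)
    for _, module_name, _ in pkgutil.walk_packages(package.__path__, package.__name__ + "."):
        importlib.import_module(module_name)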

@@ -0,0 +1 @@
"""Workflow node implementations that remain under the legacy core.workflow namespace."""

View File

@@ -0,0 +1,4 @@
from .agent_node import AgentNode
from .entities import AgentNodeData
__all__ = ["AgentNode", "AgentNodeData"]

View File

@@ -0,0 +1,188 @@
from __future__ import annotations
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import BuiltinNodeTypes, SystemVariableKey, WorkflowNodeExecutionStatus
from dify_graph.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser
from .entities import AgentNodeData
from .exceptions import (
AgentInvocationError,
AgentMessageTransformError,
)
from .message_transformer import AgentMessageTransformer
from .runtime_support import AgentRuntimeSupport
from .strategy_protocols import AgentStrategyPresentationProvider, AgentStrategyResolver
if TYPE_CHECKING:
from dify_graph.entities import GraphInitParams
from dify_graph.runtime import GraphRuntimeState
class AgentNode(Node[AgentNodeData]):
node_type = BuiltinNodeTypes.AGENT
_strategy_resolver: AgentStrategyResolver
_presentation_provider: AgentStrategyPresentationProvider
_runtime_support: AgentRuntimeSupport
_message_transformer: AgentMessageTransformer
def __init__(
self,
id: str,
config: NodeConfigDict,
graph_init_params: GraphInitParams,
graph_runtime_state: GraphRuntimeState,
*,
strategy_resolver: AgentStrategyResolver,
presentation_provider: AgentStrategyPresentationProvider,
runtime_support: AgentRuntimeSupport,
message_transformer: AgentMessageTransformer,
) -> None:
super().__init__(
id=id,
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
self._strategy_resolver = strategy_resolver
self._presentation_provider = presentation_provider
self._runtime_support = runtime_support
self._message_transformer = message_transformer
@classmethod
def version(cls) -> str:
return "1"
def populate_start_event(self, event) -> None:
dify_ctx = self.require_dify_context()
event.extras["agent_strategy"] = {
"name": self.node_data.agent_strategy_name,
"icon": self._presentation_provider.get_icon(
tenant_id=dify_ctx.tenant_id,
agent_strategy_provider_name=self.node_data.agent_strategy_provider_name,
),
}
def _run(self) -> Generator[NodeEventBase, None, None]:
from core.plugin.impl.exc import PluginDaemonClientSideError
dify_ctx = self.require_dify_context()
try:
strategy = self._strategy_resolver.resolve(
tenant_id=dify_ctx.tenant_id,
agent_strategy_provider_name=self.node_data.agent_strategy_provider_name,
agent_strategy_name=self.node_data.agent_strategy_name,
)
except Exception as e:
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs={},
error=f"Failed to get agent strategy: {str(e)}",
),
)
return
agent_parameters = strategy.get_parameters()
parameters = self._runtime_support.build_parameters(
agent_parameters=agent_parameters,
variable_pool=self.graph_runtime_state.variable_pool,
node_data=self.node_data,
strategy=strategy,
tenant_id=dify_ctx.tenant_id,
app_id=dify_ctx.app_id,
invoke_from=dify_ctx.invoke_from,
)
parameters_for_log = self._runtime_support.build_parameters(
agent_parameters=agent_parameters,
variable_pool=self.graph_runtime_state.variable_pool,
node_data=self.node_data,
strategy=strategy,
tenant_id=dify_ctx.tenant_id,
app_id=dify_ctx.app_id,
invoke_from=dify_ctx.invoke_from,
for_log=True,
)
credentials = self._runtime_support.build_credentials(parameters=parameters)
conversation_id = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID])
try:
message_stream = strategy.invoke(
params=parameters,
user_id=dify_ctx.user_id,
app_id=dify_ctx.app_id,
conversation_id=conversation_id.text if conversation_id else None,
credentials=credentials,
)
except Exception as e:
error = AgentInvocationError(f"Failed to invoke agent: {str(e)}", original_error=e)
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=parameters_for_log,
error=str(error),
)
)
return
try:
yield from self._message_transformer.transform(
messages=message_stream,
tool_info={
"icon": self._presentation_provider.get_icon(
tenant_id=dify_ctx.tenant_id,
agent_strategy_provider_name=self.node_data.agent_strategy_provider_name,
),
"agent_strategy": self.node_data.agent_strategy_name,
},
parameters_for_log=parameters_for_log,
user_id=dify_ctx.user_id,
tenant_id=dify_ctx.tenant_id,
node_type=self.node_type,
node_id=self._node_id,
node_execution_id=self.id,
)
except PluginDaemonClientSideError as e:
transform_error = AgentMessageTransformError(
f"Failed to transform agent message: {str(e)}", original_error=e
)
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=parameters_for_log,
error=str(transform_error),
)
)
@classmethod
def _extract_variable_selector_to_variable_mapping(
cls,
*,
graph_config: Mapping[str, Any],
node_id: str,
node_data: AgentNodeData,
) -> Mapping[str, Sequence[str]]:
_ = graph_config # Explicitly mark as unused
result: dict[str, Any] = {}
typed_node_data = node_data
for parameter_name in typed_node_data.agent_parameters:
input = typed_node_data.agent_parameters[parameter_name]
match input.type:
case "mixed" | "constant":
selectors = VariableTemplateParser(str(input.value)).extract_variable_selectors()
for selector in selectors:
result[selector.variable] = selector.value_selector
case "variable":
result[parameter_name] = input.value
result = {node_id + "." + key: value for key, value in result.items()}
return result

View File
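
At the end of _extract_variable_selector_to_variable_mapping, every extracted key is namespaced by node id. A toy check of that final step:

node_id = "agent_1"
selectors = {"query": ["sys", "query"]}
namespaced = {f"{node_id}.{key}": value for key, value in selectors.items()}
assert namespaced == {"agent_1.query": ["sys", "query"]}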

@@ -6,14 +6,14 @@ from pydantic import BaseModel
from core.prompt.entities.advanced_prompt_entities import MemoryConfig
from core.tools.entities.tool_entities import ToolSelector
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes, NodeType
class AgentNodeData(BaseNodeData):
type: NodeType = NodeType.AGENT
agent_strategy_provider_name: str # redundancy
type: NodeType = BuiltinNodeTypes.AGENT
agent_strategy_provider_name: str
agent_strategy_name: str
agent_strategy_label: str # redundancy
agent_strategy_label: str
memory: MemoryConfig | None = None
# The version of the tool parameter.
# If this value is None, it indicates this is a previous version

View File

@@ -119,14 +119,3 @@ class AgentVariableTypeError(AgentNodeError):
self.expected_type = expected_type
self.actual_type = actual_type
super().__init__(message)
class AgentMaxIterationError(AgentNodeError):
"""Exception raised when the agent exceeds the maximum iteration limit."""
def __init__(self, max_iteration: int):
self.max_iteration = max_iteration
super().__init__(
f"Agent exceeded the maximum iteration limit of {max_iteration}. "
f"The agent was unable to complete the task within the allowed number of iterations."
)

View File

@@ -0,0 +1,292 @@
from __future__ import annotations
from collections.abc import Generator, Mapping
from typing import Any, cast
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.tools.entities.tool_entities import ToolInvokeMessage
from core.tools.utils.message_transformer import ToolFileMessageTransformer
from dify_graph.enums import BuiltinNodeTypes, NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.file import File, FileTransferMethod
from dify_graph.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.node_events import (
AgentLogEvent,
NodeEventBase,
NodeRunResult,
StreamChunkEvent,
StreamCompletedEvent,
)
from dify_graph.variables.segments import ArrayFileSegment
from extensions.ext_database import db
from factories import file_factory
from models import ToolFile
from services.tools.builtin_tools_manage_service import BuiltinToolManageService
from .exceptions import AgentNodeError, AgentVariableTypeError, ToolFileNotFoundError
class AgentMessageTransformer:
def transform(
self,
*,
messages: Generator[ToolInvokeMessage, None, None],
tool_info: Mapping[str, Any],
parameters_for_log: dict[str, Any],
user_id: str,
tenant_id: str,
node_type: NodeType,
node_id: str,
node_execution_id: str,
) -> Generator[NodeEventBase, None, None]:
from core.plugin.impl.plugin import PluginInstaller
message_stream = ToolFileMessageTransformer.transform_tool_invoke_messages(
messages=messages,
user_id=user_id,
tenant_id=tenant_id,
conversation_id=None,
)
text = ""
files: list[File] = []
json_list: list[dict | list] = []
agent_logs: list[AgentLogEvent] = []
agent_execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] = {}
llm_usage = LLMUsage.empty_usage()
variables: dict[str, Any] = {}
for message in message_stream:
if message.type in {
ToolInvokeMessage.MessageType.IMAGE_LINK,
ToolInvokeMessage.MessageType.BINARY_LINK,
ToolInvokeMessage.MessageType.IMAGE,
}:
assert isinstance(message.message, ToolInvokeMessage.TextMessage)
url = message.message.text
if message.meta:
transfer_method = message.meta.get("transfer_method", FileTransferMethod.TOOL_FILE)
else:
transfer_method = FileTransferMethod.TOOL_FILE
tool_file_id = str(url).split("/")[-1].split(".")[0]
with Session(db.engine) as session:
stmt = select(ToolFile).where(ToolFile.id == tool_file_id)
tool_file = session.scalar(stmt)
if tool_file is None:
raise ToolFileNotFoundError(tool_file_id)
mapping = {
"tool_file_id": tool_file_id,
"type": file_factory.get_file_type_by_mime_type(tool_file.mimetype),
"transfer_method": transfer_method,
"url": url,
}
file = file_factory.build_from_mapping(
mapping=mapping,
tenant_id=tenant_id,
)
files.append(file)
elif message.type == ToolInvokeMessage.MessageType.BLOB:
assert isinstance(message.message, ToolInvokeMessage.TextMessage)
assert message.meta
tool_file_id = message.message.text.split("/")[-1].split(".")[0]
with Session(db.engine) as session:
stmt = select(ToolFile).where(ToolFile.id == tool_file_id)
tool_file = session.scalar(stmt)
if tool_file is None:
raise ToolFileNotFoundError(tool_file_id)
mapping = {
"tool_file_id": tool_file_id,
"transfer_method": FileTransferMethod.TOOL_FILE,
}
files.append(
file_factory.build_from_mapping(
mapping=mapping,
tenant_id=tenant_id,
)
)
elif message.type == ToolInvokeMessage.MessageType.TEXT:
assert isinstance(message.message, ToolInvokeMessage.TextMessage)
text += message.message.text
yield StreamChunkEvent(
selector=[node_id, "text"],
chunk=message.message.text,
is_final=False,
)
elif message.type == ToolInvokeMessage.MessageType.JSON:
assert isinstance(message.message, ToolInvokeMessage.JsonMessage)
if node_type == BuiltinNodeTypes.AGENT:
if isinstance(message.message.json_object, dict):
msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {})
llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata))
agent_execution_metadata = {
WorkflowNodeExecutionMetadataKey(key): value
for key, value in msg_metadata.items()
if key in WorkflowNodeExecutionMetadataKey.__members__.values()
}
else:
llm_usage = LLMUsage.empty_usage()
agent_execution_metadata = {}
if message.message.json_object:
json_list.append(message.message.json_object)
elif message.type == ToolInvokeMessage.MessageType.LINK:
assert isinstance(message.message, ToolInvokeMessage.TextMessage)
stream_text = f"Link: {message.message.text}\n"
text += stream_text
yield StreamChunkEvent(
selector=[node_id, "text"],
chunk=stream_text,
is_final=False,
)
elif message.type == ToolInvokeMessage.MessageType.VARIABLE:
assert isinstance(message.message, ToolInvokeMessage.VariableMessage)
variable_name = message.message.variable_name
variable_value = message.message.variable_value
if message.message.stream:
if not isinstance(variable_value, str):
raise AgentVariableTypeError(
"When 'stream' is True, 'variable_value' must be a string.",
variable_name=variable_name,
expected_type="str",
actual_type=type(variable_value).__name__,
)
if variable_name not in variables:
variables[variable_name] = ""
variables[variable_name] += variable_value
yield StreamChunkEvent(
selector=[node_id, variable_name],
chunk=variable_value,
is_final=False,
)
else:
variables[variable_name] = variable_value
elif message.type == ToolInvokeMessage.MessageType.FILE:
assert message.meta is not None
assert isinstance(message.meta, dict)
if "file" not in message.meta:
raise AgentNodeError("File message is missing 'file' key in meta")
if not isinstance(message.meta["file"], File):
raise AgentNodeError(f"Expected File object but got {type(message.meta['file']).__name__}")
files.append(message.meta["file"])
elif message.type == ToolInvokeMessage.MessageType.LOG:
assert isinstance(message.message, ToolInvokeMessage.LogMessage)
if message.message.metadata:
icon = tool_info.get("icon", "")
dict_metadata = dict(message.message.metadata)
if dict_metadata.get("provider"):
manager = PluginInstaller()
plugins = manager.list_plugins(tenant_id)
try:
current_plugin = next(
plugin
for plugin in plugins
if f"{plugin.plugin_id}/{plugin.name}" == dict_metadata["provider"]
)
icon = current_plugin.declaration.icon
except StopIteration:
pass
icon_dark = None
try:
builtin_tool = next(
provider
for provider in BuiltinToolManageService.list_builtin_tools(
user_id,
tenant_id,
)
if provider.name == dict_metadata["provider"]
)
icon = builtin_tool.icon
icon_dark = builtin_tool.icon_dark
except StopIteration:
pass
dict_metadata["icon"] = icon
dict_metadata["icon_dark"] = icon_dark
message.message.metadata = dict_metadata
agent_log = AgentLogEvent(
message_id=message.message.id,
node_execution_id=node_execution_id,
parent_id=message.message.parent_id,
error=message.message.error,
status=message.message.status.value,
data=message.message.data,
label=message.message.label,
metadata=message.message.metadata,
node_id=node_id,
)
for log in agent_logs:
if log.message_id == agent_log.message_id:
log.data = agent_log.data
log.status = agent_log.status
log.error = agent_log.error
log.label = agent_log.label
log.metadata = agent_log.metadata
break
else:
agent_logs.append(agent_log)
yield agent_log
json_output: list[dict[str, Any] | list[Any]] = []
if agent_logs:
for log in agent_logs:
json_output.append(
{
"id": log.message_id,
"parent_id": log.parent_id,
"error": log.error,
"status": log.status,
"data": log.data,
"label": log.label,
"metadata": log.metadata,
"node_id": log.node_id,
}
)
if json_list:
json_output.extend(json_list)
else:
json_output.append({"data": []})
yield StreamChunkEvent(
selector=[node_id, "text"],
chunk="",
is_final=True,
)
for var_name in variables:
yield StreamChunkEvent(
selector=[node_id, var_name],
chunk="",
is_final=True,
)
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs={
"text": text,
"usage": jsonable_encoder(llm_usage),
"files": ArrayFileSegment(value=files),
"json": json_output,
**variables,
},
metadata={
**agent_execution_metadata,
WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info,
WorkflowNodeExecutionMetadataKey.AGENT_LOG: agent_logs,
},
inputs=parameters_for_log,
llm_usage=llm_usage,
)
)

View File
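
One detail in the transformer worth noting: LOG messages that reuse a message_id update the earlier AgentLogEvent in place instead of appending a duplicate, using Python's for/else. A reduced sketch of that upsert-by-id pattern:

logs: list[dict] = []

def upsert(log: dict) -> None:
    for existing in logs:
        if existing["id"] == log["id"]:
            existing.update(log)  # refresh the earlier entry in place
            break
    else:  # loop ended without break: first occurrence of this id
        logs.append(log)

upsert({"id": "a", "status": "start"})
upsert({"id": "a", "status": "success"})
assert len(logs) == 1 and logs[0]["status"] == "success"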

@@ -0,0 +1,40 @@
from __future__ import annotations
from factories.agent_factory import get_plugin_agent_strategy
from .strategy_protocols import AgentStrategyPresentationProvider, AgentStrategyResolver, ResolvedAgentStrategy
class PluginAgentStrategyResolver(AgentStrategyResolver):
def resolve(
self,
*,
tenant_id: str,
agent_strategy_provider_name: str,
agent_strategy_name: str,
) -> ResolvedAgentStrategy:
return get_plugin_agent_strategy(
tenant_id=tenant_id,
agent_strategy_provider_name=agent_strategy_provider_name,
agent_strategy_name=agent_strategy_name,
)
class PluginAgentStrategyPresentationProvider(AgentStrategyPresentationProvider):
def get_icon(self, *, tenant_id: str, agent_strategy_provider_name: str) -> str | None:
from core.plugin.impl.plugin import PluginInstaller
manager = PluginInstaller()
try:
plugins = manager.list_plugins(tenant_id)
except Exception:
return None
try:
current_plugin = next(
plugin for plugin in plugins if f"{plugin.plugin_id}/{plugin.name}" == agent_strategy_provider_name
)
except StopIteration:
return None
return current_plugin.declaration.icon

View File
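
get_icon drives next() over a generator and catches StopIteration by hand. The same lookup can use next()'s default argument; a sketch of that equivalent, reusing the names from the method above:

current_plugin = next(
    (p for p in plugins if f"{p.plugin_id}/{p.name}" == agent_strategy_provider_name),
    None,
)
icon = current_plugin.declaration.icon if current_plugin else None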

@@ -0,0 +1,276 @@
from __future__ import annotations
import json
from collections.abc import Sequence
from typing import Any, cast
from packaging.version import Version
from pydantic import ValidationError
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.agent.entities import AgentToolEntity
from core.agent.plugin_entities import AgentStrategyParameter
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance, ModelManager
from core.plugin.entities.request import InvokeCredentials
from core.provider_manager import ProviderManager
from core.tools.entities.tool_entities import ToolIdentity, ToolParameter, ToolProviderType
from core.tools.tool_manager import ToolManager
from dify_graph.enums import SystemVariableKey
from dify_graph.model_runtime.entities.model_entities import AIModelEntity, ModelType
from dify_graph.runtime import VariablePool
from dify_graph.variables.segments import StringSegment
from extensions.ext_database import db
from models.model import Conversation
from .entities import AgentNodeData, AgentOldVersionModelFeatures, ParamsAutoGenerated
from .exceptions import AgentInputTypeError, AgentVariableNotFoundError
from .strategy_protocols import ResolvedAgentStrategy
class AgentRuntimeSupport:
def build_parameters(
self,
*,
agent_parameters: Sequence[AgentStrategyParameter],
variable_pool: VariablePool,
node_data: AgentNodeData,
strategy: ResolvedAgentStrategy,
tenant_id: str,
app_id: str,
invoke_from: Any,
for_log: bool = False,
) -> dict[str, Any]:
agent_parameters_dictionary = {parameter.name: parameter for parameter in agent_parameters}
result: dict[str, Any] = {}
for parameter_name in node_data.agent_parameters:
parameter = agent_parameters_dictionary.get(parameter_name)
if not parameter:
result[parameter_name] = None
continue
agent_input = node_data.agent_parameters[parameter_name]
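# Resolve the configured input: "variable" reads directly from the pool; "mixed"/"constant" render the value as a template.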
match agent_input.type:
case "variable":
variable = variable_pool.get(agent_input.value) # type: ignore[arg-type]
if variable is None:
raise AgentVariableNotFoundError(str(agent_input.value))
parameter_value = variable.value
case "mixed" | "constant":
try:
if not isinstance(agent_input.value, str):
parameter_value = json.dumps(agent_input.value, ensure_ascii=False)
else:
parameter_value = str(agent_input.value)
except TypeError:
parameter_value = str(agent_input.value)
segment_group = variable_pool.convert_template(parameter_value)
parameter_value = segment_group.log if for_log else segment_group.text
try:
if not isinstance(agent_input.value, str):
parameter_value = json.loads(parameter_value)
except json.JSONDecodeError:
pass  # rendered value is not valid JSON; keep it as a string
case _:
raise AgentInputTypeError(agent_input.type)
value = parameter_value
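# Tool-list parameters need extra normalization: keep only enabled tools, drop MCP tools for old strategies, strip schemas, and resolve per-parameter settings.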
if parameter.type == "array[tools]":
value = cast(list[dict[str, Any]], value)
value = [tool for tool in value if tool.get("enabled", False)]
value = self._filter_mcp_type_tool(strategy, value)
for tool in value:
if "schemas" in tool:
tool.pop("schemas")
parameters = tool.get("parameters", {})
if all(isinstance(v, dict) for v in parameters.values()):
params = {}
for key, param in parameters.items():
if param.get("auto", ParamsAutoGenerated.OPEN) in (
ParamsAutoGenerated.CLOSE,
0,
):
value_param = param.get("value", {})
if value_param and value_param.get("type", "") == "variable":
variable_selector = value_param.get("value")
if not variable_selector:
raise ValueError("Variable selector is missing for a variable-type parameter.")
variable = variable_pool.get(variable_selector)
if variable is None:
raise AgentVariableNotFoundError(str(variable_selector))
params[key] = variable.value
else:
params[key] = value_param.get("value", "") if value_param is not None else None
else:
params[key] = None
parameters = params
tool["settings"] = {k: v.get("value", None) for k, v in tool.get("settings", {}).items()}
tool["parameters"] = parameters
if not for_log:
if parameter.type == "array[tools]":
value = cast(list[dict[str, Any]], value)
tool_value = []
for tool in value:
provider_type = ToolProviderType(tool.get("type", ToolProviderType.BUILT_IN))
setting_params = tool.get("settings", {})
parameters = tool.get("parameters", {})
manual_input_params = [key for key, value in parameters.items() if value is not None]
parameters = {**parameters, **setting_params}
entity = AgentToolEntity(
provider_id=tool.get("provider_name", ""),
provider_type=provider_type,
tool_name=tool.get("tool_name", ""),
tool_parameters=parameters,
plugin_unique_identifier=tool.get("plugin_unique_identifier", None),
credential_id=tool.get("credential_id", None),
)
extra = tool.get("extra", {})
runtime_variable_pool: VariablePool | None = None
if node_data.version != "1" or node_data.tool_node_version is not None:
runtime_variable_pool = variable_pool
tool_runtime = ToolManager.get_agent_tool_runtime(
tenant_id,
app_id,
entity,
invoke_from,
runtime_variable_pool,
)
if tool_runtime.entity.description:
tool_runtime.entity.description.llm = (
extra.get("description", "") or tool_runtime.entity.description.llm
)
for tool_runtime_params in tool_runtime.entity.parameters:
tool_runtime_params.form = (
ToolParameter.ToolParameterForm.FORM
if tool_runtime_params.name in manual_input_params
else tool_runtime_params.form
)
manual_input_value = {}
if tool_runtime.entity.parameters:
manual_input_value = {
key: value for key, value in parameters.items() if key in manual_input_params
}
runtime_parameters = {
**tool_runtime.runtime.runtime_parameters,
**manual_input_value,
}
tool_value.append(
{
**tool_runtime.entity.model_dump(mode="json"),
"runtime_parameters": runtime_parameters,
"credential_id": tool.get("credential_id", None),
"provider_type": provider_type.value,
}
)
value = tool_value
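# Model-selector parameters are enriched with conversation history and the model's schema entity.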
if parameter.type == AgentStrategyParameter.AgentStrategyParameterType.MODEL_SELECTOR:
value = cast(dict[str, Any], value)
model_instance, model_schema = self.fetch_model(tenant_id=tenant_id, value=value)
history_prompt_messages = []
if node_data.memory:
memory = self.fetch_memory(
variable_pool=variable_pool,
app_id=app_id,
model_instance=model_instance,
)
if memory:
prompt_messages = memory.get_history_prompt_messages(
message_limit=node_data.memory.window.size or None
)
history_prompt_messages = [
prompt_message.model_dump(mode="json") for prompt_message in prompt_messages
]
value["history_prompt_messages"] = history_prompt_messages
if model_schema:
model_schema = self._remove_unsupported_model_features_for_old_version(model_schema)
value["entity"] = model_schema.model_dump(mode="json")
else:
value["entity"] = None
result[parameter_name] = value
return result
def build_credentials(self, *, parameters: dict[str, Any]) -> InvokeCredentials:
credentials = InvokeCredentials()
credentials.tool_credentials = {}
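# Collect provider -> credential_id pairs, skipping tools without a credential or with an invalid identity.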
for tool in parameters.get("tools", []):
if not tool.get("credential_id"):
continue
try:
identity = ToolIdentity.model_validate(tool.get("identity", {}))
except ValidationError:
continue
credentials.tool_credentials[identity.provider] = tool.get("credential_id", None)
return credentials
def fetch_memory(
self,
*,
variable_pool: VariablePool,
app_id: str,
model_instance: ModelInstance,
) -> TokenBufferMemory | None:
conversation_id_variable = variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID])
if not isinstance(conversation_id_variable, StringSegment):
return None
conversation_id = conversation_id_variable.value
with Session(db.engine, expire_on_commit=False) as session:
stmt = select(Conversation).where(Conversation.app_id == app_id, Conversation.id == conversation_id)
conversation = session.scalar(stmt)
if not conversation:
return None
return TokenBufferMemory(conversation=conversation, model_instance=model_instance)
def fetch_model(self, *, tenant_id: str, value: dict[str, Any]) -> tuple[ModelInstance, AIModelEntity | None]:
provider_manager = ProviderManager()
provider_model_bundle = provider_manager.get_provider_model_bundle(
tenant_id=tenant_id,
provider=value.get("provider", ""),
model_type=ModelType.LLM,
)
model_name = value.get("model", "")
model_credentials = provider_model_bundle.configuration.get_current_credentials(
model_type=ModelType.LLM,
model=model_name,
)
provider_name = provider_model_bundle.configuration.provider.provider
model_type_instance = provider_model_bundle.model_type_instance
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
provider=provider_name,
model_type=ModelType(value.get("model_type", "")),
model=model_name,
)
model_schema = model_type_instance.get_model_schema(model_name, model_credentials)
return model_instance, model_schema
@staticmethod
def _remove_unsupported_model_features_for_old_version(model_schema: AIModelEntity) -> AIModelEntity:
if model_schema.features:
for feature in model_schema.features[:]:
try:
AgentOldVersionModelFeatures(feature.value)
except ValueError:
model_schema.features.remove(feature)
return model_schema
@staticmethod
def _filter_mcp_type_tool(
strategy: ResolvedAgentStrategy,
tools: list[dict[str, Any]],
) -> list[dict[str, Any]]:
meta_version = strategy.meta_version
if meta_version and Version(meta_version) > Version("0.0.1"):
return tools
return [tool for tool in tools if tool.get("type") != ToolProviderType.MCP]
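The MCP gate above relies on packaging.version.Version, which compares release segments numerically rather than lexicographically. A minimal standalone sketch of the same gating pattern (the function name is illustrative):

from packaging.version import Version

def supports_mcp_tools(meta_version: str | None) -> bool:
    # Strategies newer than 0.0.1 may receive MCP tools; unversioned ones are treated as old.
    return bool(meta_version) and Version(meta_version) > Version("0.0.1")

assert supports_mcp_tools("0.0.2")
assert not supports_mcp_tools("0.0.1")  # equal is not greater
assert not supports_mcp_tools(None)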

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from collections.abc import Generator, Sequence
from typing import Any, Protocol
from core.agent.plugin_entities import AgentStrategyParameter
from core.plugin.entities.request import InvokeCredentials
from core.tools.entities.tool_entities import ToolInvokeMessage
class ResolvedAgentStrategy(Protocol):
meta_version: str | None
def get_parameters(self) -> Sequence[AgentStrategyParameter]: ...
def invoke(
self,
*,
params: dict[str, Any],
user_id: str,
conversation_id: str | None = None,
app_id: str | None = None,
message_id: str | None = None,
credentials: InvokeCredentials | None = None,
) -> Generator[ToolInvokeMessage, None, None]: ...
class AgentStrategyResolver(Protocol):
def resolve(
self,
*,
tenant_id: str,
agent_strategy_provider_name: str,
agent_strategy_name: str,
) -> ResolvedAgentStrategy: ...
class AgentStrategyPresentationProvider(Protocol):
def get_icon(self, *, tenant_id: str, agent_strategy_provider_name: str) -> str | None: ...
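These Protocol classes are structural: any object with matching attributes and method signatures conforms, with no inheritance required, so implementations can subclass them for clarity (as PluginAgentStrategyResolver does) or not at all. A minimal sketch of a conforming pair (all names and the stubbed behavior are illustrative):

from collections.abc import Generator, Sequence
from typing import Any

class StubStrategy:
    meta_version: str | None = "0.0.2"

    def get_parameters(self) -> Sequence[Any]:
        return []

    def invoke(self, **kwargs: Any) -> Generator[Any, None, None]:
        yield from ()  # a real strategy would yield ToolInvokeMessage objects

class StubResolver:
    # No base class: type checkers verify conformance to AgentStrategyResolver structurally.
    def resolve(self, *, tenant_id: str, agent_strategy_provider_name: str, agent_strategy_name: str) -> StubStrategy:
        return StubStrategy()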

View File

@@ -0,0 +1 @@
"""Datasource workflow node package."""

View File

@@ -1,22 +1,17 @@
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any
from core.datasource.datasource_manager import DatasourceManager
from core.datasource.entities.datasource_entities import DatasourceProviderType
from core.plugin.impl.exc import PluginDaemonClientSideError
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType, SystemVariableKey
from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, SystemVariableKey, WorkflowNodeExecutionMetadataKey
from dify_graph.node_events import NodeRunResult, StreamCompletedEvent
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser
from dify_graph.repositories.datasource_manager_protocol import (
DatasourceManagerProtocol,
DatasourceParameter,
OnlineDriveDownloadFileParam,
)
from ...entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from .entities import DatasourceNodeData
from .entities import DatasourceNodeData, DatasourceParameter, OnlineDriveDownloadFileParam
from .exc import DatasourceNodeError
if TYPE_CHECKING:
@@ -29,7 +24,7 @@ class DatasourceNode(Node[DatasourceNodeData]):
Datasource Node
"""
node_type = NodeType.DATASOURCE
node_type = BuiltinNodeTypes.DATASOURCE
execution_type = NodeExecutionType.ROOT
def __init__(
@@ -38,7 +33,6 @@ class DatasourceNode(Node[DatasourceNodeData]):
config: NodeConfigDict,
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
datasource_manager: DatasourceManagerProtocol,
):
super().__init__(
id=id,
@@ -46,7 +40,11 @@ class DatasourceNode(Node[DatasourceNodeData]):
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
self.datasource_manager = datasource_manager
self.datasource_manager = DatasourceManager
def populate_start_event(self, event) -> None:
event.provider_id = f"{self.node_data.plugin_id}/{self.node_data.provider_name}"
event.provider_type = self.node_data.provider_type
def _run(self) -> Generator:
"""

View File

@@ -4,7 +4,7 @@ from pydantic import BaseModel, field_validator
from pydantic_core.core_schema import ValidationInfo
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes, NodeType
class DatasourceEntity(BaseModel):
@@ -17,7 +17,7 @@ class DatasourceEntity(BaseModel):
class DatasourceNodeData(BaseNodeData, DatasourceEntity):
type: NodeType = NodeType.DATASOURCE
type: NodeType = BuiltinNodeTypes.DATASOURCE
class DatasourceInput(BaseModel):
# TODO: check this type
@@ -42,3 +42,14 @@ class DatasourceNodeData(BaseNodeData, DatasourceEntity):
return typ
datasource_parameters: dict[str, DatasourceInput] | None = None
class DatasourceParameter(BaseModel):
workspace_id: str
page_id: str
type: str
class OnlineDriveDownloadFileParam(BaseModel):
id: str
bucket: str

View File

@@ -1,25 +1,10 @@
from collections.abc import Generator
from typing import Any, Protocol
from pydantic import BaseModel
from dify_graph.file import File
from dify_graph.node_events import StreamChunkEvent, StreamCompletedEvent
class DatasourceParameter(BaseModel):
workspace_id: str
page_id: str
type: str
class OnlineDriveDownloadFileParam(BaseModel):
id: str
bucket: str
class DatasourceFinal(BaseModel):
data: dict[str, Any] | None = None
from .entities import DatasourceParameter, OnlineDriveDownloadFileParam
class DatasourceManagerProtocol(Protocol):

View File

@@ -0,0 +1,5 @@
"""Knowledge index workflow node package."""
KNOWLEDGE_INDEX_NODE_TYPE = "knowledge-index"
__all__ = ["KNOWLEDGE_INDEX_NODE_TYPE"]
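Swapping the NodeType enum member for the plain KNOWLEDGE_INDEX_NODE_TYPE string works because str-valued enums compare equal to their raw values, so existing comparisons against NodeType.KNOWLEDGE_INDEX keep passing. A quick standalone check (this NodeType is an illustrative stand-in, assuming the real one is a StrEnum):

from enum import StrEnum  # Python 3.11+

class NodeType(StrEnum):
    KNOWLEDGE_INDEX = "knowledge-index"

assert NodeType.KNOWLEDGE_INDEX == "knowledge-index"  # StrEnum members equal their string values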

View File

@@ -3,6 +3,7 @@ from typing import Literal, Union
from pydantic import BaseModel
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
@@ -156,7 +157,7 @@ class KnowledgeIndexNodeData(BaseNodeData):
Knowledge index Node Data.
"""
type: NodeType = NodeType.KNOWLEDGE_INDEX
type: NodeType = KNOWLEDGE_INDEX_NODE_TYPE
chunk_structure: str
index_chunk_variable_selector: list[str]
indexing_technique: str | None = None

View File

@@ -2,14 +2,15 @@ import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any
from core.rag.index_processor.index_processor import IndexProcessor
from core.rag.summary_index.summary_index import SummaryIndex
from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType, SystemVariableKey
from dify_graph.enums import NodeExecutionType, SystemVariableKey
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.template import Template
from dify_graph.repositories.index_processor_protocol import IndexProcessorProtocol
from dify_graph.repositories.summary_index_service_protocol import SummaryIndexServiceProtocol
from .entities import KnowledgeIndexNodeData
from .exc import (
@@ -25,7 +26,7 @@ _INVOKE_FROM_DEBUGGER = "debugger"
class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
node_type = NodeType.KNOWLEDGE_INDEX
node_type = KNOWLEDGE_INDEX_NODE_TYPE
execution_type = NodeExecutionType.RESPONSE
def __init__(
@@ -34,12 +35,10 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
config: NodeConfigDict,
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
index_processor: IndexProcessorProtocol,
summary_index_service: SummaryIndexServiceProtocol,
) -> None:
super().__init__(id, config, graph_init_params, graph_runtime_state)
self.index_processor = index_processor
self.summary_index_service = summary_index_service
self.index_processor = IndexProcessor()
self.summary_index_service = SummaryIndex()
def _run(self) -> NodeRunResult: # type: ignore
node_data = self.node_data

View File

@@ -5,21 +5,21 @@ from pydantic import BaseModel, Field
class PreviewItem(BaseModel):
content: str | None = Field(None)
child_chunks: list[str] | None = Field(None)
summary: str | None = Field(None)
content: str | None = Field(default=None)
child_chunks: list[str] | None = Field(default=None)
summary: str | None = Field(default=None)
class QaPreview(BaseModel):
answer: str | None = Field(None)
question: str | None = Field(None)
answer: str | None = Field(default=None)
question: str | None = Field(default=None)
class Preview(BaseModel):
chunk_structure: str
parent_mode: str | None = Field(None)
preview: list[PreviewItem] = Field([])
qa_preview: list[QaPreview] = Field([])
parent_mode: str | None = Field(default=None)
preview: list[PreviewItem] = Field(default_factory=list)
qa_preview: list[QaPreview] = Field(default_factory=list)
total_segments: int
@@ -39,3 +39,9 @@ class IndexProcessorProtocol(Protocol):
def get_preview_output(
self, chunks: Any, dataset_id: str, document_id: str, chunk_structure: str, summary_index_setting: dict | None
) -> Preview: ...
class SummaryIndexServiceProtocol(Protocol):
def generate_and_vectorize_summary(
self, dataset_id: str, document_id: str, is_preview: bool, summary_index_setting: dict | None = None
) -> None: ...
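The Field changes above move to explicit keyword defaults and default_factory, the idiomatic Pydantic spelling for mutable defaults: the factory builds a fresh list per instance. A small standalone illustration (the model name is arbitrary):

from pydantic import BaseModel, Field

class Bucket(BaseModel):
    items: list[str] = Field(default_factory=list)  # fresh list per instance

a, b = Bucket(), Bucket()
a.items.append("x")
assert b.items == []  # no state shared through the default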

View File

@@ -0,0 +1 @@
"""Knowledge retrieval workflow node package."""

View File

@@ -4,7 +4,7 @@ from typing import Literal
from pydantic import BaseModel, Field
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.llm.entities import ModelConfig, VisionConfig
@@ -114,7 +114,7 @@ class KnowledgeRetrievalNodeData(BaseNodeData):
Knowledge retrieval Node Data.
"""
type: NodeType = NodeType.KNOWLEDGE_RETRIEVAL
type: NodeType = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL
query_variable_selector: list[str] | None | str = None
query_attachment_selector: list[str] | None | str = None
dataset_ids: list[str]

View File

@@ -1,12 +1,19 @@
"""Knowledge retrieval workflow node implementation.
This node now lives under ``core.workflow.nodes`` and is discovered directly by
the workflow node registry.
"""
import logging
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal
from core.app.app_config.entities import DatasetRetrieveConfigEntity
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import (
NodeType,
BuiltinNodeTypes,
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus,
)
@@ -15,7 +22,6 @@ from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base import LLMUsageTrackingMixin
from dify_graph.nodes.base.node import Node
from dify_graph.repositories.rag_retrieval_protocol import KnowledgeRetrievalRequest, RAGRetrievalProtocol, Source
from dify_graph.variables import (
ArrayFileSegment,
FileSegment,
@@ -32,6 +38,7 @@ from .exc import (
KnowledgeRetrievalNodeError,
RateLimitExceededError,
)
from .retrieval import KnowledgeRetrievalRequest, Source
if TYPE_CHECKING:
from dify_graph.file.models import File
@@ -41,7 +48,7 @@ logger = logging.getLogger(__name__)
class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeData]):
node_type = NodeType.KNOWLEDGE_RETRIEVAL
node_type = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL
# Instance attributes specific to KnowledgeRetrievalNode.
# Output variable for file
@@ -53,7 +60,6 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
config: NodeConfigDict,
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
rag_retrieval: RAGRetrievalProtocol,
):
super().__init__(
id=id,
@@ -63,7 +69,7 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
)
# LLM file outputs, used for MultiModal outputs.
self._file_outputs = []
self._rag_retrieval = rag_retrieval
self._rag_retrieval = DatasetRetrieval()
@classmethod
def version(cls):

View File

@@ -3,9 +3,10 @@ from typing import Any, Literal, Protocol
from pydantic import BaseModel, Field
from dify_graph.model_runtime.entities import LLMUsage
from dify_graph.nodes.knowledge_retrieval.entities import MetadataFilteringCondition
from dify_graph.nodes.llm.entities import ModelConfig
from .entities import MetadataFilteringCondition
class SourceChildChunk(BaseModel):
id: str = Field(default="", description="Child chunk ID")
@@ -28,7 +29,7 @@ class SourceMetadata(BaseModel):
segment_id: str | None = Field(default=None, description="Segment unique identifier")
retriever_from: str = Field(default="workflow", description="Retriever source context")
score: float = Field(default=0.0, description="Retrieval relevance score")
child_chunks: list[SourceChildChunk] = Field(default=[], description="List of child chunks")
child_chunks: list[SourceChildChunk] = Field(default_factory=list, description="List of child chunks")
segment_hit_count: int | None = Field(default=0, description="Number of times segment was retrieved")
segment_word_count: int | None = Field(default=0, description="Word count of the segment")
segment_position: int | None = Field(default=0, description="Position of segment in document")
@@ -81,28 +82,7 @@ class KnowledgeRetrievalRequest(BaseModel):
class RAGRetrievalProtocol(Protocol):
"""Protocol for RAG-based knowledge retrieval implementations.
Implementations of this protocol handle knowledge retrieval from datasets
including rate limiting, dataset filtering, and document retrieval.
"""
@property
def llm_usage(self) -> LLMUsage:
"""Return accumulated LLM usage for retrieval operations."""
...
def llm_usage(self) -> LLMUsage: ...
def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]:
"""Retrieve knowledge from datasets based on the provided request.
Args:
request: Knowledge retrieval request with search parameters
Returns:
List of sources matching the search criteria
Raises:
RateLimitExceededError: If rate limit is exceeded
ModelNotExistError: If specified model doesn't exist
"""
...
def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]: ...

View File

@@ -3,16 +3,18 @@ from typing import Any, Literal, Union
from pydantic import BaseModel, Field, ValidationInfo, field_validator
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.entities.entities import EventParameter
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.nodes.trigger_plugin.exc import TriggerEventParameterError
from .exc import TriggerEventParameterError
class TriggerEventNodeData(BaseNodeData):
"""Plugin trigger node data"""
type: NodeType = NodeType.TRIGGER_PLUGIN
type: NodeType = TRIGGER_PLUGIN_NODE_TYPE
class TriggerEventInput(BaseModel):
value: Union[Any, list[str]]

View File

@@ -1,8 +1,10 @@
from collections.abc import Mapping
from typing import Any, cast
from core.trigger.constants import TRIGGER_INFO_METADATA_KEY, TRIGGER_PLUGIN_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, WorkflowNodeExecutionMetadataKey
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node
@@ -10,7 +12,7 @@ from .entities import TriggerEventNodeData
class TriggerEventNode(Node[TriggerEventNodeData]):
node_type = NodeType.TRIGGER_PLUGIN
node_type = TRIGGER_PLUGIN_NODE_TYPE
execution_type = NodeExecutionType.ROOT
@classmethod
@@ -32,6 +34,9 @@ class TriggerEventNode(Node[TriggerEventNodeData]):
def version(cls) -> str:
return "1"
def populate_start_event(self, event) -> None:
event.provider_id = self.node_data.provider_id
def _run(self) -> NodeRunResult:
"""
Run the plugin trigger node.
@@ -41,8 +46,8 @@ class TriggerEventNode(Node[TriggerEventNodeData]):
"""
# Get trigger data passed when workflow was triggered
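# Note: the cast() below is a hint for the type checker only; at runtime the string constant passes through unchanged.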
metadata = {
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
metadata: dict[WorkflowNodeExecutionMetadataKey, Any] = {
cast(WorkflowNodeExecutionMetadataKey, TRIGGER_INFO_METADATA_KEY): {
"provider_id": self.node_data.provider_id,
"event_name": self.node_data.event_name,
"plugin_unique_identifier": self.node_data.plugin_unique_identifier,

View File

@@ -0,0 +1,3 @@
from .trigger_schedule_node import TriggerScheduleNode
__all__ = ["TriggerScheduleNode"]

View File

@@ -2,6 +2,7 @@ from typing import Literal, Union
from pydantic import BaseModel, Field
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
@@ -11,7 +12,7 @@ class TriggerScheduleNodeData(BaseNodeData):
Trigger Schedule Node Data
"""
type: NodeType = NodeType.TRIGGER_SCHEDULE
type: NodeType = TRIGGER_SCHEDULE_NODE_TYPE
mode: str = Field(default="visual", description="Schedule mode: visual or cron")
frequency: str | None = Field(default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly")
cron_expression: str | None = Field(default=None, description="Cron expression for cron mode")

View File

@@ -1,15 +1,17 @@
from collections.abc import Mapping
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType
from dify_graph.enums import NodeExecutionType
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.trigger_schedule.entities import TriggerScheduleNodeData
from .entities import TriggerScheduleNodeData
class TriggerScheduleNode(Node[TriggerScheduleNodeData]):
node_type = NodeType.TRIGGER_SCHEDULE
node_type = TRIGGER_SCHEDULE_NODE_TYPE
execution_type = NodeExecutionType.ROOT
@classmethod
@@ -19,7 +21,7 @@ class TriggerScheduleNode(Node[TriggerScheduleNodeData]):
@classmethod
def get_default_config(cls, filters: Mapping[str, object] | None = None) -> Mapping[str, object]:
return {
"type": "trigger-schedule",
"type": TRIGGER_SCHEDULE_NODE_TYPE,
"config": {
"mode": "visual",
"frequency": "daily",

View File

@@ -3,6 +3,7 @@ from enum import StrEnum
from pydantic import BaseModel, Field, field_validator
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType
from dify_graph.variables.types import SegmentType
@@ -93,7 +94,7 @@ class WebhookData(BaseNodeData):
class SyncMode(StrEnum):
SYNC = "async"  # only async is supported
type: NodeType = NodeType.TRIGGER_WEBHOOK
type: NodeType = TRIGGER_WEBHOOK_NODE_TYPE
method: Method = Method.GET
content_type: ContentType = Field(default=ContentType.JSON)
headers: Sequence[WebhookParameter] = Field(default_factory=list)

View File

@@ -2,9 +2,10 @@ import logging
from collections.abc import Mapping
from typing import Any
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType
from dify_graph.enums import NodeExecutionType
from dify_graph.file import FileTransferMethod
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node
@@ -19,7 +20,7 @@ logger = logging.getLogger(__name__)
class TriggerWebhookNode(Node[WebhookData]):
node_type = NodeType.TRIGGER_WEBHOOK
node_type = TRIGGER_WEBHOOK_NODE_TYPE
execution_type = NodeExecutionType.ROOT
@classmethod

View File

@@ -8,7 +8,7 @@ from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context
from core.app.workflow.layers.llm_quota import LLMQuotaLayer
from core.app.workflow.layers.observability import ObservabilityLayer
from core.workflow.node_factory import DifyNodeFactory
from core.workflow.node_factory import DifyNodeFactory, resolve_workflow_node_class
from dify_graph.constants import ENVIRONMENT_VARIABLE_NODE_ID
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDictAdapter
@@ -21,9 +21,8 @@ from dify_graph.graph_engine.layers import DebugLoggingLayer, ExecutionLimitsLay
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_engine.protocols.command_channel import CommandChannel
from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent
from dify_graph.nodes import NodeType
from dify_graph.nodes import BuiltinNodeTypes
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
from dify_graph.runtime import ChildGraphNotFoundError, GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool
@@ -253,7 +252,7 @@ class WorkflowEntry:
variable_mapping=variable_mapping,
user_inputs=user_inputs,
)
if node_type != NodeType.DATASOURCE:
if node_type != BuiltinNodeTypes.DATASOURCE:
cls.mapping_user_inputs_to_variable_pool(
variable_mapping=variable_mapping,
user_inputs=user_inputs,
@@ -303,7 +302,7 @@ class WorkflowEntry:
"height": node_height,
"type": "custom",
"data": {
"type": NodeType.START,
"type": BuiltinNodeTypes.START,
"title": "Start",
"desc": "Start",
},
@@ -339,11 +338,11 @@ class WorkflowEntry:
# Create a minimal graph for single node execution
graph_dict = cls._create_single_node_graph(node_id, node_data)
node_type = NodeType(node_data.get("type", ""))
if node_type not in {NodeType.PARAMETER_EXTRACTOR, NodeType.QUESTION_CLASSIFIER}:
node_type = node_data.get("type", "")
if node_type not in {BuiltinNodeTypes.PARAMETER_EXTRACTOR, BuiltinNodeTypes.QUESTION_CLASSIFIER}:
raise ValueError(f"Node type {node_type} not supported")
node_cls = NODE_TYPE_CLASSES_MAPPING[node_type]["1"]
node_cls = resolve_workflow_node_class(node_type=node_type, node_version="1")
if not node_cls:
raise ValueError(f"Node class not found for node type {node_type}")
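The diff replaces the static NODE_TYPE_CLASSES_MAPPING lookup with a resolve_workflow_node_class call but does not show that helper's body. A minimal sketch of what such a type/version registry lookup could look like (the registry layout and helper internals are assumptions, only the keyword signature is taken from the call site above):

_NODE_REGISTRY: dict[str, dict[str, type]] = {}

def register_node(node_type: str, node_version: str, cls: type) -> None:
    _NODE_REGISTRY.setdefault(node_type, {})[node_version] = cls

def resolve_workflow_node_class(*, node_type: str, node_version: str) -> type | None:
    # Returning None instead of raising lets the caller shape its own error, as WorkflowEntry does above.
    return _NODE_REGISTRY.get(node_type, {}).get(node_version)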

Some files were not shown because too many files have changed in this diff.