mirror of
https://github.com/langgenius/dify.git
synced 2026-03-15 20:27:02 +00:00
Compare commits
31 Commits
yanli/pyre
...
fix/use-ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b5c5cb72f0 | ||
|
|
1863cdbff0 | ||
|
|
cc769b9dd8 | ||
|
|
6f8ff490a8 | ||
|
|
a9ff96113a | ||
|
|
0d549af0a9 | ||
|
|
2a04f59984 | ||
|
|
a9f507cd2e | ||
|
|
96fd3bde1b | ||
|
|
f9ded4960a | ||
|
|
c1e8735f7a | ||
|
|
f795d24151 | ||
|
|
0fa7548346 | ||
|
|
ac8021fe27 | ||
|
|
f21288df5a | ||
|
|
8f5d65a7fc | ||
|
|
8f8477a19f | ||
|
|
fb41b215c8 | ||
|
|
214f24bcd3 | ||
|
|
1677a52900 | ||
|
|
e054d5d8b3 | ||
|
|
1b6e695520 | ||
|
|
98df8e1d6c | ||
|
|
79a8747c1b | ||
|
|
e64f4d6039 | ||
|
|
6043ec4423 | ||
|
|
a600794370 | ||
|
|
573b4e41cb | ||
|
|
194c205ed3 | ||
|
|
7e1dc3c122 | ||
|
|
4203647c32 |
@@ -187,53 +187,13 @@ const Template = useMemo(() => {
|
||||
|
||||
**When**: Component directly handles API calls, data transformation, or complex async operations.
|
||||
|
||||
**Dify Convention**: Use `@tanstack/react-query` hooks from `web/service/use-*.ts` or create custom data hooks.
|
||||
|
||||
```typescript
|
||||
// ❌ Before: API logic in component
|
||||
const MCPServiceCard = () => {
|
||||
const [basicAppConfig, setBasicAppConfig] = useState({})
|
||||
|
||||
useEffect(() => {
|
||||
if (isBasicApp && appId) {
|
||||
(async () => {
|
||||
const res = await fetchAppDetail({ url: '/apps', id: appId })
|
||||
setBasicAppConfig(res?.model_config || {})
|
||||
})()
|
||||
}
|
||||
}, [appId, isBasicApp])
|
||||
|
||||
// More API-related logic...
|
||||
}
|
||||
|
||||
// ✅ After: Extract to data hook using React Query
|
||||
// use-app-config.ts
|
||||
import { useQuery } from '@tanstack/react-query'
|
||||
import { get } from '@/service/base'
|
||||
|
||||
const NAME_SPACE = 'appConfig'
|
||||
|
||||
export const useAppConfig = (appId: string, isBasicApp: boolean) => {
|
||||
return useQuery({
|
||||
enabled: isBasicApp && !!appId,
|
||||
queryKey: [NAME_SPACE, 'detail', appId],
|
||||
queryFn: () => get<AppDetailResponse>(`/apps/${appId}`),
|
||||
select: data => data?.model_config || {},
|
||||
})
|
||||
}
|
||||
|
||||
// Component becomes cleaner
|
||||
const MCPServiceCard = () => {
|
||||
const { data: config, isLoading } = useAppConfig(appId, isBasicApp)
|
||||
// UI only
|
||||
}
|
||||
```
|
||||
|
||||
**React Query Best Practices in Dify**:
|
||||
- Define `NAME_SPACE` for query key organization
|
||||
- Use `enabled` option for conditional fetching
|
||||
- Use `select` for data transformation
|
||||
- Export invalidation hooks: `useInvalidXxx`
|
||||
**Dify Convention**:
|
||||
- This skill is for component decomposition, not query/mutation design.
|
||||
- When refactoring data fetching, follow `web/AGENTS.md`.
|
||||
- Use `orpc-contract-first` for contracts, query shape, data-fetching wrappers, and query/mutation call-site patterns.
|
||||
- Use `web/docs/query-mutation.md` for Dify-specific conditional query, invalidation, and mutation error-handling rules.
|
||||
- Do not introduce deprecated `useInvalid` / `useReset`.
|
||||
- Do not add thin passthrough `useQuery` wrappers during refactoring; only extract a custom hook when it truly orchestrates multiple queries/mutations or shared derived state.
|
||||
|
||||
**Dify Examples**:
|
||||
- `web/service/use-workflow.ts`
|
||||
|
||||
@@ -155,48 +155,15 @@ const Configuration: FC = () => {
|
||||
|
||||
## Common Hook Patterns in Dify
|
||||
|
||||
### 1. Data Fetching Hook (React Query)
|
||||
### 1. Data Fetching / Mutation Hooks
|
||||
|
||||
```typescript
|
||||
// Pattern: Use @tanstack/react-query for data fetching
|
||||
import { useQuery, useQueryClient } from '@tanstack/react-query'
|
||||
import { get } from '@/service/base'
|
||||
import { useInvalid } from '@/service/use-base'
|
||||
When hook extraction touches query or mutation code, do not use this reference as the source of truth for data-layer patterns.
|
||||
|
||||
const NAME_SPACE = 'appConfig'
|
||||
|
||||
// Query keys for cache management
|
||||
export const appConfigQueryKeys = {
|
||||
detail: (appId: string) => [NAME_SPACE, 'detail', appId] as const,
|
||||
}
|
||||
|
||||
// Main data hook
|
||||
export const useAppConfig = (appId: string) => {
|
||||
return useQuery({
|
||||
enabled: !!appId,
|
||||
queryKey: appConfigQueryKeys.detail(appId),
|
||||
queryFn: () => get<AppDetailResponse>(`/apps/${appId}`),
|
||||
select: data => data?.model_config || null,
|
||||
})
|
||||
}
|
||||
|
||||
// Invalidation hook for refreshing data
|
||||
export const useInvalidAppConfig = () => {
|
||||
return useInvalid([NAME_SPACE])
|
||||
}
|
||||
|
||||
// Usage in component
|
||||
const Component = () => {
|
||||
const { data: config, isLoading, error, refetch } = useAppConfig(appId)
|
||||
const invalidAppConfig = useInvalidAppConfig()
|
||||
|
||||
const handleRefresh = () => {
|
||||
invalidAppConfig() // Invalidates cache and triggers refetch
|
||||
}
|
||||
|
||||
return <div>...</div>
|
||||
}
|
||||
```
|
||||
- Follow `web/AGENTS.md` first.
|
||||
- Use `orpc-contract-first` for contracts, query shape, data-fetching wrappers, and query/mutation call-site patterns.
|
||||
- Use `web/docs/query-mutation.md` for conditional query, invalidation, and mutation error-handling rules.
|
||||
- Do not introduce deprecated `useInvalid` / `useReset`.
|
||||
- Do not extract thin passthrough `useQuery` hooks; only extract orchestration hooks.
|
||||
|
||||
### 2. Form State Hook
|
||||
|
||||
|
||||
34
.github/actions/setup-web/action.yml
vendored
34
.github/actions/setup-web/action.yml
vendored
@@ -1,33 +1,13 @@
|
||||
name: Setup Web Environment
|
||||
description: Setup pnpm, Node.js, and install web dependencies.
|
||||
|
||||
inputs:
|
||||
node-version:
|
||||
description: Node.js version to use
|
||||
required: false
|
||||
default: "22"
|
||||
install-dependencies:
|
||||
description: Whether to install web dependencies after setting up Node.js
|
||||
required: false
|
||||
default: "true"
|
||||
|
||||
runs:
|
||||
using: composite
|
||||
steps:
|
||||
- name: Install pnpm
|
||||
uses: pnpm/action-setup@41ff72655975bd51cab0327fa583b6e92b6d3061 # v4.2.0
|
||||
- name: Setup Vite+
|
||||
uses: voidzero-dev/setup-vp@b5d848f5a62488f3d3d920f8aa6ac318a60c5f07 # v1
|
||||
with:
|
||||
package_json_file: web/package.json
|
||||
run_install: false
|
||||
|
||||
- name: Setup Node.js
|
||||
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # v6.3.0
|
||||
with:
|
||||
node-version: ${{ inputs.node-version }}
|
||||
cache: pnpm
|
||||
cache-dependency-path: ./web/pnpm-lock.yaml
|
||||
|
||||
- name: Install dependencies
|
||||
if: ${{ inputs.install-dependencies == 'true' }}
|
||||
shell: bash
|
||||
run: pnpm --dir web install --frozen-lockfile
|
||||
node-version-file: "./web/.nvmrc"
|
||||
cache: true
|
||||
run-install: |
|
||||
- cwd: ./web
|
||||
args: ['--frozen-lockfile']
|
||||
|
||||
4
.github/workflows/autofix.yml
vendored
4
.github/workflows/autofix.yml
vendored
@@ -102,13 +102,11 @@ jobs:
|
||||
- name: Setup web environment
|
||||
if: steps.web-changes.outputs.any_changed == 'true'
|
||||
uses: ./.github/actions/setup-web
|
||||
with:
|
||||
node-version: "24"
|
||||
|
||||
- name: ESLint autofix
|
||||
if: steps.web-changes.outputs.any_changed == 'true'
|
||||
run: |
|
||||
cd web
|
||||
pnpm eslint --concurrency=2 --prune-suppressions --quiet || true
|
||||
vp exec eslint --concurrency=2 --prune-suppressions --quiet || true
|
||||
|
||||
- uses: autofix-ci/action@7a166d7532b277f34e16238930461bf77f9d7ed8 # v1.3.3
|
||||
|
||||
8
.github/workflows/style.yml
vendored
8
.github/workflows/style.yml
vendored
@@ -88,7 +88,7 @@ jobs:
|
||||
if: steps.changed-files.outputs.any_changed == 'true'
|
||||
working-directory: ./web
|
||||
run: |
|
||||
pnpm run lint:ci
|
||||
vp run lint:ci
|
||||
# pnpm run lint:report
|
||||
# continue-on-error: true
|
||||
|
||||
@@ -102,17 +102,17 @@ jobs:
|
||||
- name: Web tsslint
|
||||
if: steps.changed-files.outputs.any_changed == 'true'
|
||||
working-directory: ./web
|
||||
run: pnpm run lint:tss
|
||||
run: vp run lint:tss
|
||||
|
||||
- name: Web type check
|
||||
if: steps.changed-files.outputs.any_changed == 'true'
|
||||
working-directory: ./web
|
||||
run: pnpm run type-check
|
||||
run: vp run type-check
|
||||
|
||||
- name: Web dead code check
|
||||
if: steps.changed-files.outputs.any_changed == 'true'
|
||||
working-directory: ./web
|
||||
run: pnpm run knip
|
||||
run: vp run knip
|
||||
|
||||
superlinter:
|
||||
name: SuperLinter
|
||||
|
||||
2
.github/workflows/translate-i18n-claude.yml
vendored
2
.github/workflows/translate-i18n-claude.yml
vendored
@@ -50,8 +50,6 @@ jobs:
|
||||
|
||||
- name: Setup web environment
|
||||
uses: ./.github/actions/setup-web
|
||||
with:
|
||||
install-dependencies: "false"
|
||||
|
||||
- name: Detect changed files and generate diff
|
||||
id: detect_changes
|
||||
|
||||
6
.github/workflows/web-tests.yml
vendored
6
.github/workflows/web-tests.yml
vendored
@@ -43,7 +43,7 @@ jobs:
|
||||
uses: ./.github/actions/setup-web
|
||||
|
||||
- name: Run tests
|
||||
run: pnpm vitest run --reporter=blob --shard=${{ matrix.shardIndex }}/${{ matrix.shardTotal }} --coverage
|
||||
run: vp test run --reporter=blob --shard=${{ matrix.shardIndex }}/${{ matrix.shardTotal }} --coverage
|
||||
|
||||
- name: Upload blob report
|
||||
if: ${{ !cancelled() }}
|
||||
@@ -84,7 +84,7 @@ jobs:
|
||||
merge-multiple: true
|
||||
|
||||
- name: Merge reports
|
||||
run: pnpm vitest --merge-reports --reporter=json --reporter=agent --coverage
|
||||
run: vp test --merge-reports --reporter=json --reporter=agent --coverage
|
||||
|
||||
- name: Check app/components diff coverage
|
||||
env:
|
||||
@@ -447,4 +447,4 @@ jobs:
|
||||
- name: Web build check
|
||||
if: steps.changed-files.outputs.any_changed == 'true'
|
||||
working-directory: ./web
|
||||
run: pnpm run build
|
||||
run: vp run build
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -237,3 +237,6 @@ scripts/stress-test/reports/
|
||||
# settings
|
||||
*.local.json
|
||||
*.local.md
|
||||
|
||||
# Code Agent Folder
|
||||
.qoder/*
|
||||
@@ -22,10 +22,10 @@ APP_WEB_URL=http://localhost:3000
|
||||
# Files URL
|
||||
FILES_URL=http://localhost:5001
|
||||
|
||||
# INTERNAL_FILES_URL is used for plugin daemon communication within Docker network.
|
||||
# Set this to the internal Docker service URL for proper plugin file access.
|
||||
# Example: INTERNAL_FILES_URL=http://api:5001
|
||||
INTERNAL_FILES_URL=http://127.0.0.1:5001
|
||||
# INTERNAL_FILES_URL is used by services running in Docker to reach the API file endpoints.
|
||||
# For Docker Desktop (Mac/Windows), use http://host.docker.internal:5001 when the API runs on the host.
|
||||
# For Docker Compose on Linux, use http://api:5001 when the API runs inside the Docker network.
|
||||
INTERNAL_FILES_URL=http://host.docker.internal:5001
|
||||
|
||||
# TRIGGER URL
|
||||
TRIGGER_URL=http://localhost:5001
|
||||
@@ -180,7 +180,7 @@ CONSOLE_CORS_ALLOW_ORIGINS=http://localhost:3000,*
|
||||
COOKIE_DOMAIN=
|
||||
|
||||
# Vector database configuration
|
||||
# Supported values are `weaviate`, `oceanbase`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`.
|
||||
# Supported values are `weaviate`, `oceanbase`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`, `hologres`.
|
||||
VECTOR_STORE=weaviate
|
||||
# Prefix used to create collection name in vector database
|
||||
VECTOR_INDEX_NAME_PREFIX=Vector_index
|
||||
@@ -217,6 +217,20 @@ COUCHBASE_PASSWORD=password
|
||||
COUCHBASE_BUCKET_NAME=Embeddings
|
||||
COUCHBASE_SCOPE_NAME=_default
|
||||
|
||||
# Hologres configuration
|
||||
# access_key_id is used as the PG username, access_key_secret is used as the PG password
|
||||
HOLOGRES_HOST=
|
||||
HOLOGRES_PORT=80
|
||||
HOLOGRES_DATABASE=
|
||||
HOLOGRES_ACCESS_KEY_ID=
|
||||
HOLOGRES_ACCESS_KEY_SECRET=
|
||||
HOLOGRES_SCHEMA=public
|
||||
HOLOGRES_TOKENIZER=jieba
|
||||
HOLOGRES_DISTANCE_METHOD=Cosine
|
||||
HOLOGRES_BASE_QUANTIZATION_TYPE=rabitq
|
||||
HOLOGRES_MAX_DEGREE=64
|
||||
HOLOGRES_EF_CONSTRUCTION=400
|
||||
|
||||
# Milvus configuration
|
||||
MILVUS_URI=http://127.0.0.1:19530
|
||||
MILVUS_TOKEN=
|
||||
|
||||
@@ -43,7 +43,6 @@ forbidden_modules =
|
||||
extensions.ext_redis
|
||||
allow_indirect_imports = True
|
||||
ignore_imports =
|
||||
dify_graph.nodes.agent.agent_node -> extensions.ext_database
|
||||
dify_graph.nodes.llm.node -> extensions.ext_database
|
||||
dify_graph.model_runtime.model_providers.__base.ai_model -> extensions.ext_redis
|
||||
dify_graph.model_runtime.model_providers.model_provider_factory -> extensions.ext_redis
|
||||
@@ -90,9 +89,6 @@ forbidden_modules =
|
||||
core.trigger
|
||||
core.variables
|
||||
ignore_imports =
|
||||
dify_graph.nodes.agent.agent_node -> core.model_manager
|
||||
dify_graph.nodes.agent.agent_node -> core.provider_manager
|
||||
dify_graph.nodes.agent.agent_node -> core.tools.tool_manager
|
||||
dify_graph.nodes.llm.llm_utils -> core.model_manager
|
||||
dify_graph.nodes.llm.protocols -> core.model_manager
|
||||
dify_graph.nodes.llm.llm_utils -> dify_graph.model_runtime.model_providers.__base.large_language_model
|
||||
@@ -100,9 +96,6 @@ ignore_imports =
|
||||
dify_graph.nodes.tool.tool_node -> core.callback_handler.workflow_tool_callback_handler
|
||||
dify_graph.nodes.tool.tool_node -> core.tools.tool_engine
|
||||
dify_graph.nodes.tool.tool_node -> core.tools.tool_manager
|
||||
dify_graph.nodes.agent.agent_node -> core.agent.entities
|
||||
dify_graph.nodes.agent.agent_node -> core.agent.plugin_entities
|
||||
dify_graph.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.app_config.entities
|
||||
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.advanced_prompt_transform
|
||||
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.simple_prompt_transform
|
||||
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> dify_graph.model_runtime.model_providers.__base.large_language_model
|
||||
@@ -110,12 +103,10 @@ ignore_imports =
|
||||
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.model_manager
|
||||
dify_graph.nodes.question_classifier.question_classifier_node -> core.model_manager
|
||||
dify_graph.nodes.tool.tool_node -> core.tools.utils.message_transformer
|
||||
dify_graph.nodes.agent.agent_node -> models.model
|
||||
dify_graph.nodes.llm.node -> core.helper.code_executor
|
||||
dify_graph.nodes.llm.node -> core.llm_generator.output_parser.errors
|
||||
dify_graph.nodes.llm.node -> core.llm_generator.output_parser.structured_output
|
||||
dify_graph.nodes.llm.node -> core.model_manager
|
||||
dify_graph.nodes.agent.entities -> core.prompt.entities.advanced_prompt_entities
|
||||
dify_graph.nodes.llm.entities -> core.prompt.entities.advanced_prompt_entities
|
||||
dify_graph.nodes.llm.node -> core.prompt.entities.advanced_prompt_entities
|
||||
dify_graph.nodes.llm.node -> core.prompt.utils.prompt_message_util
|
||||
@@ -124,17 +115,12 @@ ignore_imports =
|
||||
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.utils.prompt_message_util
|
||||
dify_graph.nodes.question_classifier.entities -> core.prompt.entities.advanced_prompt_entities
|
||||
dify_graph.nodes.question_classifier.question_classifier_node -> core.prompt.utils.prompt_message_util
|
||||
dify_graph.nodes.knowledge_index.entities -> core.rag.retrieval.retrieval_methods
|
||||
dify_graph.nodes.llm.node -> models.dataset
|
||||
dify_graph.nodes.agent.agent_node -> core.tools.utils.message_transformer
|
||||
dify_graph.nodes.llm.file_saver -> core.tools.signature
|
||||
dify_graph.nodes.llm.file_saver -> core.tools.tool_file_manager
|
||||
dify_graph.nodes.tool.tool_node -> core.tools.errors
|
||||
dify_graph.nodes.agent.agent_node -> extensions.ext_database
|
||||
dify_graph.nodes.llm.node -> extensions.ext_database
|
||||
dify_graph.nodes.agent.agent_node -> models
|
||||
dify_graph.nodes.llm.node -> models.model
|
||||
dify_graph.nodes.agent.agent_node -> services
|
||||
dify_graph.nodes.tool.tool_node -> services
|
||||
dify_graph.model_runtime.model_providers.__base.ai_model -> configs
|
||||
dify_graph.model_runtime.model_providers.__base.ai_model -> extensions.ext_redis
|
||||
|
||||
@@ -160,6 +160,7 @@ def migrate_knowledge_vector_database():
|
||||
}
|
||||
lower_collection_vector_types = {
|
||||
VectorType.ANALYTICDB,
|
||||
VectorType.HOLOGRES,
|
||||
VectorType.CHROMA,
|
||||
VectorType.MYSCALE,
|
||||
VectorType.PGVECTO_RS,
|
||||
|
||||
@@ -26,6 +26,7 @@ from .vdb.chroma_config import ChromaConfig
|
||||
from .vdb.clickzetta_config import ClickzettaConfig
|
||||
from .vdb.couchbase_config import CouchbaseConfig
|
||||
from .vdb.elasticsearch_config import ElasticsearchConfig
|
||||
from .vdb.hologres_config import HologresConfig
|
||||
from .vdb.huawei_cloud_config import HuaweiCloudConfig
|
||||
from .vdb.iris_config import IrisVectorConfig
|
||||
from .vdb.lindorm_config import LindormConfig
|
||||
@@ -347,6 +348,7 @@ class MiddlewareConfig(
|
||||
AnalyticdbConfig,
|
||||
ChromaConfig,
|
||||
ClickzettaConfig,
|
||||
HologresConfig,
|
||||
HuaweiCloudConfig,
|
||||
IrisVectorConfig,
|
||||
MilvusConfig,
|
||||
|
||||
68
api/configs/middleware/vdb/hologres_config.py
Normal file
68
api/configs/middleware/vdb/hologres_config.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from holo_search_sdk.types import BaseQuantizationType, DistanceType, TokenizerType
|
||||
from pydantic import Field
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class HologresConfig(BaseSettings):
|
||||
"""
|
||||
Configuration settings for Hologres vector database.
|
||||
|
||||
Hologres is compatible with PostgreSQL protocol.
|
||||
access_key_id is used as the PostgreSQL username,
|
||||
and access_key_secret is used as the PostgreSQL password.
|
||||
"""
|
||||
|
||||
HOLOGRES_HOST: str | None = Field(
|
||||
description="Hostname or IP address of the Hologres instance.",
|
||||
default=None,
|
||||
)
|
||||
|
||||
HOLOGRES_PORT: int = Field(
|
||||
description="Port number for connecting to the Hologres instance.",
|
||||
default=80,
|
||||
)
|
||||
|
||||
HOLOGRES_DATABASE: str | None = Field(
|
||||
description="Name of the Hologres database to connect to.",
|
||||
default=None,
|
||||
)
|
||||
|
||||
HOLOGRES_ACCESS_KEY_ID: str | None = Field(
|
||||
description="Alibaba Cloud AccessKey ID, also used as the PostgreSQL username.",
|
||||
default=None,
|
||||
)
|
||||
|
||||
HOLOGRES_ACCESS_KEY_SECRET: str | None = Field(
|
||||
description="Alibaba Cloud AccessKey Secret, also used as the PostgreSQL password.",
|
||||
default=None,
|
||||
)
|
||||
|
||||
HOLOGRES_SCHEMA: str = Field(
|
||||
description="Schema name in the Hologres database.",
|
||||
default="public",
|
||||
)
|
||||
|
||||
HOLOGRES_TOKENIZER: TokenizerType = Field(
|
||||
description="Tokenizer for full-text search index (e.g., 'jieba', 'ik', 'standard', 'simple').",
|
||||
default="jieba",
|
||||
)
|
||||
|
||||
HOLOGRES_DISTANCE_METHOD: DistanceType = Field(
|
||||
description="Distance method for vector index (e.g., 'Cosine', 'Euclidean', 'InnerProduct').",
|
||||
default="Cosine",
|
||||
)
|
||||
|
||||
HOLOGRES_BASE_QUANTIZATION_TYPE: BaseQuantizationType = Field(
|
||||
description="Base quantization type for vector index (e.g., 'rabitq', 'sq8', 'fp16', 'fp32').",
|
||||
default="rabitq",
|
||||
)
|
||||
|
||||
HOLOGRES_MAX_DEGREE: int = Field(
|
||||
description="Max degree (M) parameter for HNSW vector index.",
|
||||
default=64,
|
||||
)
|
||||
|
||||
HOLOGRES_EF_CONSTRUCTION: int = Field(
|
||||
description="ef_construction parameter for HNSW vector index.",
|
||||
default=400,
|
||||
)
|
||||
@@ -25,7 +25,8 @@ from controllers.console.wraps import (
|
||||
)
|
||||
from core.ops.ops_trace_manager import OpsTraceManager
|
||||
from core.rag.retrieval.retrieval_methods import RetrievalMethod
|
||||
from dify_graph.enums import NodeType, WorkflowExecutionStatus
|
||||
from core.trigger.constants import TRIGGER_NODE_TYPES
|
||||
from dify_graph.enums import WorkflowExecutionStatus
|
||||
from dify_graph.file import helpers as file_helpers
|
||||
from extensions.ext_database import db
|
||||
from libs.login import current_account_with_tenant, login_required
|
||||
@@ -508,11 +509,7 @@ class AppListApi(Resource):
|
||||
.scalars()
|
||||
.all()
|
||||
)
|
||||
trigger_node_types = {
|
||||
NodeType.TRIGGER_WEBHOOK,
|
||||
NodeType.TRIGGER_SCHEDULE,
|
||||
NodeType.TRIGGER_PLUGIN,
|
||||
}
|
||||
trigger_node_types = TRIGGER_NODE_TYPES
|
||||
for workflow in draft_workflows:
|
||||
node_id = None
|
||||
try:
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import json
|
||||
from enum import StrEnum
|
||||
|
||||
from flask_restx import Resource, marshal_with
|
||||
from pydantic import BaseModel, Field
|
||||
@@ -11,6 +10,7 @@ from controllers.console.wraps import account_initialization_required, edit_perm
|
||||
from extensions.ext_database import db
|
||||
from fields.app_fields import app_server_fields
|
||||
from libs.login import current_account_with_tenant, login_required
|
||||
from models.enums import AppMCPServerStatus
|
||||
from models.model import AppMCPServer
|
||||
|
||||
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
|
||||
@@ -19,11 +19,6 @@ DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
|
||||
app_server_model = console_ns.model("AppServer", app_server_fields)
|
||||
|
||||
|
||||
class AppMCPServerStatus(StrEnum):
|
||||
ACTIVE = "active"
|
||||
INACTIVE = "inactive"
|
||||
|
||||
|
||||
class MCPServerCreatePayload(BaseModel):
|
||||
description: str | None = Field(default=None, description="Server description")
|
||||
parameters: dict = Field(..., description="Server parameters configuration")
|
||||
@@ -117,9 +112,10 @@ class AppMCPServerController(Resource):
|
||||
|
||||
server.parameters = json.dumps(payload.parameters, ensure_ascii=False)
|
||||
if payload.status:
|
||||
if payload.status not in [status.value for status in AppMCPServerStatus]:
|
||||
try:
|
||||
server.status = AppMCPServerStatus(payload.status)
|
||||
except ValueError:
|
||||
raise ValueError("Invalid status")
|
||||
server.status = payload.status
|
||||
db.session.commit()
|
||||
return server
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.helper.trace_id_helper import get_external_trace_id
|
||||
from core.plugin.impl.exc import PluginInvokeError
|
||||
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
|
||||
from core.trigger.debug.event_selectors import (
|
||||
TriggerDebugEvent,
|
||||
TriggerDebugEventPoller,
|
||||
@@ -1209,7 +1210,7 @@ class DraftWorkflowTriggerNodeApi(Resource):
|
||||
node_type: NodeType = draft_workflow.get_node_type_from_node_config(node_config)
|
||||
event: TriggerDebugEvent | None = None
|
||||
# for schedule trigger, when run single node, just execute directly
|
||||
if node_type == NodeType.TRIGGER_SCHEDULE:
|
||||
if node_type == TRIGGER_SCHEDULE_NODE_TYPE:
|
||||
event = TriggerDebugEvent(
|
||||
workflow_args={},
|
||||
node_id=node_id,
|
||||
|
||||
@@ -263,6 +263,7 @@ def _get_retrieval_methods_by_vector_type(vector_type: str | None, is_mock: bool
|
||||
VectorType.BAIDU,
|
||||
VectorType.ALIBABACLOUD_MYSQL,
|
||||
VectorType.IRIS,
|
||||
VectorType.HOLOGRES,
|
||||
}
|
||||
|
||||
semantic_methods = {"retrieval_method": [RetrievalMethod.SEMANTIC_SEARCH.value]}
|
||||
|
||||
@@ -43,6 +43,7 @@ from libs.datetime_utils import naive_utc_now
|
||||
from libs.helper import EmailStr, TimestampField, extract_remote_ip, timezone
|
||||
from libs.login import current_account_with_tenant, login_required
|
||||
from models import AccountIntegrate, InvitationCode
|
||||
from models.account import AccountStatus, InvitationCodeStatus
|
||||
from services.account_service import AccountService
|
||||
from services.billing_service import BillingService
|
||||
from services.errors.account import CurrentPasswordIncorrectError as ServiceCurrentPasswordIncorrectError
|
||||
@@ -215,7 +216,7 @@ class AccountInitApi(Resource):
|
||||
db.session.query(InvitationCode)
|
||||
.where(
|
||||
InvitationCode.code == args.invitation_code,
|
||||
InvitationCode.status == "unused",
|
||||
InvitationCode.status == InvitationCodeStatus.UNUSED,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
@@ -223,7 +224,7 @@ class AccountInitApi(Resource):
|
||||
if not invitation_code:
|
||||
raise InvalidInvitationCodeError()
|
||||
|
||||
invitation_code.status = "used"
|
||||
invitation_code.status = InvitationCodeStatus.USED
|
||||
invitation_code.used_at = naive_utc_now()
|
||||
invitation_code.used_by_tenant_id = account.current_tenant_id
|
||||
invitation_code.used_by_account_id = account.id
|
||||
@@ -231,7 +232,7 @@ class AccountInitApi(Resource):
|
||||
account.interface_language = args.interface_language
|
||||
account.timezone = args.timezone
|
||||
account.interface_theme = "light"
|
||||
account.status = "active"
|
||||
account.status = AccountStatus.ACTIVE
|
||||
account.initialized_at = naive_utc_now()
|
||||
db.session.commit()
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ from typing import Any, Literal
|
||||
from flask import request, send_file
|
||||
from flask_restx import Resource
|
||||
from pydantic import BaseModel, Field
|
||||
from werkzeug.datastructures import FileStorage
|
||||
from werkzeug.exceptions import Forbidden
|
||||
|
||||
from configs import dify_config
|
||||
@@ -169,6 +170,20 @@ register_enum_models(
|
||||
)
|
||||
|
||||
|
||||
def _read_upload_content(file: FileStorage, max_size: int) -> bytes:
|
||||
"""
|
||||
Read the uploaded file and validate its actual size before delegating to the plugin service.
|
||||
|
||||
FileStorage.content_length is not reliable for multipart test uploads and may be zero even when
|
||||
content exists, so the controllers validate against the loaded bytes instead.
|
||||
"""
|
||||
content = file.read()
|
||||
if len(content) > max_size:
|
||||
raise ValueError("File size exceeds the maximum allowed size")
|
||||
|
||||
return content
|
||||
|
||||
|
||||
@console_ns.route("/workspaces/current/plugin/debugging-key")
|
||||
class PluginDebuggingKeyApi(Resource):
|
||||
@setup_required
|
||||
@@ -284,12 +299,7 @@ class PluginUploadFromPkgApi(Resource):
|
||||
_, tenant_id = current_account_with_tenant()
|
||||
|
||||
file = request.files["pkg"]
|
||||
|
||||
# check file size
|
||||
if file.content_length > dify_config.PLUGIN_MAX_PACKAGE_SIZE:
|
||||
raise ValueError("File size exceeds the maximum allowed size")
|
||||
|
||||
content = file.read()
|
||||
content = _read_upload_content(file, dify_config.PLUGIN_MAX_PACKAGE_SIZE)
|
||||
try:
|
||||
response = PluginService.upload_pkg(tenant_id, content)
|
||||
except PluginDaemonClientSideError as e:
|
||||
@@ -328,12 +338,7 @@ class PluginUploadFromBundleApi(Resource):
|
||||
_, tenant_id = current_account_with_tenant()
|
||||
|
||||
file = request.files["bundle"]
|
||||
|
||||
# check file size
|
||||
if file.content_length > dify_config.PLUGIN_MAX_BUNDLE_SIZE:
|
||||
raise ValueError("File size exceeds the maximum allowed size")
|
||||
|
||||
content = file.read()
|
||||
content = _read_upload_content(file, dify_config.PLUGIN_MAX_BUNDLE_SIZE)
|
||||
try:
|
||||
response = PluginService.upload_bundle(tenant_id, content)
|
||||
except PluginDaemonClientSideError as e:
|
||||
|
||||
@@ -6,13 +6,13 @@ from pydantic import BaseModel, Field, ValidationError
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from controllers.common.schema import register_schema_model
|
||||
from controllers.console.app.mcp_server import AppMCPServerStatus
|
||||
from controllers.mcp import mcp_ns
|
||||
from core.mcp import types as mcp_types
|
||||
from core.mcp.server.streamable_http import handle_mcp_request
|
||||
from dify_graph.variables.input_entities import VariableEntity
|
||||
from extensions.ext_database import db
|
||||
from libs import helper
|
||||
from models.enums import AppMCPServerStatus
|
||||
from models.model import App, AppMCPServer, AppMode, EndUser
|
||||
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Any
|
||||
|
||||
from core.agent.base_agent_runner import BaseAgentRunner
|
||||
from core.agent.entities import AgentScratchpadUnit
|
||||
from core.agent.errors import AgentMaxIterationError
|
||||
from core.agent.output_parser.cot_output_parser import CotAgentOutputParser
|
||||
from core.app.apps.base_app_queue_manager import PublishFrom
|
||||
from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent
|
||||
@@ -22,7 +23,6 @@ from dify_graph.model_runtime.entities.message_entities import (
|
||||
ToolPromptMessage,
|
||||
UserPromptMessage,
|
||||
)
|
||||
from dify_graph.nodes.agent.exc import AgentMaxIterationError
|
||||
from models.model import Message
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
9
api/core/agent/errors.py
Normal file
9
api/core/agent/errors.py
Normal file
@@ -0,0 +1,9 @@
|
||||
class AgentMaxIterationError(Exception):
|
||||
"""Raised when an agent runner exceeds the configured max iteration count."""
|
||||
|
||||
def __init__(self, max_iteration: int):
|
||||
self.max_iteration = max_iteration
|
||||
super().__init__(
|
||||
f"Agent exceeded the maximum iteration limit of {max_iteration}. "
|
||||
f"The agent was unable to complete the task within the allowed number of iterations."
|
||||
)
|
||||
@@ -5,6 +5,7 @@ from copy import deepcopy
|
||||
from typing import Any, Union
|
||||
|
||||
from core.agent.base_agent_runner import BaseAgentRunner
|
||||
from core.agent.errors import AgentMaxIterationError
|
||||
from core.app.apps.base_app_queue_manager import PublishFrom
|
||||
from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent
|
||||
from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform
|
||||
@@ -25,7 +26,6 @@ from dify_graph.model_runtime.entities import (
|
||||
UserPromptMessage,
|
||||
)
|
||||
from dify_graph.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
|
||||
from dify_graph.nodes.agent.exc import AgentMaxIterationError
|
||||
from models.model import Message
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -69,7 +69,7 @@ from dify_graph.entities.pause_reason import HumanInputRequired
|
||||
from dify_graph.enums import WorkflowExecutionStatus
|
||||
from dify_graph.model_runtime.entities.llm_entities import LLMUsage
|
||||
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
|
||||
from dify_graph.nodes import NodeType
|
||||
from dify_graph.nodes import BuiltinNodeTypes
|
||||
from dify_graph.repositories.draft_variable_repository import DraftVariableSaverFactory
|
||||
from dify_graph.runtime import GraphRuntimeState
|
||||
from dify_graph.system_variable import SystemVariable
|
||||
@@ -357,7 +357,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
|
||||
) -> Generator[StreamResponse, None, None]:
|
||||
"""Handle node succeeded events."""
|
||||
# Record files if it's an answer node or end node
|
||||
if event.node_type in [NodeType.ANSWER, NodeType.END, NodeType.LLM]:
|
||||
if event.node_type in [BuiltinNodeTypes.ANSWER, BuiltinNodeTypes.END, BuiltinNodeTypes.LLM]:
|
||||
self._recorded_files.extend(
|
||||
self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
|
||||
)
|
||||
|
||||
@@ -48,12 +48,13 @@ from core.app.entities.task_entities import (
|
||||
from core.plugin.impl.datasource import PluginDatasourceManager
|
||||
from core.tools.entities.tool_entities import ToolProviderType
|
||||
from core.tools.tool_manager import ToolManager
|
||||
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
|
||||
from core.trigger.trigger_manager import TriggerManager
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
from dify_graph.entities.pause_reason import HumanInputRequired
|
||||
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
|
||||
from dify_graph.enums import (
|
||||
NodeType,
|
||||
BuiltinNodeTypes,
|
||||
SystemVariableKey,
|
||||
WorkflowExecutionStatus,
|
||||
WorkflowNodeExecutionMetadataKey,
|
||||
@@ -442,7 +443,7 @@ class WorkflowResponseConverter:
|
||||
event: QueueNodeStartedEvent,
|
||||
task_id: str,
|
||||
) -> NodeStartStreamResponse | None:
|
||||
if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
|
||||
if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
|
||||
return None
|
||||
run_id = self._ensure_workflow_run_id()
|
||||
snapshot = self._store_snapshot(event)
|
||||
@@ -464,13 +465,13 @@ class WorkflowResponseConverter:
|
||||
)
|
||||
|
||||
try:
|
||||
if event.node_type == NodeType.TOOL:
|
||||
if event.node_type == BuiltinNodeTypes.TOOL:
|
||||
response.data.extras["icon"] = ToolManager.get_tool_icon(
|
||||
tenant_id=self._application_generate_entity.app_config.tenant_id,
|
||||
provider_type=ToolProviderType(event.provider_type),
|
||||
provider_id=event.provider_id,
|
||||
)
|
||||
elif event.node_type == NodeType.DATASOURCE:
|
||||
elif event.node_type == BuiltinNodeTypes.DATASOURCE:
|
||||
manager = PluginDatasourceManager()
|
||||
provider_entity = manager.fetch_datasource_provider(
|
||||
self._application_generate_entity.app_config.tenant_id,
|
||||
@@ -479,7 +480,7 @@ class WorkflowResponseConverter:
|
||||
response.data.extras["icon"] = provider_entity.declaration.identity.generate_datasource_icon_url(
|
||||
self._application_generate_entity.app_config.tenant_id
|
||||
)
|
||||
elif event.node_type == NodeType.TRIGGER_PLUGIN:
|
||||
elif event.node_type == TRIGGER_PLUGIN_NODE_TYPE:
|
||||
response.data.extras["icon"] = TriggerManager.get_trigger_plugin_icon(
|
||||
self._application_generate_entity.app_config.tenant_id,
|
||||
event.provider_id,
|
||||
@@ -496,7 +497,7 @@ class WorkflowResponseConverter:
|
||||
event: QueueNodeSucceededEvent | QueueNodeFailedEvent | QueueNodeExceptionEvent,
|
||||
task_id: str,
|
||||
) -> NodeFinishStreamResponse | None:
|
||||
if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
|
||||
if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
|
||||
return None
|
||||
run_id = self._ensure_workflow_run_id()
|
||||
snapshot = self._pop_snapshot(event.node_execution_id)
|
||||
@@ -554,7 +555,7 @@ class WorkflowResponseConverter:
|
||||
event: QueueNodeRetryEvent,
|
||||
task_id: str,
|
||||
) -> NodeRetryStreamResponse | None:
|
||||
if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
|
||||
if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
|
||||
return None
|
||||
run_id = self._ensure_workflow_run_id()
|
||||
|
||||
@@ -612,7 +613,7 @@ class WorkflowResponseConverter:
|
||||
data=IterationNodeStartStreamResponse.Data(
|
||||
id=event.node_id,
|
||||
node_id=event.node_id,
|
||||
node_type=event.node_type.value,
|
||||
node_type=event.node_type,
|
||||
title=event.node_title,
|
||||
created_at=int(time.time()),
|
||||
extras={},
|
||||
@@ -635,7 +636,7 @@ class WorkflowResponseConverter:
|
||||
data=IterationNodeNextStreamResponse.Data(
|
||||
id=event.node_id,
|
||||
node_id=event.node_id,
|
||||
node_type=event.node_type.value,
|
||||
node_type=event.node_type,
|
||||
title=event.node_title,
|
||||
index=event.index,
|
||||
created_at=int(time.time()),
|
||||
@@ -662,7 +663,7 @@ class WorkflowResponseConverter:
|
||||
data=IterationNodeCompletedStreamResponse.Data(
|
||||
id=event.node_id,
|
||||
node_id=event.node_id,
|
||||
node_type=event.node_type.value,
|
||||
node_type=event.node_type,
|
||||
title=event.node_title,
|
||||
outputs=new_outputs,
|
||||
outputs_truncated=outputs_truncated,
|
||||
@@ -692,7 +693,7 @@ class WorkflowResponseConverter:
|
||||
data=LoopNodeStartStreamResponse.Data(
|
||||
id=event.node_id,
|
||||
node_id=event.node_id,
|
||||
node_type=event.node_type.value,
|
||||
node_type=event.node_type,
|
||||
title=event.node_title,
|
||||
created_at=int(time.time()),
|
||||
extras={},
|
||||
@@ -715,7 +716,7 @@ class WorkflowResponseConverter:
|
||||
data=LoopNodeNextStreamResponse.Data(
|
||||
id=event.node_id,
|
||||
node_id=event.node_id,
|
||||
node_type=event.node_type.value,
|
||||
node_type=event.node_type,
|
||||
title=event.node_title,
|
||||
index=event.index,
|
||||
# The `pre_loop_output` field is not utilized by the frontend.
|
||||
@@ -744,7 +745,7 @@ class WorkflowResponseConverter:
|
||||
data=LoopNodeCompletedStreamResponse.Data(
|
||||
id=event.node_id,
|
||||
node_id=event.node_id,
|
||||
node_type=event.node_type.value,
|
||||
node_type=event.node_type,
|
||||
title=event.node_title,
|
||||
outputs=new_outputs,
|
||||
outputs_truncated=outputs_truncated,
|
||||
|
||||
@@ -12,7 +12,7 @@ from core.app.entities.app_invoke_entities import (
|
||||
build_dify_run_context,
|
||||
)
|
||||
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
|
||||
from core.workflow.node_factory import DifyNodeFactory
|
||||
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
from dify_graph.entities.graph_init_params import GraphInitParams
|
||||
from dify_graph.enums import WorkflowType
|
||||
@@ -274,6 +274,8 @@ class PipelineRunner(WorkflowBasedAppRunner):
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
)
|
||||
if start_node_id is None:
|
||||
start_node_id = get_default_root_node_id(graph_config)
|
||||
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=start_node_id)
|
||||
|
||||
if not graph:
|
||||
|
||||
@@ -3,7 +3,10 @@ import time
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import Any, cast
|
||||
|
||||
from pydantic import ValidationError
|
||||
|
||||
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
|
||||
from core.app.entities.agent_strategy import AgentStrategyInfo
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context
|
||||
from core.app.entities.queue_entities import (
|
||||
AppQueueEvent,
|
||||
@@ -29,7 +32,8 @@ from core.app.entities.queue_entities import (
|
||||
QueueWorkflowStartedEvent,
|
||||
QueueWorkflowSucceededEvent,
|
||||
)
|
||||
from core.workflow.node_factory import DifyNodeFactory
|
||||
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
|
||||
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id, resolve_workflow_node_class
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
from dify_graph.entities import GraphInitParams
|
||||
from dify_graph.entities.graph_config import NodeConfigDictAdapter
|
||||
@@ -63,7 +67,6 @@ from dify_graph.graph_events import (
|
||||
NodeRunSucceededEvent,
|
||||
)
|
||||
from dify_graph.graph_events.graph import GraphRunAbortedEvent
|
||||
from dify_graph.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
|
||||
from dify_graph.runtime import GraphRuntimeState, VariablePool
|
||||
from dify_graph.system_variable import SystemVariable
|
||||
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool
|
||||
@@ -137,6 +140,9 @@ class WorkflowBasedAppRunner:
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
)
|
||||
|
||||
if root_node_id is None:
|
||||
root_node_id = get_default_root_node_id(graph_config)
|
||||
|
||||
# init graph
|
||||
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=root_node_id)
|
||||
|
||||
@@ -308,7 +314,7 @@ class WorkflowBasedAppRunner:
|
||||
# Get node class
|
||||
node_type = target_node_config["data"].type
|
||||
node_version = str(target_node_config["data"].version)
|
||||
node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version]
|
||||
node_cls = resolve_workflow_node_class(node_type=node_type, node_version=node_version)
|
||||
|
||||
# Use the variable pool from graph_runtime_state instead of creating a new one
|
||||
variable_pool = graph_runtime_state.variable_pool
|
||||
@@ -336,6 +342,18 @@ class WorkflowBasedAppRunner:
|
||||
|
||||
return graph, variable_pool
|
||||
|
||||
@staticmethod
|
||||
def _build_agent_strategy_info(event: NodeRunStartedEvent) -> AgentStrategyInfo | None:
|
||||
raw_agent_strategy = event.extras.get("agent_strategy")
|
||||
if raw_agent_strategy is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
return AgentStrategyInfo.model_validate(raw_agent_strategy)
|
||||
except ValidationError:
|
||||
logger.warning("Invalid agent strategy payload for node %s", event.node_id, exc_info=True)
|
||||
return None
|
||||
|
||||
def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent):
|
||||
"""
|
||||
Handle event
|
||||
@@ -421,7 +439,7 @@ class WorkflowBasedAppRunner:
|
||||
start_at=event.start_at,
|
||||
in_iteration_id=event.in_iteration_id,
|
||||
in_loop_id=event.in_loop_id,
|
||||
agent_strategy=event.agent_strategy,
|
||||
agent_strategy=self._build_agent_strategy_info(event),
|
||||
provider_type=event.provider_type,
|
||||
provider_id=event.provider_id,
|
||||
)
|
||||
@@ -490,7 +508,9 @@ class WorkflowBasedAppRunner:
|
||||
elif isinstance(event, NodeRunRetrieverResourceEvent):
|
||||
self._publish_event(
|
||||
QueueRetrieverResourcesEvent(
|
||||
retriever_resources=event.retriever_resources,
|
||||
retriever_resources=[
|
||||
RetrievalSourceMetadata.model_validate(resource) for resource in event.retriever_resources
|
||||
],
|
||||
in_iteration_id=event.in_iteration_id,
|
||||
in_loop_id=event.in_loop_id,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
from .agent_strategy import AgentStrategyInfo
|
||||
|
||||
__all__ = ["AgentStrategyInfo"]
|
||||
|
||||
8
api/core/app/entities/agent_strategy.py
Normal file
8
api/core/app/entities/agent_strategy.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
|
||||
class AgentStrategyInfo(BaseModel):
|
||||
name: str
|
||||
icon: str | None = None
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
@@ -5,13 +5,12 @@ from typing import Any
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from core.app.entities.agent_strategy import AgentStrategyInfo
|
||||
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
|
||||
from dify_graph.entities import AgentNodeStrategyInit
|
||||
from dify_graph.entities.pause_reason import PauseReason
|
||||
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
|
||||
from dify_graph.enums import WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
|
||||
from dify_graph.nodes import NodeType
|
||||
|
||||
|
||||
class QueueEvent(StrEnum):
|
||||
@@ -314,7 +313,7 @@ class QueueNodeStartedEvent(AppQueueEvent):
|
||||
in_iteration_id: str | None = None
|
||||
in_loop_id: str | None = None
|
||||
start_at: datetime
|
||||
agent_strategy: AgentNodeStrategyInit | None = None
|
||||
agent_strategy: AgentStrategyInfo | None = None
|
||||
|
||||
# FIXME(-LAN-): only for ToolNode, need to refactor
|
||||
provider_type: str # should be a core.tools.entities.tool_entities.ToolProviderType
|
||||
|
||||
@@ -4,8 +4,8 @@ from typing import Any
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from core.app.entities.agent_strategy import AgentStrategyInfo
|
||||
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
|
||||
from dify_graph.entities import AgentNodeStrategyInit
|
||||
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
|
||||
from dify_graph.enums import WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMUsage
|
||||
@@ -349,7 +349,7 @@ class NodeStartStreamResponse(StreamResponse):
|
||||
extras: dict[str, object] = Field(default_factory=dict)
|
||||
iteration_id: str | None = None
|
||||
loop_id: str | None = None
|
||||
agent_strategy: AgentNodeStrategyInit | None = None
|
||||
agent_strategy: AgentStrategyInfo | None = None
|
||||
|
||||
event: StreamEvent = StreamEvent.NODE_STARTED
|
||||
workflow_run_id: str
|
||||
|
||||
@@ -2,7 +2,7 @@ import logging
|
||||
|
||||
from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID
|
||||
from dify_graph.conversation_variable_updater import ConversationVariableUpdater
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes
|
||||
from dify_graph.graph_engine.layers.base import GraphEngineLayer
|
||||
from dify_graph.graph_events import GraphEngineEvent, NodeRunSucceededEvent
|
||||
from dify_graph.nodes.variable_assigner.common import helpers as common_helpers
|
||||
@@ -22,7 +22,7 @@ class ConversationVariablePersistenceLayer(GraphEngineLayer):
|
||||
def on_event(self, event: GraphEngineEvent) -> None:
|
||||
if not isinstance(event, NodeRunSucceededEvent):
|
||||
return
|
||||
if event.node_type != NodeType.VARIABLE_ASSIGNER:
|
||||
if event.node_type != BuiltinNodeTypes.VARIABLE_ASSIGNER:
|
||||
return
|
||||
if self.graph_runtime_state is None:
|
||||
return
|
||||
|
||||
@@ -12,7 +12,7 @@ from typing_extensions import override
|
||||
from core.app.llm import deduct_llm_quota, ensure_llm_quota_available
|
||||
from core.errors.error import QuotaExceededError
|
||||
from core.model_manager import ModelInstance
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes
|
||||
from dify_graph.graph_engine.entities.commands import AbortCommand, CommandType
|
||||
from dify_graph.graph_engine.layers.base import GraphEngineLayer
|
||||
from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase
|
||||
@@ -113,11 +113,11 @@ class LLMQuotaLayer(GraphEngineLayer):
|
||||
def _extract_model_instance(node: Node) -> ModelInstance | None:
|
||||
try:
|
||||
match node.node_type:
|
||||
case NodeType.LLM:
|
||||
case BuiltinNodeTypes.LLM:
|
||||
return cast("LLMNode", node).model_instance
|
||||
case NodeType.PARAMETER_EXTRACTOR:
|
||||
case BuiltinNodeTypes.PARAMETER_EXTRACTOR:
|
||||
return cast("ParameterExtractorNode", node).model_instance
|
||||
case NodeType.QUESTION_CLASSIFIER:
|
||||
case BuiltinNodeTypes.QUESTION_CLASSIFIER:
|
||||
return cast("QuestionClassifierNode", node).model_instance
|
||||
case _:
|
||||
return None
|
||||
|
||||
@@ -16,7 +16,7 @@ from opentelemetry.trace import Span, SpanKind, Tracer, get_tracer, set_span_in_
|
||||
from typing_extensions import override
|
||||
|
||||
from configs import dify_config
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes, NodeType
|
||||
from dify_graph.graph_engine.layers.base import GraphEngineLayer
|
||||
from dify_graph.graph_events import GraphNodeEventBase
|
||||
from dify_graph.nodes.base.node import Node
|
||||
@@ -74,16 +74,13 @@ class ObservabilityLayer(GraphEngineLayer):
|
||||
def _build_parser_registry(self) -> None:
|
||||
"""Initialize parser registry for node types."""
|
||||
self._parsers = {
|
||||
NodeType.TOOL: ToolNodeOTelParser(),
|
||||
NodeType.LLM: LLMNodeOTelParser(),
|
||||
NodeType.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
|
||||
BuiltinNodeTypes.TOOL: ToolNodeOTelParser(),
|
||||
BuiltinNodeTypes.LLM: LLMNodeOTelParser(),
|
||||
BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
|
||||
}
|
||||
|
||||
def _get_parser(self, node: Node) -> NodeOTelParser:
|
||||
node_type = getattr(node, "node_type", None)
|
||||
if isinstance(node_type, NodeType):
|
||||
return self._parsers.get(node_type, self._default_parser)
|
||||
return self._default_parser
|
||||
return self._parsers.get(node.node_type, self._default_parser)
|
||||
|
||||
@override
|
||||
def on_graph_start(self) -> None:
|
||||
|
||||
@@ -12,6 +12,7 @@ from core.rag.models.document import Document
|
||||
from extensions.ext_database import db
|
||||
from models.dataset import ChildChunk, DatasetQuery, DocumentSegment
|
||||
from models.dataset import Document as DatasetDocument
|
||||
from models.enums import CreatorUserRole
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -38,7 +39,9 @@ class DatasetIndexToolCallbackHandler:
|
||||
source="app",
|
||||
source_app_id=self._app_id,
|
||||
created_by_role=(
|
||||
"account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end_user"
|
||||
CreatorUserRole.ACCOUNT
|
||||
if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER}
|
||||
else CreatorUserRole.END_USER
|
||||
),
|
||||
created_by=self._user_id,
|
||||
)
|
||||
|
||||
@@ -24,12 +24,12 @@ from core.datasource.utils.message_transformer import DatasourceFileMessageTrans
|
||||
from core.datasource.website_crawl.website_crawl_provider import WebsiteCrawlDatasourcePluginProviderController
|
||||
from core.db.session_factory import session_factory
|
||||
from core.plugin.impl.datasource import PluginDatasourceManager
|
||||
from core.workflow.nodes.datasource.entities import DatasourceParameter, OnlineDriveDownloadFileParam
|
||||
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
|
||||
from dify_graph.enums import WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.file import File
|
||||
from dify_graph.file.enums import FileTransferMethod, FileType
|
||||
from dify_graph.node_events import NodeRunResult, StreamChunkEvent, StreamCompletedEvent
|
||||
from dify_graph.repositories.datasource_manager_protocol import DatasourceParameter, OnlineDriveDownloadFileParam
|
||||
from factories import file_factory
|
||||
from models.model import UploadFile
|
||||
from models.tools import ToolFile
|
||||
|
||||
@@ -58,7 +58,7 @@ from core.ops.entities.trace_entity import (
|
||||
)
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from dify_graph.entities import WorkflowNodeExecution
|
||||
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
|
||||
from extensions.ext_database import db
|
||||
from models import WorkflowNodeExecutionTriggeredFrom
|
||||
|
||||
@@ -302,11 +302,11 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
self, node_execution: WorkflowNodeExecution, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata
|
||||
):
|
||||
try:
|
||||
if node_execution.node_type == NodeType.LLM:
|
||||
if node_execution.node_type == BuiltinNodeTypes.LLM:
|
||||
node_span = self.build_workflow_llm_span(trace_info, node_execution, trace_metadata)
|
||||
elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
|
||||
elif node_execution.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
|
||||
node_span = self.build_workflow_retrieval_span(trace_info, node_execution, trace_metadata)
|
||||
elif node_execution.node_type == NodeType.TOOL:
|
||||
elif node_execution.node_type == BuiltinNodeTypes.TOOL:
|
||||
node_span = self.build_workflow_tool_span(trace_info, node_execution, trace_metadata)
|
||||
else:
|
||||
node_span = self.build_workflow_task_span(trace_info, node_execution, trace_metadata)
|
||||
|
||||
@@ -155,8 +155,8 @@ def wrap_span_metadata(metadata, **kwargs):
|
||||
return metadata
|
||||
|
||||
|
||||
# Mapping from NodeType string values to OpenInference span kinds.
|
||||
# NodeType values not listed here default to CHAIN.
|
||||
# Mapping from built-in node type strings to OpenInference span kinds.
|
||||
# Node types not listed here default to CHAIN.
|
||||
_NODE_TYPE_TO_SPAN_KIND: dict[str, OpenInferenceSpanKindValues] = {
|
||||
"llm": OpenInferenceSpanKindValues.LLM,
|
||||
"knowledge-retrieval": OpenInferenceSpanKindValues.RETRIEVER,
|
||||
@@ -168,7 +168,7 @@ _NODE_TYPE_TO_SPAN_KIND: dict[str, OpenInferenceSpanKindValues] = {
|
||||
def _get_node_span_kind(node_type: str) -> OpenInferenceSpanKindValues:
|
||||
"""Return the OpenInference span kind for a given workflow node type.
|
||||
|
||||
Covers every ``NodeType`` enum value. Nodes that do not have a
|
||||
Covers every built-in node type string. Nodes that do not have a
|
||||
specialised span kind (e.g. ``start``, ``end``, ``if-else``,
|
||||
``code``, ``loop``, ``iteration``, etc.) are mapped to ``CHAIN``.
|
||||
"""
|
||||
|
||||
@@ -28,7 +28,7 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
|
||||
)
|
||||
from core.ops.utils import filter_none_values
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes
|
||||
from extensions.ext_database import db
|
||||
from models import EndUser, WorkflowNodeExecutionTriggeredFrom
|
||||
from models.enums import MessageStatus
|
||||
@@ -141,7 +141,7 @@ class LangFuseDataTrace(BaseTraceInstance):
|
||||
node_name = node_execution.title
|
||||
node_type = node_execution.node_type
|
||||
status = node_execution.status
|
||||
if node_type == NodeType.LLM:
|
||||
if node_type == BuiltinNodeTypes.LLM:
|
||||
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
|
||||
else:
|
||||
inputs = node_execution.inputs or {}
|
||||
|
||||
@@ -28,7 +28,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import (
|
||||
)
|
||||
from core.ops.utils import filter_none_values, generate_dotted_order
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
|
||||
from extensions.ext_database import db
|
||||
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
|
||||
|
||||
@@ -163,7 +163,7 @@ class LangSmithDataTrace(BaseTraceInstance):
|
||||
node_name = node_execution.title
|
||||
node_type = node_execution.node_type
|
||||
status = node_execution.status
|
||||
if node_type == NodeType.LLM:
|
||||
if node_type == BuiltinNodeTypes.LLM:
|
||||
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
|
||||
else:
|
||||
inputs = node_execution.inputs or {}
|
||||
@@ -197,7 +197,7 @@ class LangSmithDataTrace(BaseTraceInstance):
|
||||
"ls_model_name": process_data.get("model_name", ""),
|
||||
}
|
||||
)
|
||||
elif node_type == NodeType.KNOWLEDGE_RETRIEVAL:
|
||||
elif node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
|
||||
run_type = LangSmithRunType.retriever
|
||||
else:
|
||||
run_type = LangSmithRunType.tool
|
||||
|
||||
@@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import (
|
||||
TraceTaskName,
|
||||
WorkflowTraceInfo,
|
||||
)
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes
|
||||
from extensions.ext_database import db
|
||||
from models import EndUser
|
||||
from models.workflow import WorkflowNodeExecutionModel
|
||||
@@ -145,10 +145,10 @@ class MLflowDataTrace(BaseTraceInstance):
|
||||
"app_name": node.title,
|
||||
}
|
||||
|
||||
if node.node_type in (NodeType.LLM, NodeType.QUESTION_CLASSIFIER):
|
||||
if node.node_type in (BuiltinNodeTypes.LLM, BuiltinNodeTypes.QUESTION_CLASSIFIER):
|
||||
inputs, llm_attributes = self._parse_llm_inputs_and_attributes(node)
|
||||
attributes.update(llm_attributes)
|
||||
elif node.node_type == NodeType.HTTP_REQUEST:
|
||||
elif node.node_type == BuiltinNodeTypes.HTTP_REQUEST:
|
||||
inputs = node.process_data # contains request URL
|
||||
|
||||
if not inputs:
|
||||
@@ -180,9 +180,9 @@ class MLflowDataTrace(BaseTraceInstance):
|
||||
# End node span
|
||||
finished_at = node.created_at + timedelta(seconds=node.elapsed_time)
|
||||
outputs = json.loads(node.outputs) if node.outputs else {}
|
||||
if node.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
|
||||
if node.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
|
||||
outputs = self._parse_knowledge_retrieval_outputs(outputs)
|
||||
elif node.node_type == NodeType.LLM:
|
||||
elif node.node_type == BuiltinNodeTypes.LLM:
|
||||
outputs = outputs.get("text", outputs)
|
||||
node_span.end(
|
||||
outputs=outputs,
|
||||
@@ -471,13 +471,13 @@ class MLflowDataTrace(BaseTraceInstance):
|
||||
def _get_node_span_type(self, node_type: str) -> str:
|
||||
"""Map Dify node types to MLflow span types"""
|
||||
node_type_mapping = {
|
||||
NodeType.LLM: SpanType.LLM,
|
||||
NodeType.QUESTION_CLASSIFIER: SpanType.LLM,
|
||||
NodeType.KNOWLEDGE_RETRIEVAL: SpanType.RETRIEVER,
|
||||
NodeType.TOOL: SpanType.TOOL,
|
||||
NodeType.CODE: SpanType.TOOL,
|
||||
NodeType.HTTP_REQUEST: SpanType.TOOL,
|
||||
NodeType.AGENT: SpanType.AGENT,
|
||||
BuiltinNodeTypes.LLM: SpanType.LLM,
|
||||
BuiltinNodeTypes.QUESTION_CLASSIFIER: SpanType.LLM,
|
||||
BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL: SpanType.RETRIEVER,
|
||||
BuiltinNodeTypes.TOOL: SpanType.TOOL,
|
||||
BuiltinNodeTypes.CODE: SpanType.TOOL,
|
||||
BuiltinNodeTypes.HTTP_REQUEST: SpanType.TOOL,
|
||||
BuiltinNodeTypes.AGENT: SpanType.AGENT,
|
||||
}
|
||||
return node_type_mapping.get(node_type, "CHAIN") # type: ignore[arg-type,call-overload]
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import (
|
||||
WorkflowTraceInfo,
|
||||
)
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
|
||||
from extensions.ext_database import db
|
||||
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
|
||||
|
||||
@@ -187,7 +187,7 @@ class OpikDataTrace(BaseTraceInstance):
|
||||
node_name = node_execution.title
|
||||
node_type = node_execution.node_type
|
||||
status = node_execution.status
|
||||
if node_type == NodeType.LLM:
|
||||
if node_type == BuiltinNodeTypes.LLM:
|
||||
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
|
||||
else:
|
||||
inputs = node_execution.inputs or {}
|
||||
|
||||
@@ -628,10 +628,10 @@ class TraceTask:
|
||||
if not message_data:
|
||||
return {}
|
||||
conversation_mode_stmt = select(Conversation.mode).where(Conversation.id == message_data.conversation_id)
|
||||
conversation_mode = db.session.scalars(conversation_mode_stmt).all()
|
||||
if not conversation_mode or len(conversation_mode) == 0:
|
||||
conversation_modes = db.session.scalars(conversation_mode_stmt).all()
|
||||
if not conversation_modes or len(conversation_modes) == 0:
|
||||
return {}
|
||||
conversation_mode = conversation_mode[0]
|
||||
conversation_mode = conversation_modes[0]
|
||||
created_at = message_data.created_at
|
||||
inputs = message_data.message
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
|
||||
from dify_graph.entities.workflow_node_execution import (
|
||||
WorkflowNodeExecution,
|
||||
)
|
||||
from dify_graph.nodes import NodeType
|
||||
from dify_graph.nodes import BuiltinNodeTypes
|
||||
from extensions.ext_database import db
|
||||
from models import Account, App, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom
|
||||
|
||||
@@ -179,7 +179,7 @@ class TencentDataTrace(BaseTraceInstance):
|
||||
if node_span:
|
||||
self.trace_client.add_span(node_span)
|
||||
|
||||
if node_execution.node_type == NodeType.LLM:
|
||||
if node_execution.node_type == BuiltinNodeTypes.LLM:
|
||||
self._record_llm_metrics(node_execution)
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to process node execution: %s", node_execution.id)
|
||||
@@ -192,15 +192,15 @@ class TencentDataTrace(BaseTraceInstance):
|
||||
) -> SpanData | None:
|
||||
"""Build span for different node types"""
|
||||
try:
|
||||
if node_execution.node_type == NodeType.LLM:
|
||||
if node_execution.node_type == BuiltinNodeTypes.LLM:
|
||||
return TencentSpanBuilder.build_workflow_llm_span(
|
||||
trace_id, workflow_span_id, trace_info, node_execution
|
||||
)
|
||||
elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
|
||||
elif node_execution.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
|
||||
return TencentSpanBuilder.build_workflow_retrieval_span(
|
||||
trace_id, workflow_span_id, trace_info, node_execution
|
||||
)
|
||||
elif node_execution.node_type == NodeType.TOOL:
|
||||
elif node_execution.node_type == BuiltinNodeTypes.TOOL:
|
||||
return TencentSpanBuilder.build_workflow_tool_span(
|
||||
trace_id, workflow_span_id, trace_info, node_execution
|
||||
)
|
||||
|
||||
@@ -31,7 +31,7 @@ from core.ops.entities.trace_entity import (
|
||||
)
|
||||
from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
|
||||
from extensions.ext_database import db
|
||||
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
|
||||
|
||||
@@ -175,7 +175,7 @@ class WeaveDataTrace(BaseTraceInstance):
|
||||
node_name = node_execution.title
|
||||
node_type = node_execution.node_type
|
||||
status = node_execution.status
|
||||
if node_type == NodeType.LLM:
|
||||
if node_type == BuiltinNodeTypes.LLM:
|
||||
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
|
||||
else:
|
||||
inputs = node_execution.inputs or {}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from core.plugin.backwards_invocation.base import BaseBackwardsInvocation
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes
|
||||
from dify_graph.nodes.parameter_extractor.entities import (
|
||||
ModelConfig as ParameterExtractorModelConfig,
|
||||
)
|
||||
@@ -52,7 +52,7 @@ class PluginNodeBackwardsInvocation(BaseBackwardsInvocation):
|
||||
instruction=instruction, # instruct with variables are not supported
|
||||
)
|
||||
node_data_dict = node_data.model_dump()
|
||||
node_data_dict["type"] = NodeType.PARAMETER_EXTRACTOR
|
||||
node_data_dict["type"] = BuiltinNodeTypes.PARAMETER_EXTRACTOR
|
||||
execution = workflow_service.run_free_workflow_node(
|
||||
node_data_dict,
|
||||
tenant_id=tenant_id,
|
||||
|
||||
@@ -627,7 +627,7 @@ class ProviderManager:
|
||||
tenant_id=tenant_id,
|
||||
# TODO: Use provider name with prefix after the data migration.
|
||||
provider_name=ModelProviderID(provider_name).provider_name,
|
||||
provider_type=ProviderType.SYSTEM.value,
|
||||
provider_type=ProviderType.SYSTEM,
|
||||
quota_type=quota.quota_type,
|
||||
quota_limit=0, # type: ignore
|
||||
quota_used=0,
|
||||
|
||||
0
api/core/rag/datasource/vdb/hologres/__init__.py
Normal file
0
api/core/rag/datasource/vdb/hologres/__init__.py
Normal file
361
api/core/rag/datasource/vdb/hologres/hologres_vector.py
Normal file
361
api/core/rag/datasource/vdb/hologres/hologres_vector.py
Normal file
@@ -0,0 +1,361 @@
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import holo_search_sdk as holo # type: ignore
|
||||
from holo_search_sdk.types import BaseQuantizationType, DistanceType, TokenizerType
|
||||
from psycopg import sql as psql
|
||||
from pydantic import BaseModel, model_validator
|
||||
|
||||
from configs import dify_config
|
||||
from core.rag.datasource.vdb.vector_base import BaseVector
|
||||
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
|
||||
from core.rag.datasource.vdb.vector_type import VectorType
|
||||
from core.rag.embedding.embedding_base import Embeddings
|
||||
from core.rag.models.document import Document
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.dataset import Dataset
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HologresVectorConfig(BaseModel):
|
||||
"""
|
||||
Configuration for Hologres vector database connection.
|
||||
|
||||
In Hologres, access_key_id is used as the PostgreSQL username,
|
||||
and access_key_secret is used as the PostgreSQL password.
|
||||
"""
|
||||
|
||||
host: str
|
||||
port: int = 80
|
||||
database: str
|
||||
access_key_id: str
|
||||
access_key_secret: str
|
||||
schema_name: str = "public"
|
||||
tokenizer: TokenizerType = "jieba"
|
||||
distance_method: DistanceType = "Cosine"
|
||||
base_quantization_type: BaseQuantizationType = "rabitq"
|
||||
max_degree: int = 64
|
||||
ef_construction: int = 400
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_config(cls, values: dict):
|
||||
if not values.get("host"):
|
||||
raise ValueError("config HOLOGRES_HOST is required")
|
||||
if not values.get("database"):
|
||||
raise ValueError("config HOLOGRES_DATABASE is required")
|
||||
if not values.get("access_key_id"):
|
||||
raise ValueError("config HOLOGRES_ACCESS_KEY_ID is required")
|
||||
if not values.get("access_key_secret"):
|
||||
raise ValueError("config HOLOGRES_ACCESS_KEY_SECRET is required")
|
||||
return values
|
||||
|
||||
|
||||
class HologresVector(BaseVector):
|
||||
"""
|
||||
Hologres vector storage implementation using holo-search-sdk.
|
||||
|
||||
Supports semantic search (vector), full-text search, and hybrid search.
|
||||
"""
|
||||
|
||||
def __init__(self, collection_name: str, config: HologresVectorConfig):
|
||||
super().__init__(collection_name)
|
||||
self._config = config
|
||||
self._client = self._init_client(config)
|
||||
self.table_name = f"embedding_{collection_name}".lower()
|
||||
|
||||
def _init_client(self, config: HologresVectorConfig):
|
||||
"""Initialize and return a holo-search-sdk client."""
|
||||
client = holo.connect(
|
||||
host=config.host,
|
||||
port=config.port,
|
||||
database=config.database,
|
||||
access_key_id=config.access_key_id,
|
||||
access_key_secret=config.access_key_secret,
|
||||
schema=config.schema_name,
|
||||
)
|
||||
client.connect()
|
||||
return client
|
||||
|
||||
def get_type(self) -> str:
|
||||
return VectorType.HOLOGRES
|
||||
|
||||
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
"""Create collection table with vector and full-text indexes, then add texts."""
|
||||
dimension = len(embeddings[0])
|
||||
self._create_collection(dimension)
|
||||
self.add_texts(texts, embeddings)
|
||||
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
"""Add texts with embeddings to the collection using batch upsert."""
|
||||
if not documents:
|
||||
return []
|
||||
|
||||
pks: list[str] = []
|
||||
batch_size = 100
|
||||
for i in range(0, len(documents), batch_size):
|
||||
batch_docs = documents[i : i + batch_size]
|
||||
batch_embeddings = embeddings[i : i + batch_size]
|
||||
|
||||
values = []
|
||||
column_names = ["id", "text", "meta", "embedding"]
|
||||
|
||||
for j, doc in enumerate(batch_docs):
|
||||
doc_id = doc.metadata.get("doc_id", "") if doc.metadata else ""
|
||||
pks.append(doc_id)
|
||||
values.append(
|
||||
[
|
||||
doc_id,
|
||||
doc.page_content,
|
||||
json.dumps(doc.metadata or {}),
|
||||
batch_embeddings[j],
|
||||
]
|
||||
)
|
||||
|
||||
table = self._client.open_table(self.table_name)
|
||||
table.upsert_multi(
|
||||
index_column="id",
|
||||
values=values,
|
||||
column_names=column_names,
|
||||
update=True,
|
||||
update_columns=["text", "meta", "embedding"],
|
||||
)
|
||||
|
||||
return pks
|
||||
|
||||
def text_exists(self, id: str) -> bool:
|
||||
"""Check if a text with the given doc_id exists in the collection."""
|
||||
if not self._client.check_table_exist(self.table_name):
|
||||
return False
|
||||
|
||||
result = self._client.execute(
|
||||
psql.SQL("SELECT 1 FROM {} WHERE id = {} LIMIT 1").format(
|
||||
psql.Identifier(self.table_name), psql.Literal(id)
|
||||
),
|
||||
fetch_result=True,
|
||||
)
|
||||
return bool(result)
|
||||
|
||||
def get_ids_by_metadata_field(self, key: str, value: str) -> list[str] | None:
|
||||
"""Get document IDs by metadata field key and value."""
|
||||
result = self._client.execute(
|
||||
psql.SQL("SELECT id FROM {} WHERE meta->>{} = {}").format(
|
||||
psql.Identifier(self.table_name), psql.Literal(key), psql.Literal(value)
|
||||
),
|
||||
fetch_result=True,
|
||||
)
|
||||
if result:
|
||||
return [row[0] for row in result]
|
||||
return None
|
||||
|
||||
def delete_by_ids(self, ids: list[str]):
|
||||
"""Delete documents by their doc_id list."""
|
||||
if not ids:
|
||||
return
|
||||
if not self._client.check_table_exist(self.table_name):
|
||||
return
|
||||
|
||||
self._client.execute(
|
||||
psql.SQL("DELETE FROM {} WHERE id IN ({})").format(
|
||||
psql.Identifier(self.table_name),
|
||||
psql.SQL(", ").join(psql.Literal(id) for id in ids),
|
||||
)
|
||||
)
|
||||
|
||||
def delete_by_metadata_field(self, key: str, value: str):
|
||||
"""Delete documents by metadata field key and value."""
|
||||
if not self._client.check_table_exist(self.table_name):
|
||||
return
|
||||
|
||||
self._client.execute(
|
||||
psql.SQL("DELETE FROM {} WHERE meta->>{} = {}").format(
|
||||
psql.Identifier(self.table_name), psql.Literal(key), psql.Literal(value)
|
||||
)
|
||||
)
|
||||
|
||||
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
|
||||
"""Search for documents by vector similarity."""
|
||||
if not self._client.check_table_exist(self.table_name):
|
||||
return []
|
||||
|
||||
top_k = kwargs.get("top_k", 4)
|
||||
score_threshold = float(kwargs.get("score_threshold") or 0.0)
|
||||
|
||||
table = self._client.open_table(self.table_name)
|
||||
query = (
|
||||
table.search_vector(
|
||||
vector=query_vector,
|
||||
column="embedding",
|
||||
distance_method=self._config.distance_method,
|
||||
output_name="distance",
|
||||
)
|
||||
.select(["id", "text", "meta"])
|
||||
.limit(top_k)
|
||||
)
|
||||
|
||||
# Apply document_ids_filter if provided
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
if document_ids_filter:
|
||||
filter_sql = psql.SQL("meta->>'document_id' IN ({})").format(
|
||||
psql.SQL(", ").join(psql.Literal(id) for id in document_ids_filter)
|
||||
)
|
||||
query = query.where(filter_sql)
|
||||
|
||||
results = query.fetchall()
|
||||
return self._process_vector_results(results, score_threshold)
|
||||
|
||||
def _process_vector_results(self, results: list, score_threshold: float) -> list[Document]:
|
||||
"""Process vector search results into Document objects."""
|
||||
docs = []
|
||||
for row in results:
|
||||
# row format: (distance, id, text, meta)
|
||||
# distance is first because search_vector() adds the computed column before selected columns
|
||||
distance = row[0]
|
||||
text = row[2]
|
||||
meta = row[3]
|
||||
|
||||
if isinstance(meta, str):
|
||||
meta = json.loads(meta)
|
||||
|
||||
# Convert distance to similarity score (consistent with pgvector)
|
||||
score = 1 - distance
|
||||
meta["score"] = score
|
||||
|
||||
if score >= score_threshold:
|
||||
docs.append(Document(page_content=text, metadata=meta))
|
||||
|
||||
return docs
|
||||
|
||||
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
|
||||
"""Search for documents by full-text search."""
|
||||
if not self._client.check_table_exist(self.table_name):
|
||||
return []
|
||||
|
||||
top_k = kwargs.get("top_k", 4)
|
||||
|
||||
table = self._client.open_table(self.table_name)
|
||||
search_query = table.search_text(
|
||||
column="text",
|
||||
expression=query,
|
||||
return_score=True,
|
||||
return_score_name="score",
|
||||
return_all_columns=True,
|
||||
).limit(top_k)
|
||||
|
||||
# Apply document_ids_filter if provided
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
if document_ids_filter:
|
||||
filter_sql = psql.SQL("meta->>'document_id' IN ({})").format(
|
||||
psql.SQL(", ").join(psql.Literal(id) for id in document_ids_filter)
|
||||
)
|
||||
search_query = search_query.where(filter_sql)
|
||||
|
||||
results = search_query.fetchall()
|
||||
return self._process_full_text_results(results)
|
||||
|
||||
def _process_full_text_results(self, results: list) -> list[Document]:
|
||||
"""Process full-text search results into Document objects."""
|
||||
docs = []
|
||||
for row in results:
|
||||
# row format: (id, text, meta, embedding, score)
|
||||
text = row[1]
|
||||
meta = row[2]
|
||||
score = row[-1] # score is the last column from return_score
|
||||
|
||||
if isinstance(meta, str):
|
||||
meta = json.loads(meta)
|
||||
|
||||
meta["score"] = score
|
||||
docs.append(Document(page_content=text, metadata=meta))
|
||||
|
||||
return docs
|
||||
|
||||
def delete(self):
|
||||
"""Delete the entire collection table."""
|
||||
if self._client.check_table_exist(self.table_name):
|
||||
self._client.drop_table(self.table_name)
|
||||
|
||||
def _create_collection(self, dimension: int):
|
||||
"""Create the collection table with vector and full-text indexes."""
|
||||
lock_name = f"vector_indexing_lock_{self._collection_name}"
|
||||
with redis_client.lock(lock_name, timeout=20):
|
||||
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
|
||||
if redis_client.get(collection_exist_cache_key):
|
||||
return
|
||||
|
||||
if not self._client.check_table_exist(self.table_name):
|
||||
# Create table via SQL with CHECK constraint for vector dimension
|
||||
create_table_sql = psql.SQL("""
|
||||
CREATE TABLE IF NOT EXISTS {} (
|
||||
id TEXT PRIMARY KEY,
|
||||
text TEXT NOT NULL,
|
||||
meta JSONB NOT NULL,
|
||||
embedding float4[] NOT NULL
|
||||
CHECK (array_ndims(embedding) = 1
|
||||
AND array_length(embedding, 1) = {})
|
||||
);
|
||||
""").format(psql.Identifier(self.table_name), psql.Literal(dimension))
|
||||
self._client.execute(create_table_sql)
|
||||
|
||||
# Wait for table to be fully ready before creating indexes
|
||||
max_wait_seconds = 30
|
||||
poll_interval = 2
|
||||
for _ in range(max_wait_seconds // poll_interval):
|
||||
if self._client.check_table_exist(self.table_name):
|
||||
break
|
||||
time.sleep(poll_interval)
|
||||
else:
|
||||
raise RuntimeError(f"Table {self.table_name} was not ready after {max_wait_seconds}s")
|
||||
|
||||
# Open table and set vector index
|
||||
table = self._client.open_table(self.table_name)
|
||||
table.set_vector_index(
|
||||
column="embedding",
|
||||
distance_method=self._config.distance_method,
|
||||
base_quantization_type=self._config.base_quantization_type,
|
||||
max_degree=self._config.max_degree,
|
||||
ef_construction=self._config.ef_construction,
|
||||
use_reorder=self._config.base_quantization_type == "rabitq",
|
||||
)
|
||||
|
||||
# Create full-text search index
|
||||
table.create_text_index(
|
||||
index_name=f"ft_idx_{self._collection_name}",
|
||||
column="text",
|
||||
tokenizer=self._config.tokenizer,
|
||||
)
|
||||
|
||||
redis_client.set(collection_exist_cache_key, 1, ex=3600)
|
||||
|
||||
|
||||
class HologresVectorFactory(AbstractVectorFactory):
|
||||
"""Factory class for creating HologresVector instances."""
|
||||
|
||||
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> HologresVector:
|
||||
if dataset.index_struct_dict:
|
||||
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
|
||||
collection_name = class_prefix
|
||||
else:
|
||||
dataset_id = dataset.id
|
||||
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
|
||||
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.HOLOGRES, collection_name))
|
||||
|
||||
return HologresVector(
|
||||
collection_name=collection_name,
|
||||
config=HologresVectorConfig(
|
||||
host=dify_config.HOLOGRES_HOST or "",
|
||||
port=dify_config.HOLOGRES_PORT,
|
||||
database=dify_config.HOLOGRES_DATABASE or "",
|
||||
access_key_id=dify_config.HOLOGRES_ACCESS_KEY_ID or "",
|
||||
access_key_secret=dify_config.HOLOGRES_ACCESS_KEY_SECRET or "",
|
||||
schema_name=dify_config.HOLOGRES_SCHEMA,
|
||||
tokenizer=dify_config.HOLOGRES_TOKENIZER,
|
||||
distance_method=dify_config.HOLOGRES_DISTANCE_METHOD,
|
||||
base_quantization_type=dify_config.HOLOGRES_BASE_QUANTIZATION_TYPE,
|
||||
max_degree=dify_config.HOLOGRES_MAX_DEGREE,
|
||||
ef_construction=dify_config.HOLOGRES_EF_CONSTRUCTION,
|
||||
),
|
||||
)
|
||||
@@ -38,7 +38,7 @@ class AbstractVectorFactory(ABC):
|
||||
class Vector:
|
||||
def __init__(self, dataset: Dataset, attributes: list | None = None):
|
||||
if attributes is None:
|
||||
attributes = ["doc_id", "dataset_id", "document_id", "doc_hash"]
|
||||
attributes = ["doc_id", "dataset_id", "document_id", "doc_hash", "doc_type"]
|
||||
self._dataset = dataset
|
||||
self._embeddings = self._get_embeddings()
|
||||
self._attributes = attributes
|
||||
@@ -191,6 +191,10 @@ class Vector:
|
||||
from core.rag.datasource.vdb.iris.iris_vector import IrisVectorFactory
|
||||
|
||||
return IrisVectorFactory
|
||||
case VectorType.HOLOGRES:
|
||||
from core.rag.datasource.vdb.hologres.hologres_vector import HologresVectorFactory
|
||||
|
||||
return HologresVectorFactory
|
||||
case _:
|
||||
raise ValueError(f"Vector store {vector_type} is not supported.")
|
||||
|
||||
|
||||
@@ -34,3 +34,4 @@ class VectorType(StrEnum):
|
||||
MATRIXONE = "matrixone"
|
||||
CLICKZETTA = "clickzetta"
|
||||
IRIS = "iris"
|
||||
HOLOGRES = "hologres"
|
||||
|
||||
@@ -196,6 +196,7 @@ class WeaviateVector(BaseVector):
|
||||
),
|
||||
wc.Property(name="document_id", data_type=wc.DataType.TEXT),
|
||||
wc.Property(name="doc_id", data_type=wc.DataType.TEXT),
|
||||
wc.Property(name="doc_type", data_type=wc.DataType.TEXT),
|
||||
wc.Property(name="chunk_index", data_type=wc.DataType.INT),
|
||||
],
|
||||
vector_config=wc.Configure.Vectors.self_provided(),
|
||||
@@ -225,6 +226,8 @@ class WeaviateVector(BaseVector):
|
||||
to_add.append(wc.Property(name="document_id", data_type=wc.DataType.TEXT))
|
||||
if "doc_id" not in existing:
|
||||
to_add.append(wc.Property(name="doc_id", data_type=wc.DataType.TEXT))
|
||||
if "doc_type" not in existing:
|
||||
to_add.append(wc.Property(name="doc_type", data_type=wc.DataType.TEXT))
|
||||
if "chunk_index" not in existing:
|
||||
to_add.append(wc.Property(name="chunk_index", data_type=wc.DataType.INT))
|
||||
|
||||
|
||||
@@ -9,8 +9,8 @@ from flask import current_app
|
||||
from sqlalchemy import delete, func, select
|
||||
|
||||
from core.db.session_factory import session_factory
|
||||
from dify_graph.nodes.knowledge_index.exc import KnowledgeIndexNodeError
|
||||
from dify_graph.repositories.index_processor_protocol import Preview, PreviewItem, QaPreview
|
||||
from core.workflow.nodes.knowledge_index.exc import KnowledgeIndexNodeError
|
||||
from core.workflow.nodes.knowledge_index.protocols import Preview, PreviewItem, QaPreview
|
||||
from models.dataset import Dataset, Document, DocumentSegment
|
||||
|
||||
from .index_processor_factory import IndexProcessorFactory
|
||||
|
||||
@@ -56,18 +56,18 @@ from core.rag.retrieval.template_prompts import (
|
||||
)
|
||||
from core.tools.signature import sign_upload_file
|
||||
from core.tools.utils.dataset_retriever.dataset_retriever_base_tool import DatasetRetrieverBaseTool
|
||||
from dify_graph.file import File, FileTransferMethod, FileType
|
||||
from dify_graph.model_runtime.entities.llm_entities import LLMMode, LLMResult, LLMUsage
|
||||
from dify_graph.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool
|
||||
from dify_graph.model_runtime.entities.model_entities import ModelFeature, ModelType
|
||||
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
|
||||
from dify_graph.nodes.knowledge_retrieval import exc
|
||||
from dify_graph.repositories.rag_retrieval_protocol import (
|
||||
from core.workflow.nodes.knowledge_retrieval import exc
|
||||
from core.workflow.nodes.knowledge_retrieval.retrieval import (
|
||||
KnowledgeRetrievalRequest,
|
||||
Source,
|
||||
SourceChildChunk,
|
||||
SourceMetadata,
|
||||
)
|
||||
from dify_graph.file import File, FileTransferMethod, FileType
|
||||
from dify_graph.model_runtime.entities.llm_entities import LLMMode, LLMResult, LLMUsage
|
||||
from dify_graph.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool
|
||||
from dify_graph.model_runtime.entities.model_entities import ModelFeature, ModelType
|
||||
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs.json_in_md_parser import parse_and_check_json_markdown
|
||||
@@ -83,6 +83,7 @@ from models.dataset import (
|
||||
)
|
||||
from models.dataset import Document as DatasetDocument
|
||||
from models.dataset import Document as DocumentModel
|
||||
from models.enums import CreatorUserRole
|
||||
from services.external_knowledge_service import ExternalDatasetService
|
||||
from services.feature_service import FeatureService
|
||||
|
||||
@@ -1009,7 +1010,7 @@ class DatasetRetrieval:
|
||||
content=json.dumps(contents),
|
||||
source="app",
|
||||
source_app_id=app_id,
|
||||
created_by_role=user_from,
|
||||
created_by_role=CreatorUserRole(user_from),
|
||||
created_by=user_id,
|
||||
)
|
||||
dataset_queries.append(dataset_query)
|
||||
|
||||
@@ -146,7 +146,9 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
|
||||
|
||||
# No sequence number generation needed anymore
|
||||
|
||||
db_model.type = domain_model.workflow_type
|
||||
from models.workflow import WorkflowType as ModelWorkflowType
|
||||
|
||||
db_model.type = ModelWorkflowType(domain_model.workflow_type.value)
|
||||
db_model.version = domain_model.workflow_version
|
||||
db_model.graph = json.dumps(domain_model.graph) if domain_model.graph else None
|
||||
db_model.inputs = json.dumps(domain_model.inputs) if domain_model.inputs else None
|
||||
|
||||
@@ -18,7 +18,7 @@ from tenacity import before_sleep_log, retry, retry_if_exception, stop_after_att
|
||||
|
||||
from configs import dify_config
|
||||
from dify_graph.entities import WorkflowNodeExecution
|
||||
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
from dify_graph.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
|
||||
from dify_graph.repositories.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
|
||||
from dify_graph.workflow_type_encoder import WorkflowRuntimeTypeConverter
|
||||
@@ -146,7 +146,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
|
||||
index=db_model.index,
|
||||
predecessor_node_id=db_model.predecessor_node_id,
|
||||
node_id=db_model.node_id,
|
||||
node_type=NodeType(db_model.node_type),
|
||||
node_type=db_model.node_type,
|
||||
title=db_model.title,
|
||||
inputs=inputs,
|
||||
process_data=process_data,
|
||||
|
||||
@@ -116,6 +116,7 @@ class ToolParameterConfigurationManager:
|
||||
|
||||
return a deep copy of parameters with decrypted values
|
||||
"""
|
||||
parameters = self._deep_copy(parameters)
|
||||
|
||||
cache = ToolParameterCache(
|
||||
tenant_id=self.tenant_id,
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import Any
|
||||
|
||||
from core.tools.entities.tool_entities import WorkflowToolParameterConfiguration
|
||||
from core.tools.errors import WorkflowToolHumanInputNotSupportedError
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes
|
||||
from dify_graph.nodes.base.entities import OutputVariableEntity
|
||||
from dify_graph.variables.input_entities import VariableEntity
|
||||
|
||||
@@ -51,7 +51,7 @@ class WorkflowToolConfigurationUtils:
|
||||
def ensure_no_human_input_nodes(cls, graph: Mapping[str, Any]) -> None:
|
||||
nodes = graph.get("nodes", [])
|
||||
for node in nodes:
|
||||
if node.get("data", {}).get("type") == NodeType.HUMAN_INPUT:
|
||||
if node.get("data", {}).get("type") == BuiltinNodeTypes.HUMAN_INPUT:
|
||||
raise WorkflowToolHumanInputNotSupportedError()
|
||||
|
||||
@classmethod
|
||||
|
||||
18
api/core/trigger/constants.py
Normal file
18
api/core/trigger/constants.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from typing import Final
|
||||
|
||||
TRIGGER_WEBHOOK_NODE_TYPE: Final[str] = "trigger-webhook"
|
||||
TRIGGER_SCHEDULE_NODE_TYPE: Final[str] = "trigger-schedule"
|
||||
TRIGGER_PLUGIN_NODE_TYPE: Final[str] = "trigger-plugin"
|
||||
TRIGGER_INFO_METADATA_KEY: Final[str] = "trigger_info"
|
||||
|
||||
TRIGGER_NODE_TYPES: Final[frozenset[str]] = frozenset(
|
||||
{
|
||||
TRIGGER_WEBHOOK_NODE_TYPE,
|
||||
TRIGGER_SCHEDULE_NODE_TYPE,
|
||||
TRIGGER_PLUGIN_NODE_TYPE,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def is_trigger_node_type(node_type: str) -> bool:
|
||||
return node_type in TRIGGER_NODE_TYPES
|
||||
@@ -11,6 +11,11 @@ from typing import Any
|
||||
from pydantic import BaseModel
|
||||
|
||||
from core.plugin.entities.request import TriggerInvokeEventResponse
|
||||
from core.trigger.constants import (
|
||||
TRIGGER_PLUGIN_NODE_TYPE,
|
||||
TRIGGER_SCHEDULE_NODE_TYPE,
|
||||
TRIGGER_WEBHOOK_NODE_TYPE,
|
||||
)
|
||||
from core.trigger.debug.event_bus import TriggerDebugEventBus
|
||||
from core.trigger.debug.events import (
|
||||
PluginTriggerDebugEvent,
|
||||
@@ -19,10 +24,9 @@ from core.trigger.debug.events import (
|
||||
build_plugin_pool_key,
|
||||
build_webhook_pool_key,
|
||||
)
|
||||
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
|
||||
from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig
|
||||
from dify_graph.entities.graph_config import NodeConfigDict
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.nodes.trigger_plugin.entities import TriggerEventNodeData
|
||||
from dify_graph.nodes.trigger_schedule.entities import ScheduleConfig
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs.datetime_utils import ensure_naive_utc, naive_utc_now
|
||||
from libs.schedule_utils import calculate_next_run_at
|
||||
@@ -206,21 +210,19 @@ def create_event_poller(
|
||||
if not node_config:
|
||||
raise ValueError("Node data not found for node %s", node_id)
|
||||
node_type = draft_workflow.get_node_type_from_node_config(node_config)
|
||||
match node_type:
|
||||
case NodeType.TRIGGER_PLUGIN:
|
||||
return PluginTriggerDebugEventPoller(
|
||||
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
|
||||
)
|
||||
case NodeType.TRIGGER_WEBHOOK:
|
||||
return WebhookTriggerDebugEventPoller(
|
||||
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
|
||||
)
|
||||
case NodeType.TRIGGER_SCHEDULE:
|
||||
return ScheduleTriggerDebugEventPoller(
|
||||
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
|
||||
)
|
||||
case _:
|
||||
raise ValueError("unable to create event poller for node type %s", node_type)
|
||||
if node_type == TRIGGER_PLUGIN_NODE_TYPE:
|
||||
return PluginTriggerDebugEventPoller(
|
||||
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
|
||||
)
|
||||
if node_type == TRIGGER_WEBHOOK_NODE_TYPE:
|
||||
return WebhookTriggerDebugEventPoller(
|
||||
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
|
||||
)
|
||||
if node_type == TRIGGER_SCHEDULE_NODE_TYPE:
|
||||
return ScheduleTriggerDebugEventPoller(
|
||||
tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
|
||||
)
|
||||
raise ValueError("unable to create event poller for node type %s", node_type)
|
||||
|
||||
|
||||
def select_trigger_debug_events(
|
||||
|
||||
@@ -1,4 +1 @@
|
||||
from .node_factory import DifyNodeFactory
|
||||
from .workflow_entry import WorkflowEntry
|
||||
|
||||
__all__ = ["DifyNodeFactory", "WorkflowEntry"]
|
||||
"""Core workflow package."""
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
from collections.abc import Callable, Mapping
|
||||
import importlib
|
||||
import pkgutil
|
||||
from collections.abc import Callable, Iterator, Mapping, MutableMapping
|
||||
from functools import lru_cache
|
||||
from typing import TYPE_CHECKING, Any, TypeAlias, cast, final
|
||||
|
||||
from sqlalchemy import select
|
||||
@@ -8,7 +11,6 @@ from typing_extensions import override
|
||||
from configs import dify_config
|
||||
from core.app.entities.app_invoke_entities import DifyRunContext
|
||||
from core.app.llm.model_access import build_dify_model_access
|
||||
from core.datasource.datasource_manager import DatasourceManager
|
||||
from core.helper.code_executor.code_executor import (
|
||||
CodeExecutionError,
|
||||
CodeExecutor,
|
||||
@@ -17,15 +19,19 @@ from core.helper.ssrf_proxy import ssrf_proxy
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_manager import ModelInstance
|
||||
from core.prompt.entities.advanced_prompt_entities import MemoryConfig
|
||||
from core.rag.index_processor.index_processor import IndexProcessor
|
||||
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
|
||||
from core.rag.summary_index.summary_index import SummaryIndex
|
||||
from core.repositories.human_input_repository import HumanInputFormRepositoryImpl
|
||||
from core.tools.tool_file_manager import ToolFileManager
|
||||
from core.trigger.constants import TRIGGER_NODE_TYPES
|
||||
from core.workflow.nodes.agent.message_transformer import AgentMessageTransformer
|
||||
from core.workflow.nodes.agent.plugin_strategy_adapter import (
|
||||
PluginAgentStrategyPresentationProvider,
|
||||
PluginAgentStrategyResolver,
|
||||
)
|
||||
from core.workflow.nodes.agent.runtime_support import AgentRuntimeSupport
|
||||
from dify_graph.entities.base_node_data import BaseNodeData
|
||||
from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
|
||||
from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY
|
||||
from dify_graph.enums import NodeType, SystemVariableKey
|
||||
from dify_graph.enums import BuiltinNodeTypes, NodeType, SystemVariableKey
|
||||
from dify_graph.file.file_manager import file_manager
|
||||
from dify_graph.graph.graph import NodeFactory
|
||||
from dify_graph.model_runtime.entities.model_entities import ModelType
|
||||
@@ -39,7 +45,6 @@ from dify_graph.nodes.document_extractor import UnstructuredApiConfig
|
||||
from dify_graph.nodes.http_request import build_http_request_config
|
||||
from dify_graph.nodes.llm.entities import LLMNodeData
|
||||
from dify_graph.nodes.llm.exc import LLMModeRequiredError, ModelNotExistError
|
||||
from dify_graph.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
|
||||
from dify_graph.nodes.parameter_extractor.entities import ParameterExtractorNodeData
|
||||
from dify_graph.nodes.question_classifier.entities import QuestionClassifierNodeData
|
||||
from dify_graph.nodes.template_transform.template_renderer import (
|
||||
@@ -53,6 +58,135 @@ if TYPE_CHECKING:
|
||||
from dify_graph.entities import GraphInitParams
|
||||
from dify_graph.runtime import GraphRuntimeState
|
||||
|
||||
LATEST_VERSION = "latest"
|
||||
_START_NODE_TYPES: frozenset[NodeType] = frozenset(
|
||||
(BuiltinNodeTypes.START, BuiltinNodeTypes.DATASOURCE, *TRIGGER_NODE_TYPES)
|
||||
)
|
||||
|
||||
|
||||
def _import_node_package(package_name: str, *, excluded_modules: frozenset[str] = frozenset()) -> None:
|
||||
package = importlib.import_module(package_name)
|
||||
for _, module_name, _ in pkgutil.walk_packages(package.__path__, package.__name__ + "."):
|
||||
if module_name in excluded_modules:
|
||||
continue
|
||||
importlib.import_module(module_name)
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def register_nodes() -> None:
|
||||
"""Import production node modules so they self-register with ``Node``."""
|
||||
_import_node_package("dify_graph.nodes")
|
||||
_import_node_package("core.workflow.nodes")
|
||||
|
||||
|
||||
def get_node_type_classes_mapping() -> Mapping[NodeType, Mapping[str, type[Node]]]:
|
||||
"""Return a read-only snapshot of the current production node registry.
|
||||
|
||||
The workflow layer owns node bootstrap because it must compose built-in
|
||||
`dify_graph.nodes.*` implementations with workflow-local nodes under
|
||||
`core.workflow.nodes.*`. Keeping this import side effect here avoids
|
||||
reintroducing registry bootstrapping into lower-level graph primitives.
|
||||
"""
|
||||
register_nodes()
|
||||
return Node.get_node_type_classes_mapping()
|
||||
|
||||
|
||||
def resolve_workflow_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
|
||||
node_mapping = get_node_type_classes_mapping().get(node_type)
|
||||
if not node_mapping:
|
||||
raise ValueError(f"No class mapping found for node type: {node_type}")
|
||||
|
||||
latest_node_class = node_mapping.get(LATEST_VERSION)
|
||||
matched_node_class = node_mapping.get(node_version)
|
||||
node_class = matched_node_class or latest_node_class
|
||||
if not node_class:
|
||||
raise ValueError(f"No latest version class found for node type: {node_type}")
|
||||
return node_class
|
||||
|
||||
|
||||
def is_start_node_type(node_type: NodeType) -> bool:
|
||||
"""Return True when the node type can serve as a workflow entry point."""
|
||||
return node_type in _START_NODE_TYPES
|
||||
|
||||
|
||||
def get_default_root_node_id(graph_config: Mapping[str, Any]) -> str:
|
||||
"""Resolve the default entry node for a persisted top-level workflow graph.
|
||||
|
||||
This workflow-layer helper depends on start-node semantics defined by
|
||||
`is_start_node_type`, so it intentionally lives next to the node registry
|
||||
instead of in the raw `dify_graph.entities.graph_config` schema module.
|
||||
"""
|
||||
nodes = graph_config.get("nodes")
|
||||
if not isinstance(nodes, list):
|
||||
raise ValueError("nodes in workflow graph must be a list")
|
||||
|
||||
for node in nodes:
|
||||
if not isinstance(node, Mapping):
|
||||
continue
|
||||
|
||||
if node.get("type") == "custom-note":
|
||||
continue
|
||||
|
||||
node_id = node.get("id")
|
||||
data = node.get("data")
|
||||
if not isinstance(node_id, str) or not isinstance(data, Mapping):
|
||||
continue
|
||||
|
||||
node_type = data.get("type")
|
||||
if isinstance(node_type, str) and is_start_node_type(node_type):
|
||||
return node_id
|
||||
|
||||
raise ValueError("Unable to determine default root node ID from workflow graph")
|
||||
|
||||
|
||||
class _LazyNodeTypeClassesMapping(MutableMapping[NodeType, Mapping[str, type[Node]]]):
|
||||
"""Mutable dict-like view over the current node registry."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._cached_snapshot: dict[NodeType, Mapping[str, type[Node]]] = {}
|
||||
self._cached_version = -1
|
||||
self._deleted: set[NodeType] = set()
|
||||
self._overrides: dict[NodeType, Mapping[str, type[Node]]] = {}
|
||||
|
||||
def _snapshot(self) -> dict[NodeType, Mapping[str, type[Node]]]:
|
||||
current_version = Node.get_registry_version()
|
||||
if self._cached_version != current_version:
|
||||
self._cached_snapshot = dict(get_node_type_classes_mapping())
|
||||
self._cached_version = current_version
|
||||
if not self._deleted and not self._overrides:
|
||||
return self._cached_snapshot
|
||||
|
||||
snapshot = {key: value for key, value in self._cached_snapshot.items() if key not in self._deleted}
|
||||
snapshot.update(self._overrides)
|
||||
return snapshot
|
||||
|
||||
def __getitem__(self, key: NodeType) -> Mapping[str, type[Node]]:
|
||||
return self._snapshot()[key]
|
||||
|
||||
def __setitem__(self, key: NodeType, value: Mapping[str, type[Node]]) -> None:
|
||||
self._deleted.discard(key)
|
||||
self._overrides[key] = value
|
||||
|
||||
def __delitem__(self, key: NodeType) -> None:
|
||||
if key in self._overrides:
|
||||
del self._overrides[key]
|
||||
return
|
||||
if key in self._cached_snapshot:
|
||||
self._deleted.add(key)
|
||||
return
|
||||
raise KeyError(key)
|
||||
|
||||
def __iter__(self) -> Iterator[NodeType]:
|
||||
return iter(self._snapshot())
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._snapshot())
|
||||
|
||||
|
||||
# Keep the canonical node-class mapping in the workflow layer that also bootstraps
|
||||
# legacy `core.workflow.nodes.*` registrations.
|
||||
NODE_TYPE_CLASSES_MAPPING: MutableMapping[NodeType, Mapping[str, type[Node]]] = _LazyNodeTypeClassesMapping()
|
||||
|
||||
|
||||
LLMCompatibleNodeData: TypeAlias = LLMNodeData | QuestionClassifierNodeData | ParameterExtractorNodeData
|
||||
|
||||
@@ -97,10 +231,7 @@ class DefaultWorkflowCodeExecutor:
|
||||
@final
|
||||
class DifyNodeFactory(NodeFactory):
|
||||
"""
|
||||
Default implementation of NodeFactory that uses the traditional node mapping.
|
||||
|
||||
This factory creates nodes by looking up their types in NODE_TYPE_CLASSES_MAPPING
|
||||
and instantiating the appropriate node class.
|
||||
Default implementation of NodeFactory that resolves node classes from the live registry.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -127,7 +258,6 @@ class DifyNodeFactory(NodeFactory):
|
||||
self._http_request_http_client = ssrf_proxy
|
||||
self._http_request_tool_file_manager_factory = ToolFileManager
|
||||
self._http_request_file_manager = file_manager
|
||||
self._rag_retrieval = DatasetRetrieval()
|
||||
self._document_extractor_unstructured_api_config = UnstructuredApiConfig(
|
||||
api_url=dify_config.UNSTRUCTURED_API_URL,
|
||||
api_key=dify_config.UNSTRUCTURED_API_KEY or "",
|
||||
@@ -143,6 +273,10 @@ class DifyNodeFactory(NodeFactory):
|
||||
)
|
||||
|
||||
self._llm_credentials_provider, self._llm_model_factory = build_dify_model_access(self._dify_context.tenant_id)
|
||||
self._agent_strategy_resolver = PluginAgentStrategyResolver()
|
||||
self._agent_strategy_presentation_provider = PluginAgentStrategyPresentationProvider()
|
||||
self._agent_runtime_support = AgentRuntimeSupport()
|
||||
self._agent_message_transformer = AgentMessageTransformer()
|
||||
|
||||
@staticmethod
|
||||
def _resolve_dify_context(run_context: Mapping[str, Any]) -> DifyRunContext:
|
||||
@@ -170,55 +304,51 @@ class DifyNodeFactory(NodeFactory):
|
||||
node_class = self._resolve_node_class(node_type=node_data.type, node_version=str(node_data.version))
|
||||
node_type = node_data.type
|
||||
node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = {
|
||||
NodeType.CODE: lambda: {
|
||||
BuiltinNodeTypes.CODE: lambda: {
|
||||
"code_executor": self._code_executor,
|
||||
"code_limits": self._code_limits,
|
||||
},
|
||||
NodeType.TEMPLATE_TRANSFORM: lambda: {
|
||||
BuiltinNodeTypes.TEMPLATE_TRANSFORM: lambda: {
|
||||
"template_renderer": self._template_renderer,
|
||||
"max_output_length": self._template_transform_max_output_length,
|
||||
},
|
||||
NodeType.HTTP_REQUEST: lambda: {
|
||||
BuiltinNodeTypes.HTTP_REQUEST: lambda: {
|
||||
"http_request_config": self._http_request_config,
|
||||
"http_client": self._http_request_http_client,
|
||||
"tool_file_manager_factory": self._http_request_tool_file_manager_factory,
|
||||
"file_manager": self._http_request_file_manager,
|
||||
},
|
||||
NodeType.HUMAN_INPUT: lambda: {
|
||||
BuiltinNodeTypes.HUMAN_INPUT: lambda: {
|
||||
"form_repository": HumanInputFormRepositoryImpl(tenant_id=self._dify_context.tenant_id),
|
||||
},
|
||||
NodeType.KNOWLEDGE_INDEX: lambda: {
|
||||
"index_processor": IndexProcessor(),
|
||||
"summary_index_service": SummaryIndex(),
|
||||
},
|
||||
NodeType.LLM: lambda: self._build_llm_compatible_node_init_kwargs(
|
||||
BuiltinNodeTypes.LLM: lambda: self._build_llm_compatible_node_init_kwargs(
|
||||
node_class=node_class,
|
||||
node_data=node_data,
|
||||
include_http_client=True,
|
||||
),
|
||||
NodeType.DATASOURCE: lambda: {
|
||||
"datasource_manager": DatasourceManager,
|
||||
},
|
||||
NodeType.KNOWLEDGE_RETRIEVAL: lambda: {
|
||||
"rag_retrieval": self._rag_retrieval,
|
||||
},
|
||||
NodeType.DOCUMENT_EXTRACTOR: lambda: {
|
||||
BuiltinNodeTypes.DOCUMENT_EXTRACTOR: lambda: {
|
||||
"unstructured_api_config": self._document_extractor_unstructured_api_config,
|
||||
"http_client": self._http_request_http_client,
|
||||
},
|
||||
NodeType.QUESTION_CLASSIFIER: lambda: self._build_llm_compatible_node_init_kwargs(
|
||||
BuiltinNodeTypes.QUESTION_CLASSIFIER: lambda: self._build_llm_compatible_node_init_kwargs(
|
||||
node_class=node_class,
|
||||
node_data=node_data,
|
||||
include_http_client=True,
|
||||
),
|
||||
NodeType.PARAMETER_EXTRACTOR: lambda: self._build_llm_compatible_node_init_kwargs(
|
||||
BuiltinNodeTypes.PARAMETER_EXTRACTOR: lambda: self._build_llm_compatible_node_init_kwargs(
|
||||
node_class=node_class,
|
||||
node_data=node_data,
|
||||
include_http_client=False,
|
||||
),
|
||||
NodeType.TOOL: lambda: {
|
||||
BuiltinNodeTypes.TOOL: lambda: {
|
||||
"tool_file_manager_factory": self._http_request_tool_file_manager_factory(),
|
||||
},
|
||||
BuiltinNodeTypes.AGENT: lambda: {
|
||||
"strategy_resolver": self._agent_strategy_resolver,
|
||||
"presentation_provider": self._agent_strategy_presentation_provider,
|
||||
"runtime_support": self._agent_runtime_support,
|
||||
"message_transformer": self._agent_message_transformer,
|
||||
},
|
||||
}
|
||||
node_init_kwargs = node_init_kwargs_factories.get(node_type, lambda: {})()
|
||||
return node_class(
|
||||
@@ -238,16 +368,7 @@ class DifyNodeFactory(NodeFactory):
|
||||
|
||||
@staticmethod
|
||||
def _resolve_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
|
||||
node_mapping = NODE_TYPE_CLASSES_MAPPING.get(node_type)
|
||||
if not node_mapping:
|
||||
raise ValueError(f"No class mapping found for node type: {node_type}")
|
||||
|
||||
latest_node_class = node_mapping.get(LATEST_VERSION)
|
||||
matched_node_class = node_mapping.get(node_version)
|
||||
node_class = matched_node_class or latest_node_class
|
||||
if not node_class:
|
||||
raise ValueError(f"No latest version class found for node type: {node_type}")
|
||||
return node_class
|
||||
return resolve_workflow_node_class(node_type=node_type, node_version=node_version)
|
||||
|
||||
def _build_llm_compatible_node_init_kwargs(
|
||||
self,
|
||||
|
||||
1
api/core/workflow/nodes/__init__.py
Normal file
1
api/core/workflow/nodes/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Workflow node implementations that remain under the legacy core.workflow namespace."""
|
||||
4
api/core/workflow/nodes/agent/__init__.py
Normal file
4
api/core/workflow/nodes/agent/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .agent_node import AgentNode
|
||||
from .entities import AgentNodeData
|
||||
|
||||
__all__ = ["AgentNode", "AgentNodeData"]
|
||||
188
api/core/workflow/nodes/agent/agent_node.py
Normal file
188
api/core/workflow/nodes/agent/agent_node.py
Normal file
@@ -0,0 +1,188 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from dify_graph.entities.graph_config import NodeConfigDict
|
||||
from dify_graph.enums import BuiltinNodeTypes, SystemVariableKey, WorkflowNodeExecutionStatus
|
||||
from dify_graph.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent
|
||||
from dify_graph.nodes.base.node import Node
|
||||
from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser
|
||||
|
||||
from .entities import AgentNodeData
|
||||
from .exceptions import (
|
||||
AgentInvocationError,
|
||||
AgentMessageTransformError,
|
||||
)
|
||||
from .message_transformer import AgentMessageTransformer
|
||||
from .runtime_support import AgentRuntimeSupport
|
||||
from .strategy_protocols import AgentStrategyPresentationProvider, AgentStrategyResolver
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from dify_graph.entities import GraphInitParams
|
||||
from dify_graph.runtime import GraphRuntimeState
|
||||
|
||||
|
||||
class AgentNode(Node[AgentNodeData]):
|
||||
node_type = BuiltinNodeTypes.AGENT
|
||||
|
||||
_strategy_resolver: AgentStrategyResolver
|
||||
_presentation_provider: AgentStrategyPresentationProvider
|
||||
_runtime_support: AgentRuntimeSupport
|
||||
_message_transformer: AgentMessageTransformer
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
id: str,
|
||||
config: NodeConfigDict,
|
||||
graph_init_params: GraphInitParams,
|
||||
graph_runtime_state: GraphRuntimeState,
|
||||
*,
|
||||
strategy_resolver: AgentStrategyResolver,
|
||||
presentation_provider: AgentStrategyPresentationProvider,
|
||||
runtime_support: AgentRuntimeSupport,
|
||||
message_transformer: AgentMessageTransformer,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
id=id,
|
||||
config=config,
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
)
|
||||
self._strategy_resolver = strategy_resolver
|
||||
self._presentation_provider = presentation_provider
|
||||
self._runtime_support = runtime_support
|
||||
self._message_transformer = message_transformer
|
||||
|
||||
@classmethod
|
||||
def version(cls) -> str:
|
||||
return "1"
|
||||
|
||||
def populate_start_event(self, event) -> None:
|
||||
dify_ctx = self.require_dify_context()
|
||||
event.extras["agent_strategy"] = {
|
||||
"name": self.node_data.agent_strategy_name,
|
||||
"icon": self._presentation_provider.get_icon(
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
agent_strategy_provider_name=self.node_data.agent_strategy_provider_name,
|
||||
),
|
||||
}
|
||||
|
||||
def _run(self) -> Generator[NodeEventBase, None, None]:
|
||||
from core.plugin.impl.exc import PluginDaemonClientSideError
|
||||
|
||||
dify_ctx = self.require_dify_context()
|
||||
|
||||
try:
|
||||
strategy = self._strategy_resolver.resolve(
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
agent_strategy_provider_name=self.node_data.agent_strategy_provider_name,
|
||||
agent_strategy_name=self.node_data.agent_strategy_name,
|
||||
)
|
||||
except Exception as e:
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
inputs={},
|
||||
error=f"Failed to get agent strategy: {str(e)}",
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
agent_parameters = strategy.get_parameters()
|
||||
|
||||
parameters = self._runtime_support.build_parameters(
|
||||
agent_parameters=agent_parameters,
|
||||
variable_pool=self.graph_runtime_state.variable_pool,
|
||||
node_data=self.node_data,
|
||||
strategy=strategy,
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
app_id=dify_ctx.app_id,
|
||||
invoke_from=dify_ctx.invoke_from,
|
||||
)
|
||||
parameters_for_log = self._runtime_support.build_parameters(
|
||||
agent_parameters=agent_parameters,
|
||||
variable_pool=self.graph_runtime_state.variable_pool,
|
||||
node_data=self.node_data,
|
||||
strategy=strategy,
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
app_id=dify_ctx.app_id,
|
||||
invoke_from=dify_ctx.invoke_from,
|
||||
for_log=True,
|
||||
)
|
||||
credentials = self._runtime_support.build_credentials(parameters=parameters)
|
||||
|
||||
conversation_id = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID])
|
||||
|
||||
try:
|
||||
message_stream = strategy.invoke(
|
||||
params=parameters,
|
||||
user_id=dify_ctx.user_id,
|
||||
app_id=dify_ctx.app_id,
|
||||
conversation_id=conversation_id.text if conversation_id else None,
|
||||
credentials=credentials,
|
||||
)
|
||||
except Exception as e:
|
||||
error = AgentInvocationError(f"Failed to invoke agent: {str(e)}", original_error=e)
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
inputs=parameters_for_log,
|
||||
error=str(error),
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
yield from self._message_transformer.transform(
|
||||
messages=message_stream,
|
||||
tool_info={
|
||||
"icon": self._presentation_provider.get_icon(
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
agent_strategy_provider_name=self.node_data.agent_strategy_provider_name,
|
||||
),
|
||||
"agent_strategy": self.node_data.agent_strategy_name,
|
||||
},
|
||||
parameters_for_log=parameters_for_log,
|
||||
user_id=dify_ctx.user_id,
|
||||
tenant_id=dify_ctx.tenant_id,
|
||||
node_type=self.node_type,
|
||||
node_id=self._node_id,
|
||||
node_execution_id=self.id,
|
||||
)
|
||||
except PluginDaemonClientSideError as e:
|
||||
transform_error = AgentMessageTransformError(
|
||||
f"Failed to transform agent message: {str(e)}", original_error=e
|
||||
)
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
inputs=parameters_for_log,
|
||||
error=str(transform_error),
|
||||
)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _extract_variable_selector_to_variable_mapping(
|
||||
cls,
|
||||
*,
|
||||
graph_config: Mapping[str, Any],
|
||||
node_id: str,
|
||||
node_data: AgentNodeData,
|
||||
) -> Mapping[str, Sequence[str]]:
|
||||
_ = graph_config # Explicitly mark as unused
|
||||
result: dict[str, Any] = {}
|
||||
typed_node_data = node_data
|
||||
for parameter_name in typed_node_data.agent_parameters:
|
||||
input = typed_node_data.agent_parameters[parameter_name]
|
||||
match input.type:
|
||||
case "mixed" | "constant":
|
||||
selectors = VariableTemplateParser(str(input.value)).extract_variable_selectors()
|
||||
for selector in selectors:
|
||||
result[selector.variable] = selector.value_selector
|
||||
case "variable":
|
||||
result[parameter_name] = input.value
|
||||
|
||||
result = {node_id + "." + key: value for key, value in result.items()}
|
||||
|
||||
return result
|
||||
@@ -6,14 +6,14 @@ from pydantic import BaseModel
|
||||
from core.prompt.entities.advanced_prompt_entities import MemoryConfig
|
||||
from core.tools.entities.tool_entities import ToolSelector
|
||||
from dify_graph.entities.base_node_data import BaseNodeData
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes, NodeType
|
||||
|
||||
|
||||
class AgentNodeData(BaseNodeData):
|
||||
type: NodeType = NodeType.AGENT
|
||||
agent_strategy_provider_name: str # redundancy
|
||||
type: NodeType = BuiltinNodeTypes.AGENT
|
||||
agent_strategy_provider_name: str
|
||||
agent_strategy_name: str
|
||||
agent_strategy_label: str # redundancy
|
||||
agent_strategy_label: str
|
||||
memory: MemoryConfig | None = None
|
||||
# The version of the tool parameter.
|
||||
# If this value is None, it indicates this is a previous version
|
||||
@@ -119,14 +119,3 @@ class AgentVariableTypeError(AgentNodeError):
|
||||
self.expected_type = expected_type
|
||||
self.actual_type = actual_type
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AgentMaxIterationError(AgentNodeError):
|
||||
"""Exception raised when the agent exceeds the maximum iteration limit."""
|
||||
|
||||
def __init__(self, max_iteration: int):
|
||||
self.max_iteration = max_iteration
|
||||
super().__init__(
|
||||
f"Agent exceeded the maximum iteration limit of {max_iteration}. "
|
||||
f"The agent was unable to complete the task within the allowed number of iterations."
|
||||
)
|
||||
292
api/core/workflow/nodes/agent/message_transformer.py
Normal file
292
api/core/workflow/nodes/agent/message_transformer.py
Normal file
@@ -0,0 +1,292 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator, Mapping
|
||||
from typing import Any, cast
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.tools.entities.tool_entities import ToolInvokeMessage
|
||||
from core.tools.utils.message_transformer import ToolFileMessageTransformer
|
||||
from dify_graph.enums import BuiltinNodeTypes, NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
from dify_graph.file import File, FileTransferMethod
|
||||
from dify_graph.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata
|
||||
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
|
||||
from dify_graph.node_events import (
|
||||
AgentLogEvent,
|
||||
NodeEventBase,
|
||||
NodeRunResult,
|
||||
StreamChunkEvent,
|
||||
StreamCompletedEvent,
|
||||
)
|
||||
from dify_graph.variables.segments import ArrayFileSegment
|
||||
from extensions.ext_database import db
|
||||
from factories import file_factory
|
||||
from models import ToolFile
|
||||
from services.tools.builtin_tools_manage_service import BuiltinToolManageService
|
||||
|
||||
from .exceptions import AgentNodeError, AgentVariableTypeError, ToolFileNotFoundError
|
||||
|
||||
|
||||
class AgentMessageTransformer:
|
||||
def transform(
|
||||
self,
|
||||
*,
|
||||
messages: Generator[ToolInvokeMessage, None, None],
|
||||
tool_info: Mapping[str, Any],
|
||||
parameters_for_log: dict[str, Any],
|
||||
user_id: str,
|
||||
tenant_id: str,
|
||||
node_type: NodeType,
|
||||
node_id: str,
|
||||
node_execution_id: str,
|
||||
) -> Generator[NodeEventBase, None, None]:
|
||||
from core.plugin.impl.plugin import PluginInstaller
|
||||
|
||||
message_stream = ToolFileMessageTransformer.transform_tool_invoke_messages(
|
||||
messages=messages,
|
||||
user_id=user_id,
|
||||
tenant_id=tenant_id,
|
||||
conversation_id=None,
|
||||
)
|
||||
|
||||
text = ""
|
||||
files: list[File] = []
|
||||
json_list: list[dict | list] = []
|
||||
|
||||
agent_logs: list[AgentLogEvent] = []
|
||||
agent_execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] = {}
|
||||
llm_usage = LLMUsage.empty_usage()
|
||||
variables: dict[str, Any] = {}
|
||||
|
||||
for message in message_stream:
|
||||
if message.type in {
|
||||
ToolInvokeMessage.MessageType.IMAGE_LINK,
|
||||
ToolInvokeMessage.MessageType.BINARY_LINK,
|
||||
ToolInvokeMessage.MessageType.IMAGE,
|
||||
}:
|
||||
assert isinstance(message.message, ToolInvokeMessage.TextMessage)
|
||||
|
||||
url = message.message.text
|
||||
if message.meta:
|
||||
transfer_method = message.meta.get("transfer_method", FileTransferMethod.TOOL_FILE)
|
||||
else:
|
||||
transfer_method = FileTransferMethod.TOOL_FILE
|
||||
|
||||
tool_file_id = str(url).split("/")[-1].split(".")[0]
|
||||
|
||||
with Session(db.engine) as session:
|
||||
stmt = select(ToolFile).where(ToolFile.id == tool_file_id)
|
||||
tool_file = session.scalar(stmt)
|
||||
if tool_file is None:
|
||||
raise ToolFileNotFoundError(tool_file_id)
|
||||
|
||||
mapping = {
|
||||
"tool_file_id": tool_file_id,
|
||||
"type": file_factory.get_file_type_by_mime_type(tool_file.mimetype),
|
||||
"transfer_method": transfer_method,
|
||||
"url": url,
|
||||
}
|
||||
file = file_factory.build_from_mapping(
|
||||
mapping=mapping,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
files.append(file)
|
||||
elif message.type == ToolInvokeMessage.MessageType.BLOB:
|
||||
assert isinstance(message.message, ToolInvokeMessage.TextMessage)
|
||||
assert message.meta
|
||||
|
||||
tool_file_id = message.message.text.split("/")[-1].split(".")[0]
|
||||
with Session(db.engine) as session:
|
||||
stmt = select(ToolFile).where(ToolFile.id == tool_file_id)
|
||||
tool_file = session.scalar(stmt)
|
||||
if tool_file is None:
|
||||
raise ToolFileNotFoundError(tool_file_id)
|
||||
|
||||
mapping = {
|
||||
"tool_file_id": tool_file_id,
|
||||
"transfer_method": FileTransferMethod.TOOL_FILE,
|
||||
}
|
||||
files.append(
|
||||
file_factory.build_from_mapping(
|
||||
mapping=mapping,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
)
|
||||
elif message.type == ToolInvokeMessage.MessageType.TEXT:
|
||||
assert isinstance(message.message, ToolInvokeMessage.TextMessage)
|
||||
text += message.message.text
|
||||
yield StreamChunkEvent(
|
||||
selector=[node_id, "text"],
|
||||
chunk=message.message.text,
|
||||
is_final=False,
|
||||
)
|
||||
elif message.type == ToolInvokeMessage.MessageType.JSON:
|
||||
assert isinstance(message.message, ToolInvokeMessage.JsonMessage)
|
||||
if node_type == BuiltinNodeTypes.AGENT:
|
||||
if isinstance(message.message.json_object, dict):
|
||||
msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {})
|
||||
llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata))
|
||||
agent_execution_metadata = {
|
||||
WorkflowNodeExecutionMetadataKey(key): value
|
||||
for key, value in msg_metadata.items()
|
||||
if key in WorkflowNodeExecutionMetadataKey.__members__.values()
|
||||
}
|
||||
else:
|
||||
llm_usage = LLMUsage.empty_usage()
|
||||
agent_execution_metadata = {}
|
||||
if message.message.json_object:
|
||||
json_list.append(message.message.json_object)
|
||||
elif message.type == ToolInvokeMessage.MessageType.LINK:
|
||||
assert isinstance(message.message, ToolInvokeMessage.TextMessage)
|
||||
stream_text = f"Link: {message.message.text}\n"
|
||||
text += stream_text
|
||||
yield StreamChunkEvent(
|
||||
selector=[node_id, "text"],
|
||||
chunk=stream_text,
|
||||
is_final=False,
|
||||
)
|
||||
elif message.type == ToolInvokeMessage.MessageType.VARIABLE:
|
||||
assert isinstance(message.message, ToolInvokeMessage.VariableMessage)
|
||||
variable_name = message.message.variable_name
|
||||
variable_value = message.message.variable_value
|
||||
if message.message.stream:
|
||||
if not isinstance(variable_value, str):
|
||||
raise AgentVariableTypeError(
|
||||
"When 'stream' is True, 'variable_value' must be a string.",
|
||||
variable_name=variable_name,
|
||||
expected_type="str",
|
||||
actual_type=type(variable_value).__name__,
|
||||
)
|
||||
if variable_name not in variables:
|
||||
variables[variable_name] = ""
|
||||
variables[variable_name] += variable_value
|
||||
|
||||
yield StreamChunkEvent(
|
||||
selector=[node_id, variable_name],
|
||||
chunk=variable_value,
|
||||
is_final=False,
|
||||
)
|
||||
else:
|
||||
variables[variable_name] = variable_value
|
||||
elif message.type == ToolInvokeMessage.MessageType.FILE:
|
||||
assert message.meta is not None
|
||||
assert isinstance(message.meta, dict)
|
||||
if "file" not in message.meta:
|
||||
raise AgentNodeError("File message is missing 'file' key in meta")
|
||||
|
||||
if not isinstance(message.meta["file"], File):
|
||||
raise AgentNodeError(f"Expected File object but got {type(message.meta['file']).__name__}")
|
||||
files.append(message.meta["file"])
|
||||
elif message.type == ToolInvokeMessage.MessageType.LOG:
|
||||
assert isinstance(message.message, ToolInvokeMessage.LogMessage)
|
||||
if message.message.metadata:
|
||||
icon = tool_info.get("icon", "")
|
||||
dict_metadata = dict(message.message.metadata)
|
||||
if dict_metadata.get("provider"):
|
||||
manager = PluginInstaller()
|
||||
plugins = manager.list_plugins(tenant_id)
|
||||
try:
|
||||
current_plugin = next(
|
||||
plugin
|
||||
for plugin in plugins
|
||||
if f"{plugin.plugin_id}/{plugin.name}" == dict_metadata["provider"]
|
||||
)
|
||||
icon = current_plugin.declaration.icon
|
||||
except StopIteration:
|
||||
pass
|
||||
icon_dark = None
|
||||
try:
|
||||
builtin_tool = next(
|
||||
provider
|
||||
for provider in BuiltinToolManageService.list_builtin_tools(
|
||||
user_id,
|
||||
tenant_id,
|
||||
)
|
||||
if provider.name == dict_metadata["provider"]
|
||||
)
|
||||
icon = builtin_tool.icon
|
||||
icon_dark = builtin_tool.icon_dark
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
dict_metadata["icon"] = icon
|
||||
dict_metadata["icon_dark"] = icon_dark
|
||||
message.message.metadata = dict_metadata
|
||||
agent_log = AgentLogEvent(
|
||||
message_id=message.message.id,
|
||||
node_execution_id=node_execution_id,
|
||||
parent_id=message.message.parent_id,
|
||||
error=message.message.error,
|
||||
status=message.message.status.value,
|
||||
data=message.message.data,
|
||||
label=message.message.label,
|
||||
metadata=message.message.metadata,
|
||||
node_id=node_id,
|
||||
)
|
||||
|
||||
for log in agent_logs:
|
||||
if log.message_id == agent_log.message_id:
|
||||
log.data = agent_log.data
|
||||
log.status = agent_log.status
|
||||
log.error = agent_log.error
|
||||
log.label = agent_log.label
|
||||
log.metadata = agent_log.metadata
|
||||
break
|
||||
else:
|
||||
agent_logs.append(agent_log)
|
||||
|
||||
yield agent_log
|
||||
|
||||
json_output: list[dict[str, Any] | list[Any]] = []
|
||||
if agent_logs:
|
||||
for log in agent_logs:
|
||||
json_output.append(
|
||||
{
|
||||
"id": log.message_id,
|
||||
"parent_id": log.parent_id,
|
||||
"error": log.error,
|
||||
"status": log.status,
|
||||
"data": log.data,
|
||||
"label": log.label,
|
||||
"metadata": log.metadata,
|
||||
"node_id": log.node_id,
|
||||
}
|
||||
)
|
||||
if json_list:
|
||||
json_output.extend(json_list)
|
||||
else:
|
||||
json_output.append({"data": []})
|
||||
|
||||
yield StreamChunkEvent(
|
||||
selector=[node_id, "text"],
|
||||
chunk="",
|
||||
is_final=True,
|
||||
)
|
||||
|
||||
for var_name in variables:
|
||||
yield StreamChunkEvent(
|
||||
selector=[node_id, var_name],
|
||||
chunk="",
|
||||
is_final=True,
|
||||
)
|
||||
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
outputs={
|
||||
"text": text,
|
||||
"usage": jsonable_encoder(llm_usage),
|
||||
"files": ArrayFileSegment(value=files),
|
||||
"json": json_output,
|
||||
**variables,
|
||||
},
|
||||
metadata={
|
||||
**agent_execution_metadata,
|
||||
WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info,
|
||||
WorkflowNodeExecutionMetadataKey.AGENT_LOG: agent_logs,
|
||||
},
|
||||
inputs=parameters_for_log,
|
||||
llm_usage=llm_usage,
|
||||
)
|
||||
)
|
||||
40
api/core/workflow/nodes/agent/plugin_strategy_adapter.py
Normal file
40
api/core/workflow/nodes/agent/plugin_strategy_adapter.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from factories.agent_factory import get_plugin_agent_strategy
|
||||
|
||||
from .strategy_protocols import AgentStrategyPresentationProvider, AgentStrategyResolver, ResolvedAgentStrategy
|
||||
|
||||
|
||||
class PluginAgentStrategyResolver(AgentStrategyResolver):
|
||||
def resolve(
|
||||
self,
|
||||
*,
|
||||
tenant_id: str,
|
||||
agent_strategy_provider_name: str,
|
||||
agent_strategy_name: str,
|
||||
) -> ResolvedAgentStrategy:
|
||||
return get_plugin_agent_strategy(
|
||||
tenant_id=tenant_id,
|
||||
agent_strategy_provider_name=agent_strategy_provider_name,
|
||||
agent_strategy_name=agent_strategy_name,
|
||||
)
|
||||
|
||||
|
||||
class PluginAgentStrategyPresentationProvider(AgentStrategyPresentationProvider):
|
||||
def get_icon(self, *, tenant_id: str, agent_strategy_provider_name: str) -> str | None:
|
||||
from core.plugin.impl.plugin import PluginInstaller
|
||||
|
||||
manager = PluginInstaller()
|
||||
try:
|
||||
plugins = manager.list_plugins(tenant_id)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
try:
|
||||
current_plugin = next(
|
||||
plugin for plugin in plugins if f"{plugin.plugin_id}/{plugin.name}" == agent_strategy_provider_name
|
||||
)
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
return current_plugin.declaration.icon
|
||||
276
api/core/workflow/nodes/agent/runtime_support.py
Normal file
276
api/core/workflow/nodes/agent/runtime_support.py
Normal file
@@ -0,0 +1,276 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from collections.abc import Sequence
|
||||
from typing import Any, cast
|
||||
|
||||
from packaging.version import Version
|
||||
from pydantic import ValidationError
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.agent.entities import AgentToolEntity
|
||||
from core.agent.plugin_entities import AgentStrategyParameter
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_manager import ModelInstance, ModelManager
|
||||
from core.plugin.entities.request import InvokeCredentials
|
||||
from core.provider_manager import ProviderManager
|
||||
from core.tools.entities.tool_entities import ToolIdentity, ToolParameter, ToolProviderType
|
||||
from core.tools.tool_manager import ToolManager
|
||||
from dify_graph.enums import SystemVariableKey
|
||||
from dify_graph.model_runtime.entities.model_entities import AIModelEntity, ModelType
|
||||
from dify_graph.runtime import VariablePool
|
||||
from dify_graph.variables.segments import StringSegment
|
||||
from extensions.ext_database import db
|
||||
from models.model import Conversation
|
||||
|
||||
from .entities import AgentNodeData, AgentOldVersionModelFeatures, ParamsAutoGenerated
|
||||
from .exceptions import AgentInputTypeError, AgentVariableNotFoundError
|
||||
from .strategy_protocols import ResolvedAgentStrategy
|
||||
|
||||
|
||||
class AgentRuntimeSupport:
|
||||
def build_parameters(
|
||||
self,
|
||||
*,
|
||||
agent_parameters: Sequence[AgentStrategyParameter],
|
||||
variable_pool: VariablePool,
|
||||
node_data: AgentNodeData,
|
||||
strategy: ResolvedAgentStrategy,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
invoke_from: Any,
|
||||
for_log: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
agent_parameters_dictionary = {parameter.name: parameter for parameter in agent_parameters}
|
||||
|
||||
result: dict[str, Any] = {}
|
||||
for parameter_name in node_data.agent_parameters:
|
||||
parameter = agent_parameters_dictionary.get(parameter_name)
|
||||
if not parameter:
|
||||
result[parameter_name] = None
|
||||
continue
|
||||
|
||||
agent_input = node_data.agent_parameters[parameter_name]
|
||||
match agent_input.type:
|
||||
case "variable":
|
||||
variable = variable_pool.get(agent_input.value) # type: ignore[arg-type]
|
||||
if variable is None:
|
||||
raise AgentVariableNotFoundError(str(agent_input.value))
|
||||
parameter_value = variable.value
|
||||
case "mixed" | "constant":
|
||||
try:
|
||||
if not isinstance(agent_input.value, str):
|
||||
parameter_value = json.dumps(agent_input.value, ensure_ascii=False)
|
||||
else:
|
||||
parameter_value = str(agent_input.value)
|
||||
except TypeError:
|
||||
parameter_value = str(agent_input.value)
|
||||
|
||||
segment_group = variable_pool.convert_template(parameter_value)
|
||||
parameter_value = segment_group.log if for_log else segment_group.text
|
||||
try:
|
||||
if not isinstance(agent_input.value, str):
|
||||
parameter_value = json.loads(parameter_value)
|
||||
except json.JSONDecodeError:
|
||||
parameter_value = parameter_value
|
||||
case _:
|
||||
raise AgentInputTypeError(agent_input.type)
|
||||
|
||||
value = parameter_value
|
||||
if parameter.type == "array[tools]":
|
||||
value = cast(list[dict[str, Any]], value)
|
||||
value = [tool for tool in value if tool.get("enabled", False)]
|
||||
value = self._filter_mcp_type_tool(strategy, value)
|
||||
for tool in value:
|
||||
if "schemas" in tool:
|
||||
tool.pop("schemas")
|
||||
parameters = tool.get("parameters", {})
|
||||
if all(isinstance(v, dict) for _, v in parameters.items()):
|
||||
params = {}
|
||||
for key, param in parameters.items():
|
||||
if param.get("auto", ParamsAutoGenerated.OPEN) in (
|
||||
ParamsAutoGenerated.CLOSE,
|
||||
0,
|
||||
):
|
||||
value_param = param.get("value", {})
|
||||
if value_param and value_param.get("type", "") == "variable":
|
||||
variable_selector = value_param.get("value")
|
||||
if not variable_selector:
|
||||
raise ValueError("Variable selector is missing for a variable-type parameter.")
|
||||
|
||||
variable = variable_pool.get(variable_selector)
|
||||
if variable is None:
|
||||
raise AgentVariableNotFoundError(str(variable_selector))
|
||||
|
||||
params[key] = variable.value
|
||||
else:
|
||||
params[key] = value_param.get("value", "") if value_param is not None else None
|
||||
else:
|
||||
params[key] = None
|
||||
parameters = params
|
||||
tool["settings"] = {k: v.get("value", None) for k, v in tool.get("settings", {}).items()}
|
||||
tool["parameters"] = parameters
|
||||
|
||||
if not for_log:
|
||||
if parameter.type == "array[tools]":
|
||||
value = cast(list[dict[str, Any]], value)
|
||||
tool_value = []
|
||||
for tool in value:
|
||||
provider_type = ToolProviderType(tool.get("type", ToolProviderType.BUILT_IN))
|
||||
setting_params = tool.get("settings", {})
|
||||
parameters = tool.get("parameters", {})
|
||||
manual_input_params = [key for key, value in parameters.items() if value is not None]
|
||||
|
||||
parameters = {**parameters, **setting_params}
|
||||
entity = AgentToolEntity(
|
||||
provider_id=tool.get("provider_name", ""),
|
||||
provider_type=provider_type,
|
||||
tool_name=tool.get("tool_name", ""),
|
||||
tool_parameters=parameters,
|
||||
plugin_unique_identifier=tool.get("plugin_unique_identifier", None),
|
||||
credential_id=tool.get("credential_id", None),
|
||||
)
|
||||
|
||||
extra = tool.get("extra", {})
|
||||
|
||||
runtime_variable_pool: VariablePool | None = None
|
||||
if node_data.version != "1" or node_data.tool_node_version is not None:
|
||||
runtime_variable_pool = variable_pool
|
||||
tool_runtime = ToolManager.get_agent_tool_runtime(
|
||||
tenant_id,
|
||||
app_id,
|
||||
entity,
|
||||
invoke_from,
|
||||
runtime_variable_pool,
|
||||
)
|
||||
if tool_runtime.entity.description:
|
||||
tool_runtime.entity.description.llm = (
|
||||
extra.get("description", "") or tool_runtime.entity.description.llm
|
||||
)
|
||||
for tool_runtime_params in tool_runtime.entity.parameters:
|
||||
tool_runtime_params.form = (
|
||||
ToolParameter.ToolParameterForm.FORM
|
||||
if tool_runtime_params.name in manual_input_params
|
||||
else tool_runtime_params.form
|
||||
)
|
||||
manual_input_value = {}
|
||||
if tool_runtime.entity.parameters:
|
||||
manual_input_value = {
|
||||
key: value for key, value in parameters.items() if key in manual_input_params
|
||||
}
|
||||
runtime_parameters = {
|
||||
**tool_runtime.runtime.runtime_parameters,
|
||||
**manual_input_value,
|
||||
}
|
||||
tool_value.append(
|
||||
{
|
||||
**tool_runtime.entity.model_dump(mode="json"),
|
||||
"runtime_parameters": runtime_parameters,
|
||||
"credential_id": tool.get("credential_id", None),
|
||||
"provider_type": provider_type.value,
|
||||
}
|
||||
)
|
||||
value = tool_value
|
||||
if parameter.type == AgentStrategyParameter.AgentStrategyParameterType.MODEL_SELECTOR:
|
||||
value = cast(dict[str, Any], value)
|
||||
model_instance, model_schema = self.fetch_model(tenant_id=tenant_id, value=value)
|
||||
history_prompt_messages = []
|
||||
if node_data.memory:
|
||||
memory = self.fetch_memory(
|
||||
variable_pool=variable_pool,
|
||||
app_id=app_id,
|
||||
model_instance=model_instance,
|
||||
)
|
||||
if memory:
|
||||
prompt_messages = memory.get_history_prompt_messages(
|
||||
message_limit=node_data.memory.window.size or None
|
||||
)
|
||||
history_prompt_messages = [
|
||||
prompt_message.model_dump(mode="json") for prompt_message in prompt_messages
|
||||
]
|
||||
value["history_prompt_messages"] = history_prompt_messages
|
||||
if model_schema:
|
||||
model_schema = self._remove_unsupported_model_features_for_old_version(model_schema)
|
||||
value["entity"] = model_schema.model_dump(mode="json")
|
||||
else:
|
||||
value["entity"] = None
|
||||
result[parameter_name] = value
|
||||
|
||||
return result
|
||||
|
||||
def build_credentials(self, *, parameters: dict[str, Any]) -> InvokeCredentials:
|
||||
credentials = InvokeCredentials()
|
||||
credentials.tool_credentials = {}
|
||||
for tool in parameters.get("tools", []):
|
||||
if not tool.get("credential_id"):
|
||||
continue
|
||||
try:
|
||||
identity = ToolIdentity.model_validate(tool.get("identity", {}))
|
||||
except ValidationError:
|
||||
continue
|
||||
credentials.tool_credentials[identity.provider] = tool.get("credential_id", None)
|
||||
return credentials
|
||||
|
||||
def fetch_memory(
|
||||
self,
|
||||
*,
|
||||
variable_pool: VariablePool,
|
||||
app_id: str,
|
||||
model_instance: ModelInstance,
|
||||
) -> TokenBufferMemory | None:
|
||||
conversation_id_variable = variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID])
|
||||
if not isinstance(conversation_id_variable, StringSegment):
|
||||
return None
|
||||
conversation_id = conversation_id_variable.value
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
stmt = select(Conversation).where(Conversation.app_id == app_id, Conversation.id == conversation_id)
|
||||
conversation = session.scalar(stmt)
|
||||
if not conversation:
|
||||
return None
|
||||
|
||||
return TokenBufferMemory(conversation=conversation, model_instance=model_instance)
|
||||
|
||||
def fetch_model(self, *, tenant_id: str, value: dict[str, Any]) -> tuple[ModelInstance, AIModelEntity | None]:
|
||||
provider_manager = ProviderManager()
|
||||
provider_model_bundle = provider_manager.get_provider_model_bundle(
|
||||
tenant_id=tenant_id,
|
||||
provider=value.get("provider", ""),
|
||||
model_type=ModelType.LLM,
|
||||
)
|
||||
model_name = value.get("model", "")
|
||||
model_credentials = provider_model_bundle.configuration.get_current_credentials(
|
||||
model_type=ModelType.LLM,
|
||||
model=model_name,
|
||||
)
|
||||
provider_name = provider_model_bundle.configuration.provider.provider
|
||||
model_type_instance = provider_model_bundle.model_type_instance
|
||||
model_instance = ModelManager().get_model_instance(
|
||||
tenant_id=tenant_id,
|
||||
provider=provider_name,
|
||||
model_type=ModelType(value.get("model_type", "")),
|
||||
model=model_name,
|
||||
)
|
||||
model_schema = model_type_instance.get_model_schema(model_name, model_credentials)
|
||||
return model_instance, model_schema
|
||||
|
||||
@staticmethod
|
||||
def _remove_unsupported_model_features_for_old_version(model_schema: AIModelEntity) -> AIModelEntity:
|
||||
if model_schema.features:
|
||||
for feature in model_schema.features[:]:
|
||||
try:
|
||||
AgentOldVersionModelFeatures(feature.value)
|
||||
except ValueError:
|
||||
model_schema.features.remove(feature)
|
||||
return model_schema
|
||||
|
||||
@staticmethod
|
||||
def _filter_mcp_type_tool(
|
||||
strategy: ResolvedAgentStrategy,
|
||||
tools: list[dict[str, Any]],
|
||||
) -> list[dict[str, Any]]:
|
||||
meta_version = strategy.meta_version
|
||||
if meta_version and Version(meta_version) > Version("0.0.1"):
|
||||
return tools
|
||||
return [tool for tool in tools if tool.get("type") != ToolProviderType.MCP]
|
||||
39
api/core/workflow/nodes/agent/strategy_protocols.py
Normal file
39
api/core/workflow/nodes/agent/strategy_protocols.py
Normal file
@@ -0,0 +1,39 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator, Sequence
|
||||
from typing import Any, Protocol
|
||||
|
||||
from core.agent.plugin_entities import AgentStrategyParameter
|
||||
from core.plugin.entities.request import InvokeCredentials
|
||||
from core.tools.entities.tool_entities import ToolInvokeMessage
|
||||
|
||||
|
||||
class ResolvedAgentStrategy(Protocol):
|
||||
meta_version: str | None
|
||||
|
||||
def get_parameters(self) -> Sequence[AgentStrategyParameter]: ...
|
||||
|
||||
def invoke(
|
||||
self,
|
||||
*,
|
||||
params: dict[str, Any],
|
||||
user_id: str,
|
||||
conversation_id: str | None = None,
|
||||
app_id: str | None = None,
|
||||
message_id: str | None = None,
|
||||
credentials: InvokeCredentials | None = None,
|
||||
) -> Generator[ToolInvokeMessage, None, None]: ...
|
||||
|
||||
|
||||
class AgentStrategyResolver(Protocol):
|
||||
def resolve(
|
||||
self,
|
||||
*,
|
||||
tenant_id: str,
|
||||
agent_strategy_provider_name: str,
|
||||
agent_strategy_name: str,
|
||||
) -> ResolvedAgentStrategy: ...
|
||||
|
||||
|
||||
class AgentStrategyPresentationProvider(Protocol):
|
||||
def get_icon(self, *, tenant_id: str, agent_strategy_provider_name: str) -> str | None: ...
|
||||
1
api/core/workflow/nodes/datasource/__init__.py
Normal file
1
api/core/workflow/nodes/datasource/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Datasource workflow node package."""
|
||||
@@ -1,22 +1,17 @@
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from core.datasource.datasource_manager import DatasourceManager
|
||||
from core.datasource.entities.datasource_entities import DatasourceProviderType
|
||||
from core.plugin.impl.exc import PluginDaemonClientSideError
|
||||
from dify_graph.entities.graph_config import NodeConfigDict
|
||||
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
|
||||
from dify_graph.enums import NodeExecutionType, NodeType, SystemVariableKey
|
||||
from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, SystemVariableKey, WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.node_events import NodeRunResult, StreamCompletedEvent
|
||||
from dify_graph.nodes.base.node import Node
|
||||
from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser
|
||||
from dify_graph.repositories.datasource_manager_protocol import (
|
||||
DatasourceManagerProtocol,
|
||||
DatasourceParameter,
|
||||
OnlineDriveDownloadFileParam,
|
||||
)
|
||||
|
||||
from ...entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
|
||||
from .entities import DatasourceNodeData
|
||||
from .entities import DatasourceNodeData, DatasourceParameter, OnlineDriveDownloadFileParam
|
||||
from .exc import DatasourceNodeError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -29,7 +24,7 @@ class DatasourceNode(Node[DatasourceNodeData]):
|
||||
Datasource Node
|
||||
"""
|
||||
|
||||
node_type = NodeType.DATASOURCE
|
||||
node_type = BuiltinNodeTypes.DATASOURCE
|
||||
execution_type = NodeExecutionType.ROOT
|
||||
|
||||
def __init__(
|
||||
@@ -38,7 +33,6 @@ class DatasourceNode(Node[DatasourceNodeData]):
|
||||
config: NodeConfigDict,
|
||||
graph_init_params: "GraphInitParams",
|
||||
graph_runtime_state: "GraphRuntimeState",
|
||||
datasource_manager: DatasourceManagerProtocol,
|
||||
):
|
||||
super().__init__(
|
||||
id=id,
|
||||
@@ -46,7 +40,11 @@ class DatasourceNode(Node[DatasourceNodeData]):
|
||||
graph_init_params=graph_init_params,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
)
|
||||
self.datasource_manager = datasource_manager
|
||||
self.datasource_manager = DatasourceManager
|
||||
|
||||
def populate_start_event(self, event) -> None:
|
||||
event.provider_id = f"{self.node_data.plugin_id}/{self.node_data.provider_name}"
|
||||
event.provider_type = self.node_data.provider_type
|
||||
|
||||
def _run(self) -> Generator:
|
||||
"""
|
||||
@@ -4,7 +4,7 @@ from pydantic import BaseModel, field_validator
|
||||
from pydantic_core.core_schema import ValidationInfo
|
||||
|
||||
from dify_graph.entities.base_node_data import BaseNodeData
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes, NodeType
|
||||
|
||||
|
||||
class DatasourceEntity(BaseModel):
|
||||
@@ -17,7 +17,7 @@ class DatasourceEntity(BaseModel):
|
||||
|
||||
|
||||
class DatasourceNodeData(BaseNodeData, DatasourceEntity):
|
||||
type: NodeType = NodeType.DATASOURCE
|
||||
type: NodeType = BuiltinNodeTypes.DATASOURCE
|
||||
|
||||
class DatasourceInput(BaseModel):
|
||||
# TODO: check this type
|
||||
@@ -42,3 +42,14 @@ class DatasourceNodeData(BaseNodeData, DatasourceEntity):
|
||||
return typ
|
||||
|
||||
datasource_parameters: dict[str, DatasourceInput] | None = None
|
||||
|
||||
|
||||
class DatasourceParameter(BaseModel):
|
||||
workspace_id: str
|
||||
page_id: str
|
||||
type: str
|
||||
|
||||
|
||||
class OnlineDriveDownloadFileParam(BaseModel):
|
||||
id: str
|
||||
bucket: str
|
||||
@@ -1,25 +1,10 @@
|
||||
from collections.abc import Generator
|
||||
from typing import Any, Protocol
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from dify_graph.file import File
|
||||
from dify_graph.node_events import StreamChunkEvent, StreamCompletedEvent
|
||||
|
||||
|
||||
class DatasourceParameter(BaseModel):
|
||||
workspace_id: str
|
||||
page_id: str
|
||||
type: str
|
||||
|
||||
|
||||
class OnlineDriveDownloadFileParam(BaseModel):
|
||||
id: str
|
||||
bucket: str
|
||||
|
||||
|
||||
class DatasourceFinal(BaseModel):
|
||||
data: dict[str, Any] | None = None
|
||||
from .entities import DatasourceParameter, OnlineDriveDownloadFileParam
|
||||
|
||||
|
||||
class DatasourceManagerProtocol(Protocol):
|
||||
5
api/core/workflow/nodes/knowledge_index/__init__.py
Normal file
5
api/core/workflow/nodes/knowledge_index/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Knowledge index workflow node package."""
|
||||
|
||||
KNOWLEDGE_INDEX_NODE_TYPE = "knowledge-index"
|
||||
|
||||
__all__ = ["KNOWLEDGE_INDEX_NODE_TYPE"]
|
||||
@@ -3,6 +3,7 @@ from typing import Literal, Union
|
||||
from pydantic import BaseModel
|
||||
|
||||
from core.rag.retrieval.retrieval_methods import RetrievalMethod
|
||||
from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
|
||||
from dify_graph.entities.base_node_data import BaseNodeData
|
||||
from dify_graph.enums import NodeType
|
||||
|
||||
@@ -156,7 +157,7 @@ class KnowledgeIndexNodeData(BaseNodeData):
|
||||
Knowledge index Node Data.
|
||||
"""
|
||||
|
||||
type: NodeType = NodeType.KNOWLEDGE_INDEX
|
||||
type: NodeType = KNOWLEDGE_INDEX_NODE_TYPE
|
||||
chunk_structure: str
|
||||
index_chunk_variable_selector: list[str]
|
||||
indexing_technique: str | None = None
|
||||
@@ -2,14 +2,15 @@ import logging
|
||||
from collections.abc import Mapping
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from core.rag.index_processor.index_processor import IndexProcessor
|
||||
from core.rag.summary_index.summary_index import SummaryIndex
|
||||
from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
|
||||
from dify_graph.entities.graph_config import NodeConfigDict
|
||||
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
|
||||
from dify_graph.enums import NodeExecutionType, NodeType, SystemVariableKey
|
||||
from dify_graph.enums import NodeExecutionType, SystemVariableKey
|
||||
from dify_graph.node_events import NodeRunResult
|
||||
from dify_graph.nodes.base.node import Node
|
||||
from dify_graph.nodes.base.template import Template
|
||||
from dify_graph.repositories.index_processor_protocol import IndexProcessorProtocol
|
||||
from dify_graph.repositories.summary_index_service_protocol import SummaryIndexServiceProtocol
|
||||
|
||||
from .entities import KnowledgeIndexNodeData
|
||||
from .exc import (
|
||||
@@ -25,7 +26,7 @@ _INVOKE_FROM_DEBUGGER = "debugger"
|
||||
|
||||
|
||||
class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
|
||||
node_type = NodeType.KNOWLEDGE_INDEX
|
||||
node_type = KNOWLEDGE_INDEX_NODE_TYPE
|
||||
execution_type = NodeExecutionType.RESPONSE
|
||||
|
||||
def __init__(
|
||||
@@ -34,12 +35,10 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
|
||||
config: NodeConfigDict,
|
||||
graph_init_params: "GraphInitParams",
|
||||
graph_runtime_state: "GraphRuntimeState",
|
||||
index_processor: IndexProcessorProtocol,
|
||||
summary_index_service: SummaryIndexServiceProtocol,
|
||||
) -> None:
|
||||
super().__init__(id, config, graph_init_params, graph_runtime_state)
|
||||
self.index_processor = index_processor
|
||||
self.summary_index_service = summary_index_service
|
||||
self.index_processor = IndexProcessor()
|
||||
self.summary_index_service = SummaryIndex()
|
||||
|
||||
def _run(self) -> NodeRunResult: # type: ignore
|
||||
node_data = self.node_data
|
||||
@@ -5,21 +5,21 @@ from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class PreviewItem(BaseModel):
|
||||
content: str | None = Field(None)
|
||||
child_chunks: list[str] | None = Field(None)
|
||||
summary: str | None = Field(None)
|
||||
content: str | None = Field(default=None)
|
||||
child_chunks: list[str] | None = Field(default=None)
|
||||
summary: str | None = Field(default=None)
|
||||
|
||||
|
||||
class QaPreview(BaseModel):
|
||||
answer: str | None = Field(None)
|
||||
question: str | None = Field(None)
|
||||
answer: str | None = Field(default=None)
|
||||
question: str | None = Field(default=None)
|
||||
|
||||
|
||||
class Preview(BaseModel):
|
||||
chunk_structure: str
|
||||
parent_mode: str | None = Field(None)
|
||||
preview: list[PreviewItem] = Field([])
|
||||
qa_preview: list[QaPreview] = Field([])
|
||||
parent_mode: str | None = Field(default=None)
|
||||
preview: list[PreviewItem] = Field(default_factory=list)
|
||||
qa_preview: list[QaPreview] = Field(default_factory=list)
|
||||
total_segments: int
|
||||
|
||||
|
||||
@@ -39,3 +39,9 @@ class IndexProcessorProtocol(Protocol):
|
||||
def get_preview_output(
|
||||
self, chunks: Any, dataset_id: str, document_id: str, chunk_structure: str, summary_index_setting: dict | None
|
||||
) -> Preview: ...
|
||||
|
||||
|
||||
class SummaryIndexServiceProtocol(Protocol):
|
||||
def generate_and_vectorize_summary(
|
||||
self, dataset_id: str, document_id: str, is_preview: bool, summary_index_setting: dict | None = None
|
||||
) -> None: ...
|
||||
1
api/core/workflow/nodes/knowledge_retrieval/__init__.py
Normal file
1
api/core/workflow/nodes/knowledge_retrieval/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Knowledge retrieval workflow node package."""
|
||||
@@ -4,7 +4,7 @@ from typing import Literal
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from dify_graph.entities.base_node_data import BaseNodeData
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.enums import BuiltinNodeTypes, NodeType
|
||||
from dify_graph.nodes.llm.entities import ModelConfig, VisionConfig
|
||||
|
||||
|
||||
@@ -114,7 +114,7 @@ class KnowledgeRetrievalNodeData(BaseNodeData):
|
||||
Knowledge retrieval Node Data.
|
||||
"""
|
||||
|
||||
type: NodeType = NodeType.KNOWLEDGE_RETRIEVAL
|
||||
type: NodeType = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL
|
||||
query_variable_selector: list[str] | None | str = None
|
||||
query_attachment_selector: list[str] | None | str = None
|
||||
dataset_ids: list[str]
|
||||
@@ -1,12 +1,19 @@
|
||||
"""Knowledge retrieval workflow node implementation.
|
||||
|
||||
This node now lives under ``core.workflow.nodes`` and is discovered directly by
|
||||
the workflow node registry.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any, Literal
|
||||
|
||||
from core.app.app_config.entities import DatasetRetrieveConfigEntity
|
||||
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
|
||||
from dify_graph.entities import GraphInitParams
|
||||
from dify_graph.entities.graph_config import NodeConfigDict
|
||||
from dify_graph.enums import (
|
||||
NodeType,
|
||||
BuiltinNodeTypes,
|
||||
WorkflowNodeExecutionMetadataKey,
|
||||
WorkflowNodeExecutionStatus,
|
||||
)
|
||||
@@ -15,7 +22,6 @@ from dify_graph.model_runtime.utils.encoders import jsonable_encoder
|
||||
from dify_graph.node_events import NodeRunResult
|
||||
from dify_graph.nodes.base import LLMUsageTrackingMixin
|
||||
from dify_graph.nodes.base.node import Node
|
||||
from dify_graph.repositories.rag_retrieval_protocol import KnowledgeRetrievalRequest, RAGRetrievalProtocol, Source
|
||||
from dify_graph.variables import (
|
||||
ArrayFileSegment,
|
||||
FileSegment,
|
||||
@@ -32,6 +38,7 @@ from .exc import (
|
||||
KnowledgeRetrievalNodeError,
|
||||
RateLimitExceededError,
|
||||
)
|
||||
from .retrieval import KnowledgeRetrievalRequest, Source
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from dify_graph.file.models import File
|
||||
@@ -41,7 +48,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeData]):
|
||||
node_type = NodeType.KNOWLEDGE_RETRIEVAL
|
||||
node_type = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL
|
||||
|
||||
# Instance attributes specific to LLMNode.
|
||||
# Output variable for file
|
||||
@@ -53,7 +60,6 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
|
||||
config: NodeConfigDict,
|
||||
graph_init_params: "GraphInitParams",
|
||||
graph_runtime_state: "GraphRuntimeState",
|
||||
rag_retrieval: RAGRetrievalProtocol,
|
||||
):
|
||||
super().__init__(
|
||||
id=id,
|
||||
@@ -63,7 +69,7 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
|
||||
)
|
||||
# LLM file outputs, used for MultiModal outputs.
|
||||
self._file_outputs = []
|
||||
self._rag_retrieval = rag_retrieval
|
||||
self._rag_retrieval = DatasetRetrieval()
|
||||
|
||||
@classmethod
|
||||
def version(cls):
|
||||
@@ -3,9 +3,10 @@ from typing import Any, Literal, Protocol
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from dify_graph.model_runtime.entities import LLMUsage
|
||||
from dify_graph.nodes.knowledge_retrieval.entities import MetadataFilteringCondition
|
||||
from dify_graph.nodes.llm.entities import ModelConfig
|
||||
|
||||
from .entities import MetadataFilteringCondition
|
||||
|
||||
|
||||
class SourceChildChunk(BaseModel):
|
||||
id: str = Field(default="", description="Child chunk ID")
|
||||
@@ -28,7 +29,7 @@ class SourceMetadata(BaseModel):
|
||||
segment_id: str | None = Field(default=None, description="Segment unique identifier")
|
||||
retriever_from: str = Field(default="workflow", description="Retriever source context")
|
||||
score: float = Field(default=0.0, description="Retrieval relevance score")
|
||||
child_chunks: list[SourceChildChunk] = Field(default=[], description="List of child chunks")
|
||||
child_chunks: list[SourceChildChunk] = Field(default_factory=list, description="List of child chunks")
|
||||
segment_hit_count: int | None = Field(default=0, description="Number of times segment was retrieved")
|
||||
segment_word_count: int | None = Field(default=0, description="Word count of the segment")
|
||||
segment_position: int | None = Field(default=0, description="Position of segment in document")
|
||||
@@ -81,28 +82,7 @@ class KnowledgeRetrievalRequest(BaseModel):
|
||||
|
||||
|
||||
class RAGRetrievalProtocol(Protocol):
|
||||
"""Protocol for RAG-based knowledge retrieval implementations.
|
||||
|
||||
Implementations of this protocol handle knowledge retrieval from datasets
|
||||
including rate limiting, dataset filtering, and document retrieval.
|
||||
"""
|
||||
|
||||
@property
|
||||
def llm_usage(self) -> LLMUsage:
|
||||
"""Return accumulated LLM usage for retrieval operations."""
|
||||
...
|
||||
def llm_usage(self) -> LLMUsage: ...
|
||||
|
||||
def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]:
|
||||
"""Retrieve knowledge from datasets based on the provided request.
|
||||
|
||||
Args:
|
||||
request: Knowledge retrieval request with search parameters
|
||||
|
||||
Returns:
|
||||
List of sources matching the search criteria
|
||||
|
||||
Raises:
|
||||
RateLimitExceededError: If rate limit is exceeded
|
||||
ModelNotExistError: If specified model doesn't exist
|
||||
"""
|
||||
...
|
||||
def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]: ...
|
||||
@@ -3,16 +3,18 @@ from typing import Any, Literal, Union
|
||||
|
||||
from pydantic import BaseModel, Field, ValidationInfo, field_validator
|
||||
|
||||
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
|
||||
from core.trigger.entities.entities import EventParameter
|
||||
from dify_graph.entities.base_node_data import BaseNodeData
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.nodes.trigger_plugin.exc import TriggerEventParameterError
|
||||
|
||||
from .exc import TriggerEventParameterError
|
||||
|
||||
|
||||
class TriggerEventNodeData(BaseNodeData):
|
||||
"""Plugin trigger node data"""
|
||||
|
||||
type: NodeType = NodeType.TRIGGER_PLUGIN
|
||||
type: NodeType = TRIGGER_PLUGIN_NODE_TYPE
|
||||
|
||||
class TriggerEventInput(BaseModel):
|
||||
value: Union[Any, list[str]]
|
||||
@@ -1,8 +1,10 @@
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, cast
|
||||
|
||||
from core.trigger.constants import TRIGGER_INFO_METADATA_KEY, TRIGGER_PLUGIN_NODE_TYPE
|
||||
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
|
||||
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
from dify_graph.enums import NodeExecutionType, NodeType
|
||||
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
|
||||
from dify_graph.enums import NodeExecutionType, WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.node_events import NodeRunResult
|
||||
from dify_graph.nodes.base.node import Node
|
||||
|
||||
@@ -10,7 +12,7 @@ from .entities import TriggerEventNodeData
|
||||
|
||||
|
||||
class TriggerEventNode(Node[TriggerEventNodeData]):
|
||||
node_type = NodeType.TRIGGER_PLUGIN
|
||||
node_type = TRIGGER_PLUGIN_NODE_TYPE
|
||||
execution_type = NodeExecutionType.ROOT
|
||||
|
||||
@classmethod
|
||||
@@ -32,6 +34,9 @@ class TriggerEventNode(Node[TriggerEventNodeData]):
|
||||
def version(cls) -> str:
|
||||
return "1"
|
||||
|
||||
def populate_start_event(self, event) -> None:
|
||||
event.provider_id = self.node_data.provider_id
|
||||
|
||||
def _run(self) -> NodeRunResult:
|
||||
"""
|
||||
Run the plugin trigger node.
|
||||
@@ -41,8 +46,8 @@ class TriggerEventNode(Node[TriggerEventNodeData]):
|
||||
"""
|
||||
|
||||
# Get trigger data passed when workflow was triggered
|
||||
metadata = {
|
||||
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
|
||||
metadata: dict[WorkflowNodeExecutionMetadataKey, Any] = {
|
||||
cast(WorkflowNodeExecutionMetadataKey, TRIGGER_INFO_METADATA_KEY): {
|
||||
"provider_id": self.node_data.provider_id,
|
||||
"event_name": self.node_data.event_name,
|
||||
"plugin_unique_identifier": self.node_data.plugin_unique_identifier,
|
||||
3
api/core/workflow/nodes/trigger_schedule/__init__.py
Normal file
3
api/core/workflow/nodes/trigger_schedule/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .trigger_schedule_node import TriggerScheduleNode
|
||||
|
||||
__all__ = ["TriggerScheduleNode"]
|
||||
@@ -2,6 +2,7 @@ from typing import Literal, Union
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
|
||||
from dify_graph.entities.base_node_data import BaseNodeData
|
||||
from dify_graph.enums import NodeType
|
||||
|
||||
@@ -11,7 +12,7 @@ class TriggerScheduleNodeData(BaseNodeData):
|
||||
Trigger Schedule Node Data
|
||||
"""
|
||||
|
||||
type: NodeType = NodeType.TRIGGER_SCHEDULE
|
||||
type: NodeType = TRIGGER_SCHEDULE_NODE_TYPE
|
||||
mode: str = Field(default="visual", description="Schedule mode: visual or cron")
|
||||
frequency: str | None = Field(default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly")
|
||||
cron_expression: str | None = Field(default=None, description="Cron expression for cron mode")
|
||||
@@ -1,15 +1,17 @@
|
||||
from collections.abc import Mapping
|
||||
|
||||
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
|
||||
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
|
||||
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
|
||||
from dify_graph.enums import NodeExecutionType, NodeType
|
||||
from dify_graph.enums import NodeExecutionType
|
||||
from dify_graph.node_events import NodeRunResult
|
||||
from dify_graph.nodes.base.node import Node
|
||||
from dify_graph.nodes.trigger_schedule.entities import TriggerScheduleNodeData
|
||||
|
||||
from .entities import TriggerScheduleNodeData
|
||||
|
||||
|
||||
class TriggerScheduleNode(Node[TriggerScheduleNodeData]):
|
||||
node_type = NodeType.TRIGGER_SCHEDULE
|
||||
node_type = TRIGGER_SCHEDULE_NODE_TYPE
|
||||
execution_type = NodeExecutionType.ROOT
|
||||
|
||||
@classmethod
|
||||
@@ -19,7 +21,7 @@ class TriggerScheduleNode(Node[TriggerScheduleNodeData]):
|
||||
@classmethod
|
||||
def get_default_config(cls, filters: Mapping[str, object] | None = None) -> Mapping[str, object]:
|
||||
return {
|
||||
"type": "trigger-schedule",
|
||||
"type": TRIGGER_SCHEDULE_NODE_TYPE,
|
||||
"config": {
|
||||
"mode": "visual",
|
||||
"frequency": "daily",
|
||||
@@ -3,6 +3,7 @@ from enum import StrEnum
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE
|
||||
from dify_graph.entities.base_node_data import BaseNodeData
|
||||
from dify_graph.enums import NodeType
|
||||
from dify_graph.variables.types import SegmentType
|
||||
@@ -93,7 +94,7 @@ class WebhookData(BaseNodeData):
|
||||
class SyncMode(StrEnum):
|
||||
SYNC = "async" # only support
|
||||
|
||||
type: NodeType = NodeType.TRIGGER_WEBHOOK
|
||||
type: NodeType = TRIGGER_WEBHOOK_NODE_TYPE
|
||||
method: Method = Method.GET
|
||||
content_type: ContentType = Field(default=ContentType.JSON)
|
||||
headers: Sequence[WebhookParameter] = Field(default_factory=list)
|
||||
@@ -2,9 +2,10 @@ import logging
|
||||
from collections.abc import Mapping
|
||||
from typing import Any
|
||||
|
||||
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE
|
||||
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
|
||||
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
|
||||
from dify_graph.enums import NodeExecutionType, NodeType
|
||||
from dify_graph.enums import NodeExecutionType
|
||||
from dify_graph.file import FileTransferMethod
|
||||
from dify_graph.node_events import NodeRunResult
|
||||
from dify_graph.nodes.base.node import Node
|
||||
@@ -19,7 +20,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TriggerWebhookNode(Node[WebhookData]):
|
||||
node_type = NodeType.TRIGGER_WEBHOOK
|
||||
node_type = TRIGGER_WEBHOOK_NODE_TYPE
|
||||
execution_type = NodeExecutionType.ROOT
|
||||
|
||||
@classmethod
|
||||
@@ -8,7 +8,7 @@ from core.app.apps.exc import GenerateTaskStoppedError
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context
|
||||
from core.app.workflow.layers.llm_quota import LLMQuotaLayer
|
||||
from core.app.workflow.layers.observability import ObservabilityLayer
|
||||
from core.workflow.node_factory import DifyNodeFactory
|
||||
from core.workflow.node_factory import DifyNodeFactory, resolve_workflow_node_class
|
||||
from dify_graph.constants import ENVIRONMENT_VARIABLE_NODE_ID
|
||||
from dify_graph.entities import GraphInitParams
|
||||
from dify_graph.entities.graph_config import NodeConfigDictAdapter
|
||||
@@ -21,9 +21,8 @@ from dify_graph.graph_engine.layers import DebugLoggingLayer, ExecutionLimitsLay
|
||||
from dify_graph.graph_engine.layers.base import GraphEngineLayer
|
||||
from dify_graph.graph_engine.protocols.command_channel import CommandChannel
|
||||
from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent
|
||||
from dify_graph.nodes import NodeType
|
||||
from dify_graph.nodes import BuiltinNodeTypes
|
||||
from dify_graph.nodes.base.node import Node
|
||||
from dify_graph.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
|
||||
from dify_graph.runtime import ChildGraphNotFoundError, GraphRuntimeState, VariablePool
|
||||
from dify_graph.system_variable import SystemVariable
|
||||
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool
|
||||
@@ -253,7 +252,7 @@ class WorkflowEntry:
|
||||
variable_mapping=variable_mapping,
|
||||
user_inputs=user_inputs,
|
||||
)
|
||||
if node_type != NodeType.DATASOURCE:
|
||||
if node_type != BuiltinNodeTypes.DATASOURCE:
|
||||
cls.mapping_user_inputs_to_variable_pool(
|
||||
variable_mapping=variable_mapping,
|
||||
user_inputs=user_inputs,
|
||||
@@ -303,7 +302,7 @@ class WorkflowEntry:
|
||||
"height": node_height,
|
||||
"type": "custom",
|
||||
"data": {
|
||||
"type": NodeType.START,
|
||||
"type": BuiltinNodeTypes.START,
|
||||
"title": "Start",
|
||||
"desc": "Start",
|
||||
},
|
||||
@@ -339,11 +338,11 @@ class WorkflowEntry:
|
||||
# Create a minimal graph for single node execution
|
||||
graph_dict = cls._create_single_node_graph(node_id, node_data)
|
||||
|
||||
node_type = NodeType(node_data.get("type", ""))
|
||||
if node_type not in {NodeType.PARAMETER_EXTRACTOR, NodeType.QUESTION_CLASSIFIER}:
|
||||
node_type = node_data.get("type", "")
|
||||
if node_type not in {BuiltinNodeTypes.PARAMETER_EXTRACTOR, BuiltinNodeTypes.QUESTION_CLASSIFIER}:
|
||||
raise ValueError(f"Node type {node_type} not supported")
|
||||
|
||||
node_cls = NODE_TYPE_CLASSES_MAPPING[node_type]["1"]
|
||||
node_cls = resolve_workflow_node_class(node_type=node_type, node_version="1")
|
||||
if not node_cls:
|
||||
raise ValueError(f"Node class not found for node type {node_type}")
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user