Compare commits

..

45 Commits

Author SHA1 Message Date
Xiyuan Chen
5a8a68cab8 feat: enterprise otel exporter (#33138)
Some checks are pending
autofix.ci / autofix (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions
Main CI Pipeline / Skip Duplicate Checks (push) Waiting to run
Main CI Pipeline / Check Changed Files (push) Blocked by required conditions
Main CI Pipeline / Run API Tests (push) Blocked by required conditions
Main CI Pipeline / Skip API Tests (push) Blocked by required conditions
Main CI Pipeline / API Tests (push) Blocked by required conditions
Main CI Pipeline / Run Web Tests (push) Blocked by required conditions
Main CI Pipeline / Skip Web Tests (push) Blocked by required conditions
Main CI Pipeline / Web Tests (push) Blocked by required conditions
Main CI Pipeline / Style Check (push) Blocked by required conditions
Main CI Pipeline / Run VDB Tests (push) Blocked by required conditions
Main CI Pipeline / Skip VDB Tests (push) Blocked by required conditions
Main CI Pipeline / VDB Tests (push) Blocked by required conditions
Main CI Pipeline / Run DB Migration Test (push) Blocked by required conditions
Main CI Pipeline / Skip DB Migration Test (push) Blocked by required conditions
Main CI Pipeline / DB Migration Test (push) Blocked by required conditions
Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com>
Co-authored-by: Yunlu Wen <yunlu.wen@dify.ai>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-27 07:56:31 +00:00
wangxiaolei
689761bfcb feat: return correct dify-plugin-daemon error message (#34171) 2026-03-27 06:02:29 +00:00
Stephen Zhou
2394e45ec7 ci: skip duplicate actions (#34168) 2026-03-27 02:44:57 +00:00
1Ckpwee
01e6a3a9d9 chore(ci): remove Python 3.11 from CI test workflows (#34164) 2026-03-27 02:41:19 +00:00
Stephen Zhou
07f4950cb3 test: use happy dom (#34154)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
2026-03-27 01:46:19 +00:00
非法操作
368896d84d feat: add copy/delete to multi nodes context menu (#34138) 2026-03-27 01:20:39 +00:00
YBoy
408f650b0c test: migrate auth integration tests to testcontainers (#34089)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>
2026-03-26 23:25:36 +00:00
dependabot[bot]
7c2e1fa3e2 chore(deps): bump brace-expansion from 5.0.4 to 5.0.5 in /sdks/nodejs-client (#34159)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-26 23:21:18 +00:00
YBoy
1da66b9a8c test: migrate api token service tests to testcontainers (#34148)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-26 21:02:09 +00:00
dependabot[bot]
4953762f4e chore(deps): bump requests from 2.32.5 to 2.33.0 in /api (#34116)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-26 20:59:35 +00:00
YBoy
97764c4a57 test: migrate plugin service tests to testcontainers (#34098)
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>
2026-03-26 20:36:12 +00:00
tmimmanuel
2ea85d3ba2 refactor: use EnumText for model_type and WorkflowNodeExecution.status (#34093)
Co-authored-by: Krishna Chaitanya <krishnabkc15@gmail.com>
2026-03-26 20:34:44 +00:00
dependabot[bot]
1f11300175 chore(deps-dev): bump nltk from 3.9.3 to 3.9.4 in /api (#34117)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-26 20:31:40 +00:00
YBoy
f317db525f test: migrate api key auth service tests to testcontainers (#34147)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-26 20:31:18 +00:00
YBoy
3fa0538f72 test: migrate human input delivery test service tests to testcontainers (#34092)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-26 20:29:20 +00:00
99
fcfc96ca05 chore: remove stale mypy suppressions and align dataset service tests (#34130)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
2026-03-26 12:34:44 +00:00
-LAN-
69c2b422de chore: Keep main CI lane checks stable when skipped (#34143)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
2026-03-26 09:29:41 +00:00
-LAN-
496baa9335 chore(api): remove backend utcnow usage (#34131) 2026-03-26 08:51:49 +00:00
-LAN-
e8657cc3de chore: Support merge queue status checks in required CI workflows (#34133) 2026-03-26 16:42:27 +08:00
QuantumGhost
e08c06cbc3 fix: import path (#34124)
Co-authored-by: -LAN- <laipz8200@outlook.com>
2026-03-26 16:13:53 +08:00
Mahmoud Hamdy
8ca54ddf94 refactor(web): convert 7 enums to as-const objects (batch 5) (#33960)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-26 15:50:54 +08:00
非法操作
3e073404cc fix: the menu of multi nodes always display on left top corner (#34120)
Co-authored-by: yyh <yuanyouhuilyz@gmail.com>
2026-03-26 15:49:42 +08:00
Wu Tianwei
0acabf5f73 chore(deps): update picomatch version in nodejs-client and web packages (#34123)
Co-authored-by: Stephen Zhou <38493346+hyoban@users.noreply.github.com>
2026-03-26 15:49:19 +08:00
Stephen Zhou
38285aa1ac chore: enable no-barrel-files (#34121) 2026-03-26 15:11:25 +08:00
Achieve3318
5341cd015b fix: dataset query created_by empty UUID in iteration subgraph (#34004) (#34044)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-26 14:57:19 +08:00
Stephen Zhou
c32eebf57d refactor: use ungh for github api (#34108) 2026-03-26 14:37:17 +08:00
dependabot[bot]
554ba6b8f3 chore(deps): bump pypdf from 6.9.1 to 6.9.2 in /api (#34099)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-26 13:27:04 +09:00
Renzo
a69b8c1e96 refactor: select in service API dataset document and segment controllers (#34101)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-26 13:24:54 +09:00
Wu Tianwei
6f3fcf2276 fix(prompt-editor): fix unexpected blur effect in prompt editor (#34069)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-26 10:53:18 +08:00
非法操作
3df4bba280 fix: datasource api-key modal z-index incorrect (#34103)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2026-03-26 09:28:36 +08:00
Krishna Chaitanya
7c0d2e1d98 fix: handle null email in GitHub OAuth sign-in (#34043)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
When a GitHub user's profile email is null (hidden/private), the OAuth callback fails with HTTP 400 because `GitHubRawUserInfo` validates `email` as a required non-null string. Even after the type was relaxed to `NotRequired[str | None]` in #33882, the flow still raises a `ValueError` when no email can be resolved, blocking sign-in entirely.

This PR improves the email resolution strategy so that users with private GitHub emails can still sign in.
2026-03-26 00:41:18 +08:00
Rajat Agarwal
a9336b74fd test: Unit test case for services.dataset_services.py (#33212) 2026-03-26 00:28:48 +08:00
YBoy
518937b87f test: migrate plugin parameter service tests to testcontainers (#34090) 2026-03-25 23:11:14 +09:00
YBoy
e6ab9abf19 test: migrate metadata partial update tests to testcontainers (#34088) 2026-03-25 23:10:48 +09:00
YBoy
87a25e326c test: migrate account deletion sync tests to testcontainers (#34091)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-25 23:09:10 +09:00
YBoy
baf7d2c7c0 test: migrate database retrieval tests to testcontainers (#34087)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-25 23:06:46 +09:00
Renzo
22dd0aa20c refactor: select in service API wraps, file_preview, and site controllers (#34086)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-25 23:01:05 +09:00
99
52e7492cbc refactor(api): rename dify_graph to graphon (#34095) 2026-03-25 21:58:56 +08:00
Desel72
7e9d00a5a6 test: migrate workflow converter tests to testcontainers (#34038)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-25 22:28:25 +09:00
Faiz Khairi
ff9cf6c7a4 refactor: replace dict with BedrockRetrievalSetting BaseModel in knowledge_service (#34080)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-25 21:33:24 +09:00
-LAN-
56593f20b0 refactor(api): continue decoupling dify_graph from API concerns (#33580)
Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: WH-2099 <wh2099@pm.me>
2026-03-25 20:32:24 +08:00
YBoy
b7b9b003c9 test: migrate restore archived workflow run tests to testcontainers (#34083) 2026-03-25 21:31:53 +09:00
-LAN-
59639ca9b2 chore: bump Dify to 1.13.3 and sandbox to 0.2.13 (#34079)
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-25 20:03:15 +08:00
Xin Zhang
66b8c42a25 feat: add inner API endpoints for admin DSL import/export (#34059) 2026-03-25 19:48:53 +08:00
Coding On Star
449d8c7768 test(workflow-app): enhance unit tests for workflow components and hooks (#34065)
Co-authored-by: CodingOnStar <hanxujiang@dify.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: lif <1835304752@qq.com>
Co-authored-by: hjlarry <hjlarry@163.com>
Co-authored-by: Stephen Zhou <hi@hyoban.cc>
Co-authored-by: tmimmanuel <14046872+tmimmanuel@users.noreply.github.com>
Co-authored-by: Desel72 <pedroluiscolmenares722@gmail.com>
Co-authored-by: Renzo <170978465+RenzoMXD@users.noreply.github.com>
Co-authored-by: Krishna Chaitanya <krishnabkc15@gmail.com>
Co-authored-by: yyh <92089059+lyzno1@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-25 18:34:32 +08:00
1232 changed files with 46743 additions and 18133 deletions

2
.github/CODEOWNERS vendored
View File

@@ -36,7 +36,7 @@
/api/core/workflow/graph/ @laipz8200 @QuantumGhost
/api/core/workflow/graph_events/ @laipz8200 @QuantumGhost
/api/core/workflow/node_events/ @laipz8200 @QuantumGhost
/api/dify_graph/model_runtime/ @laipz8200 @QuantumGhost
/api/graphon/model_runtime/ @laipz8200 @WH-2099
# Backend - Workflow - Nodes (Agent, Iteration, Loop, LLM)
/api/core/workflow/nodes/agent/ @Nov1c444

View File

@@ -25,7 +25,6 @@ jobs:
strategy:
matrix:
python-version:
- "3.11"
- "3.12"
steps:

View File

@@ -2,6 +2,9 @@ name: autofix.ci
on:
pull_request:
branches: ["main"]
merge_group:
branches: ["main"]
types: [checks_requested]
push:
branches: ["main"]
permissions:
@@ -12,9 +15,15 @@ jobs:
if: github.repository == 'langgenius/dify'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Complete merge group check
if: github.event_name == 'merge_group'
run: echo "autofix.ci updates pull request branches, not merge group refs."
- if: github.event_name != 'merge_group'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Check Docker Compose inputs
if: github.event_name != 'merge_group'
id: docker-compose-changes
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
@@ -24,30 +33,34 @@ jobs:
docker/docker-compose-template.yaml
docker/docker-compose.yaml
- name: Check web inputs
if: github.event_name != 'merge_group'
id: web-changes
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
files: |
web/**
- name: Check api inputs
if: github.event_name != 'merge_group'
id: api-changes
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
files: |
api/**
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
- if: github.event_name != 'merge_group'
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: "3.11"
- uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # v7.6.0
- if: github.event_name != 'merge_group'
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # v7.6.0
- name: Generate Docker Compose
if: steps.docker-compose-changes.outputs.any_changed == 'true'
if: github.event_name != 'merge_group' && steps.docker-compose-changes.outputs.any_changed == 'true'
run: |
cd docker
./generate_docker_compose
- if: steps.api-changes.outputs.any_changed == 'true'
- if: github.event_name != 'merge_group' && steps.api-changes.outputs.any_changed == 'true'
run: |
cd api
uv sync --dev
@@ -59,13 +72,13 @@ jobs:
uv run ruff format ..
- name: count migration progress
if: steps.api-changes.outputs.any_changed == 'true'
if: github.event_name != 'merge_group' && steps.api-changes.outputs.any_changed == 'true'
run: |
cd api
./cnt_base.sh
- name: ast-grep
if: steps.api-changes.outputs.any_changed == 'true'
if: github.event_name != 'merge_group' && steps.api-changes.outputs.any_changed == 'true'
run: |
# ast-grep exits 1 if no matches are found; allow idempotent runs.
uvx --from ast-grep-cli ast-grep --pattern 'db.session.query($WHATEVER).filter($HERE)' --rewrite 'db.session.query($WHATEVER).where($HERE)' -l py --update-all || true
@@ -95,13 +108,14 @@ jobs:
find . -name "*.py.bak" -type f -delete
- name: Setup web environment
if: steps.web-changes.outputs.any_changed == 'true'
if: github.event_name != 'merge_group' && steps.web-changes.outputs.any_changed == 'true'
uses: ./.github/actions/setup-web
- name: ESLint autofix
if: steps.web-changes.outputs.any_changed == 'true'
if: github.event_name != 'merge_group' && steps.web-changes.outputs.any_changed == 'true'
run: |
cd web
vp exec eslint --concurrency=2 --prune-suppressions --quiet || true
- uses: autofix-ci/action@7a166d7532b277f34e16238930461bf77f9d7ed8 # v1.3.3
- if: github.event_name != 'merge_group'
uses: autofix-ci/action@7a166d7532b277f34e16238930461bf77f9d7ed8 # v1.3.3

View File

@@ -3,10 +3,14 @@ name: Main CI Pipeline
on:
pull_request:
branches: ["main"]
merge_group:
branches: ["main"]
types: [checks_requested]
push:
branches: ["main"]
permissions:
actions: write
contents: write
pull-requests: write
checks: write
@@ -17,9 +21,24 @@ concurrency:
cancel-in-progress: true
jobs:
pre_job:
name: Skip Duplicate Checks
runs-on: ubuntu-latest
outputs:
should_skip: ${{ steps.skip_check.outputs.should_skip || 'false' }}
steps:
- id: skip_check
continue-on-error: true
uses: fkirc/skip-duplicate-actions@f75f66ce1886f00957d99748a42c724f4330bdcf # v5.3.1
with:
cancel_others: 'true'
concurrent_skipping: same_content_newer
# Check which paths were changed to determine which tests to run
check-changes:
name: Check Changed Files
needs: pre_job
if: needs.pre_job.outputs.should_skip != 'true'
runs-on: ubuntu-latest
outputs:
api-changed: ${{ steps.changes.outputs.api }}
@@ -50,33 +69,247 @@ jobs:
- 'api/migrations/**'
- '.github/workflows/db-migration-test.yml'
# Run tests in parallel
api-tests:
name: API Tests
needs: check-changes
if: needs.check-changes.outputs.api-changed == 'true'
# Run tests in parallel while always emitting stable required checks.
api-tests-run:
name: Run API Tests
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.api-changed == 'true'
uses: ./.github/workflows/api-tests.yml
secrets: inherit
web-tests:
name: Web Tests
needs: check-changes
if: needs.check-changes.outputs.web-changed == 'true'
api-tests-skip:
name: Skip API Tests
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.api-changed != 'true'
runs-on: ubuntu-latest
steps:
- name: Report skipped API tests
run: echo "No API-related changes detected; skipping API tests."
api-tests:
name: API Tests
if: ${{ always() }}
needs:
- pre_job
- check-changes
- api-tests-run
- api-tests-skip
runs-on: ubuntu-latest
steps:
- name: Finalize API Tests status
env:
SHOULD_SKIP_WORKFLOW: ${{ needs.pre_job.outputs.should_skip }}
TESTS_CHANGED: ${{ needs.check-changes.outputs.api-changed }}
RUN_RESULT: ${{ needs.api-tests-run.result }}
SKIP_RESULT: ${{ needs.api-tests-skip.result }}
run: |
if [[ "$SHOULD_SKIP_WORKFLOW" == 'true' ]]; then
echo "API tests were skipped because this workflow run duplicated a successful or newer run."
exit 0
fi
if [[ "$TESTS_CHANGED" == 'true' ]]; then
if [[ "$RUN_RESULT" == 'success' ]]; then
echo "API tests ran successfully."
exit 0
fi
echo "API tests were required but finished with result: $RUN_RESULT" >&2
exit 1
fi
if [[ "$SKIP_RESULT" == 'success' ]]; then
echo "API tests were skipped because no API-related files changed."
exit 0
fi
echo "API tests were not required, but the skip job finished with result: $SKIP_RESULT" >&2
exit 1
web-tests-run:
name: Run Web Tests
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.web-changed == 'true'
uses: ./.github/workflows/web-tests.yml
secrets: inherit
web-tests-skip:
name: Skip Web Tests
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.web-changed != 'true'
runs-on: ubuntu-latest
steps:
- name: Report skipped web tests
run: echo "No web-related changes detected; skipping web tests."
web-tests:
name: Web Tests
if: ${{ always() }}
needs:
- pre_job
- check-changes
- web-tests-run
- web-tests-skip
runs-on: ubuntu-latest
steps:
- name: Finalize Web Tests status
env:
SHOULD_SKIP_WORKFLOW: ${{ needs.pre_job.outputs.should_skip }}
TESTS_CHANGED: ${{ needs.check-changes.outputs.web-changed }}
RUN_RESULT: ${{ needs.web-tests-run.result }}
SKIP_RESULT: ${{ needs.web-tests-skip.result }}
run: |
if [[ "$SHOULD_SKIP_WORKFLOW" == 'true' ]]; then
echo "Web tests were skipped because this workflow run duplicated a successful or newer run."
exit 0
fi
if [[ "$TESTS_CHANGED" == 'true' ]]; then
if [[ "$RUN_RESULT" == 'success' ]]; then
echo "Web tests ran successfully."
exit 0
fi
echo "Web tests were required but finished with result: $RUN_RESULT" >&2
exit 1
fi
if [[ "$SKIP_RESULT" == 'success' ]]; then
echo "Web tests were skipped because no web-related files changed."
exit 0
fi
echo "Web tests were not required, but the skip job finished with result: $SKIP_RESULT" >&2
exit 1
style-check:
name: Style Check
needs: pre_job
if: needs.pre_job.outputs.should_skip != 'true'
uses: ./.github/workflows/style.yml
vdb-tests-run:
name: Run VDB Tests
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.vdb-changed == 'true'
uses: ./.github/workflows/vdb-tests.yml
vdb-tests-skip:
name: Skip VDB Tests
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.vdb-changed != 'true'
runs-on: ubuntu-latest
steps:
- name: Report skipped VDB tests
run: echo "No VDB-related changes detected; skipping VDB tests."
vdb-tests:
name: VDB Tests
needs: check-changes
if: needs.check-changes.outputs.vdb-changed == 'true'
uses: ./.github/workflows/vdb-tests.yml
if: ${{ always() }}
needs:
- pre_job
- check-changes
- vdb-tests-run
- vdb-tests-skip
runs-on: ubuntu-latest
steps:
- name: Finalize VDB Tests status
env:
SHOULD_SKIP_WORKFLOW: ${{ needs.pre_job.outputs.should_skip }}
TESTS_CHANGED: ${{ needs.check-changes.outputs.vdb-changed }}
RUN_RESULT: ${{ needs.vdb-tests-run.result }}
SKIP_RESULT: ${{ needs.vdb-tests-skip.result }}
run: |
if [[ "$SHOULD_SKIP_WORKFLOW" == 'true' ]]; then
echo "VDB tests were skipped because this workflow run duplicated a successful or newer run."
exit 0
fi
if [[ "$TESTS_CHANGED" == 'true' ]]; then
if [[ "$RUN_RESULT" == 'success' ]]; then
echo "VDB tests ran successfully."
exit 0
fi
echo "VDB tests were required but finished with result: $RUN_RESULT" >&2
exit 1
fi
if [[ "$SKIP_RESULT" == 'success' ]]; then
echo "VDB tests were skipped because no VDB-related files changed."
exit 0
fi
echo "VDB tests were not required, but the skip job finished with result: $SKIP_RESULT" >&2
exit 1
db-migration-test-run:
name: Run DB Migration Test
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.migration-changed == 'true'
uses: ./.github/workflows/db-migration-test.yml
db-migration-test-skip:
name: Skip DB Migration Test
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.migration-changed != 'true'
runs-on: ubuntu-latest
steps:
- name: Report skipped DB migration tests
run: echo "No migration-related changes detected; skipping DB migration tests."
db-migration-test:
name: DB Migration Test
needs: check-changes
if: needs.check-changes.outputs.migration-changed == 'true'
uses: ./.github/workflows/db-migration-test.yml
if: ${{ always() }}
needs:
- pre_job
- check-changes
- db-migration-test-run
- db-migration-test-skip
runs-on: ubuntu-latest
steps:
- name: Finalize DB Migration Test status
env:
SHOULD_SKIP_WORKFLOW: ${{ needs.pre_job.outputs.should_skip }}
TESTS_CHANGED: ${{ needs.check-changes.outputs.migration-changed }}
RUN_RESULT: ${{ needs.db-migration-test-run.result }}
SKIP_RESULT: ${{ needs.db-migration-test-skip.result }}
run: |
if [[ "$SHOULD_SKIP_WORKFLOW" == 'true' ]]; then
echo "DB migration tests were skipped because this workflow run duplicated a successful or newer run."
exit 0
fi
if [[ "$TESTS_CHANGED" == 'true' ]]; then
if [[ "$RUN_RESULT" == 'success' ]]; then
echo "DB migration tests ran successfully."
exit 0
fi
echo "DB migration tests were required but finished with result: $RUN_RESULT" >&2
exit 1
fi
if [[ "$SKIP_RESULT" == 'success' ]]; then
echo "DB migration tests were skipped because no migration-related files changed."
exit 0
fi
echo "DB migration tests were not required, but the skip job finished with result: $SKIP_RESULT" >&2
exit 1

View File

@@ -7,6 +7,9 @@ on:
- edited
- reopened
- synchronize
merge_group:
branches: ["main"]
types: [checks_requested]
jobs:
lint:
@@ -15,7 +18,11 @@ jobs:
pull-requests: read
runs-on: ubuntu-latest
steps:
- name: Complete merge group check
if: github.event_name == 'merge_group'
run: echo "Semantic PR title validation is handled on pull requests."
- name: Check title
if: github.event_name == 'pull_request'
uses: amannn/action-semantic-pull-request@48f256284bd46cdaab1048c3721360e808335d50 # v6.1.1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -14,7 +14,6 @@ jobs:
strategy:
matrix:
python-version:
- "3.11"
- "3.12"
steps:

View File

@@ -1,10 +1,14 @@
[importlinter]
root_packages =
core
dify_graph
constants
context
graphon
configs
controllers
extensions
factories
libs
models
tasks
services
@@ -22,40 +26,30 @@ layers =
runtime
entities
containers =
dify_graph
graphon
ignore_imports =
dify_graph.nodes.base.node -> dify_graph.graph_events
dify_graph.nodes.iteration.iteration_node -> dify_graph.graph_events
dify_graph.nodes.loop.loop_node -> dify_graph.graph_events
graphon.nodes.base.node -> graphon.graph_events
graphon.nodes.iteration.iteration_node -> graphon.graph_events
graphon.nodes.loop.loop_node -> graphon.graph_events
dify_graph.nodes.iteration.iteration_node -> dify_graph.graph_engine
dify_graph.nodes.loop.loop_node -> dify_graph.graph_engine
graphon.nodes.iteration.iteration_node -> graphon.graph_engine
graphon.nodes.loop.loop_node -> graphon.graph_engine
# TODO(QuantumGhost): fix the import violation later
dify_graph.entities.pause_reason -> dify_graph.nodes.human_input.entities
[importlinter:contract:workflow-infrastructure-dependencies]
name = Workflow Infrastructure Dependencies
type = forbidden
source_modules =
dify_graph
forbidden_modules =
extensions.ext_database
extensions.ext_redis
allow_indirect_imports = True
ignore_imports =
dify_graph.nodes.llm.node -> extensions.ext_database
dify_graph.model_runtime.model_providers.__base.ai_model -> extensions.ext_redis
dify_graph.model_runtime.model_providers.model_provider_factory -> extensions.ext_redis
graphon.entities.pause_reason -> graphon.nodes.human_input.entities
[importlinter:contract:workflow-external-imports]
name = Workflow External Imports
type = forbidden
source_modules =
dify_graph
graphon
forbidden_modules =
constants
configs
context
controllers
extensions
factories
libs
models
services
tasks
@@ -88,46 +82,14 @@ forbidden_modules =
core.tools
core.trigger
core.variables
ignore_imports =
dify_graph.nodes.llm.llm_utils -> core.model_manager
dify_graph.nodes.llm.protocols -> core.model_manager
dify_graph.nodes.llm.llm_utils -> dify_graph.model_runtime.model_providers.__base.large_language_model
dify_graph.nodes.llm.node -> core.tools.signature
dify_graph.nodes.tool.tool_node -> core.callback_handler.workflow_tool_callback_handler
dify_graph.nodes.tool.tool_node -> core.tools.tool_engine
dify_graph.nodes.tool.tool_node -> core.tools.tool_manager
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.advanced_prompt_transform
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.simple_prompt_transform
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> dify_graph.model_runtime.model_providers.__base.large_language_model
dify_graph.nodes.question_classifier.question_classifier_node -> core.prompt.simple_prompt_transform
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.model_manager
dify_graph.nodes.question_classifier.question_classifier_node -> core.model_manager
dify_graph.nodes.tool.tool_node -> core.tools.utils.message_transformer
dify_graph.nodes.llm.node -> core.llm_generator.output_parser.errors
dify_graph.nodes.llm.node -> core.llm_generator.output_parser.structured_output
dify_graph.nodes.llm.node -> core.model_manager
dify_graph.nodes.llm.entities -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.llm.node -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.llm.node -> core.prompt.utils.prompt_message_util
dify_graph.nodes.parameter_extractor.entities -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.utils.prompt_message_util
dify_graph.nodes.question_classifier.entities -> core.prompt.entities.advanced_prompt_entities
dify_graph.nodes.question_classifier.question_classifier_node -> core.prompt.utils.prompt_message_util
dify_graph.nodes.llm.node -> models.dataset
dify_graph.nodes.llm.file_saver -> core.tools.signature
dify_graph.nodes.llm.file_saver -> core.tools.tool_file_manager
dify_graph.nodes.tool.tool_node -> core.tools.errors
dify_graph.nodes.llm.node -> extensions.ext_database
dify_graph.nodes.llm.node -> models.model
dify_graph.nodes.tool.tool_node -> services
dify_graph.model_runtime.model_providers.__base.ai_model -> configs
dify_graph.model_runtime.model_providers.__base.ai_model -> extensions.ext_redis
dify_graph.model_runtime.model_providers.__base.large_language_model -> configs
dify_graph.model_runtime.model_providers.__base.text_embedding_model -> core.entities.embedding_type
dify_graph.model_runtime.model_providers.model_provider_factory -> configs
dify_graph.model_runtime.model_providers.model_provider_factory -> extensions.ext_redis
dify_graph.model_runtime.model_providers.model_provider_factory -> models.provider_ids
[importlinter:contract:workflow-third-party-imports]
name = Workflow Third-Party Imports
type = forbidden
source_modules =
graphon
forbidden_modules =
sqlalchemy
[importlinter:contract:rsc]
name = RSC
@@ -136,7 +98,7 @@ layers =
graph_engine
response_coordinator
containers =
dify_graph.graph_engine
graphon.graph_engine
[importlinter:contract:worker]
name = Worker
@@ -145,7 +107,7 @@ layers =
graph_engine
worker
containers =
dify_graph.graph_engine
graphon.graph_engine
[importlinter:contract:graph-engine-architecture]
name = Graph Engine Architecture
@@ -161,28 +123,28 @@ layers =
worker_management
domain
containers =
dify_graph.graph_engine
graphon.graph_engine
[importlinter:contract:domain-isolation]
name = Domain Model Isolation
type = forbidden
source_modules =
dify_graph.graph_engine.domain
graphon.graph_engine.domain
forbidden_modules =
dify_graph.graph_engine.worker_management
dify_graph.graph_engine.command_channels
dify_graph.graph_engine.layers
dify_graph.graph_engine.protocols
graphon.graph_engine.worker_management
graphon.graph_engine.command_channels
graphon.graph_engine.layers
graphon.graph_engine.protocols
[importlinter:contract:worker-management]
name = Worker Management
type = forbidden
source_modules =
dify_graph.graph_engine.worker_management
graphon.graph_engine.worker_management
forbidden_modules =
dify_graph.graph_engine.orchestration
dify_graph.graph_engine.command_processing
dify_graph.graph_engine.event_management
graphon.graph_engine.orchestration
graphon.graph_engine.command_processing
graphon.graph_engine.event_management
[importlinter:contract:graph-traversal-components]
@@ -192,11 +154,11 @@ layers =
edge_processor
skip_propagator
containers =
dify_graph.graph_engine.graph_traversal
graphon.graph_engine.graph_traversal
[importlinter:contract:command-channels]
name = Command Channels Independence
type = independence
modules =
dify_graph.graph_engine.command_channels.in_memory_channel
dify_graph.graph_engine.command_channels.redis_channel
graphon.graph_engine.command_channels.in_memory_channel
graphon.graph_engine.command_channels.redis_channel

View File

@@ -100,7 +100,7 @@ ignore = [
"configs/*" = [
"N802", # invalid-function-name
]
"dify_graph/model_runtime/callbacks/base_callback.py" = ["T201"]
"graphon/model_runtime/callbacks/base_callback.py" = ["T201"]
"core/workflow/callbacks/workflow_logging_callback.py" = ["T201"]
"libs/gmpy2_pkcs10aep_cipher.py" = [
"N803", # invalid-argument-name

View File

@@ -143,6 +143,7 @@ def initialize_extensions(app: DifyApp):
ext_commands,
ext_compress,
ext_database,
ext_enterprise_telemetry,
ext_fastopenapi,
ext_forward_refs,
ext_hosting_provider,
@@ -193,6 +194,7 @@ def initialize_extensions(app: DifyApp):
ext_commands,
ext_fastopenapi,
ext_otel,
ext_enterprise_telemetry,
ext_request_logging,
ext_session_factory,
]

View File

@@ -8,7 +8,7 @@ from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, Settings
from libs.file_utils import search_file_upwards
from .deploy import DeploymentConfig
from .enterprise import EnterpriseFeatureConfig
from .enterprise import EnterpriseFeatureConfig, EnterpriseTelemetryConfig
from .extra import ExtraServiceConfig
from .feature import FeatureConfig
from .middleware import MiddlewareConfig
@@ -73,6 +73,8 @@ class DifyConfig(
# Enterprise feature configs
# **Before using, please contact business@dify.ai by email to inquire about licensing matters.**
EnterpriseFeatureConfig,
# Enterprise telemetry configs
EnterpriseTelemetryConfig,
):
model_config = SettingsConfigDict(
# read from dotenv format config file

View File

@@ -22,3 +22,52 @@ class EnterpriseFeatureConfig(BaseSettings):
ENTERPRISE_REQUEST_TIMEOUT: int = Field(
ge=1, description="Maximum timeout in seconds for enterprise requests", default=5
)
class EnterpriseTelemetryConfig(BaseSettings):
"""
Configuration for enterprise telemetry.
"""
ENTERPRISE_TELEMETRY_ENABLED: bool = Field(
description="Enable enterprise telemetry collection (also requires ENTERPRISE_ENABLED=true).",
default=False,
)
ENTERPRISE_OTLP_ENDPOINT: str = Field(
description="Enterprise OTEL collector endpoint.",
default="",
)
ENTERPRISE_OTLP_HEADERS: str = Field(
description="Auth headers for OTLP export (key=value,key2=value2).",
default="",
)
ENTERPRISE_OTLP_PROTOCOL: str = Field(
description="OTLP protocol: 'http' or 'grpc' (default: http).",
default="http",
)
ENTERPRISE_OTLP_API_KEY: str = Field(
description="Bearer token for enterprise OTLP export authentication.",
default="",
)
ENTERPRISE_INCLUDE_CONTENT: bool = Field(
description="Include input/output content in traces (privacy toggle).",
# Setting the default value to False to avoid accidentally log PII data in traces.
default=False,
)
ENTERPRISE_SERVICE_NAME: str = Field(
description="Service name for OTEL resource.",
default="dify",
)
ENTERPRISE_OTEL_SAMPLING_RATE: float = Field(
description="Sampling rate for enterprise traces (0.0 to 1.0, default 1.0 = 100%).",
default=1.0,
ge=0.0,
le=1.0,
)

View File

@@ -1,74 +1,36 @@
"""
Core Context - Framework-agnostic context management.
Application-layer context adapters.
This module provides context management that is independent of any specific
web framework. Framework-specific implementations register their context
capture functions at application initialization time.
This ensures the workflow layer remains completely decoupled from Flask
or any other web framework.
Concrete execution-context implementations live here so `graphon` only
depends on injected context managers rather than framework state capture.
"""
import contextvars
from collections.abc import Callable
from dify_graph.context.execution_context import (
from context.execution_context import (
AppContext,
ContextProviderNotFoundError,
ExecutionContext,
ExecutionContextBuilder,
IExecutionContext,
NullAppContext,
capture_current_context,
read_context,
register_context,
register_context_capturer,
reset_context_provider,
)
# Global capturer function - set by framework-specific modules
_capturer: Callable[[], IExecutionContext] | None = None
def register_context_capturer(capturer: Callable[[], IExecutionContext]) -> None:
"""
Register a context capture function.
This should be called by framework-specific modules (e.g., Flask)
during application initialization.
Args:
capturer: Function that captures current context and returns IExecutionContext
"""
global _capturer
_capturer = capturer
def capture_current_context() -> IExecutionContext:
"""
Capture current execution context.
This function uses the registered context capturer. If no capturer
is registered, it returns a minimal context with only contextvars
(suitable for non-framework environments like tests or standalone scripts).
Returns:
IExecutionContext with captured context
"""
if _capturer is None:
# No framework registered - return minimal context
return ExecutionContext(
app_context=NullAppContext(),
context_vars=contextvars.copy_context(),
)
return _capturer()
def reset_context_provider() -> None:
"""
Reset the context capturer.
This is primarily useful for testing to ensure a clean state.
"""
global _capturer
_capturer = None
from context.models import SandboxContext
__all__ = [
"AppContext",
"ContextProviderNotFoundError",
"ExecutionContext",
"ExecutionContextBuilder",
"IExecutionContext",
"NullAppContext",
"SandboxContext",
"capture_current_context",
"read_context",
"register_context",
"register_context_capturer",
"reset_context_provider",
]

View File

@@ -1,5 +1,8 @@
"""
Execution Context - Abstracted context management for workflow execution.
Application-layer execution context adapters.
Concrete context capture lives outside `graphon` so the graph package only
consumes injected context managers when it needs to preserve thread-local state.
"""
import contextvars
@@ -16,33 +19,33 @@ class AppContext(ABC):
"""
Abstract application context interface.
This abstraction allows workflow execution to work with or without Flask
by providing a common interface for application context management.
Application adapters can implement this to restore framework-specific state
such as Flask app context around worker execution.
"""
@abstractmethod
def get_config(self, key: str, default: Any = None) -> Any:
"""Get configuration value by key."""
pass
raise NotImplementedError
@abstractmethod
def get_extension(self, name: str) -> Any:
"""Get Flask extension by name (e.g., 'db', 'cache')."""
pass
"""Get application extension by name."""
raise NotImplementedError
@abstractmethod
def enter(self) -> AbstractContextManager[None]:
"""Enter the application context."""
pass
raise NotImplementedError
@runtime_checkable
class IExecutionContext(Protocol):
"""
Protocol for execution context.
Protocol for enterable execution context objects.
This protocol defines the interface that all execution contexts must implement,
allowing both ExecutionContext and FlaskExecutionContext to be used interchangeably.
Concrete implementations may carry extra framework state, but callers only
depend on standard context-manager behavior plus optional user metadata.
"""
def __enter__(self) -> "IExecutionContext":
@@ -62,14 +65,10 @@ class IExecutionContext(Protocol):
@final
class ExecutionContext:
"""
Execution context for workflow execution in worker threads.
Generic execution context used by application-layer adapters.
This class encapsulates all context needed for workflow execution:
- Application context (Flask app or standalone)
- Context variables for Python contextvars
- User information (optional)
It is designed to be serializable and passable to worker threads.
It restores captured `contextvars` and optionally enters an application
context before the worker executes graph logic.
"""
def __init__(
@@ -78,14 +77,6 @@ class ExecutionContext:
context_vars: contextvars.Context | None = None,
user: Any = None,
) -> None:
"""
Initialize execution context.
Args:
app_context: Application context (Flask or standalone)
context_vars: Python contextvars to preserve
user: User object (optional)
"""
self._app_context = app_context
self._context_vars = context_vars
self._user = user
@@ -98,27 +89,21 @@ class ExecutionContext:
@property
def context_vars(self) -> contextvars.Context | None:
"""Get context variables."""
"""Get captured context variables."""
return self._context_vars
@property
def user(self) -> Any:
"""Get user object."""
"""Get captured user object."""
return self._user
@contextmanager
def enter(self) -> Generator[None, None, None]:
"""
Enter this execution context.
This is a convenience method that creates a context manager.
"""
# Restore context variables if provided
"""Enter this execution context."""
if self._context_vars:
for var, val in self._context_vars.items():
var.set(val)
# Enter app context if available
if self._app_context is not None:
with self._app_context.enter():
yield
@@ -141,18 +126,10 @@ class ExecutionContext:
class NullAppContext(AppContext):
"""
Null implementation of AppContext for non-Flask environments.
This is used when running without Flask (e.g., in tests or standalone mode).
Null application context for non-framework environments.
"""
def __init__(self, config: dict[str, Any] | None = None) -> None:
"""
Initialize null app context.
Args:
config: Optional configuration dictionary
"""
self._config = config or {}
self._extensions: dict[str, Any] = {}
@@ -165,7 +142,7 @@ class NullAppContext(AppContext):
return self._extensions.get(name)
def set_extension(self, name: str, extension: Any) -> None:
"""Set extension by name."""
"""Register an extension for tests or standalone execution."""
self._extensions[name] = extension
@contextmanager
@@ -176,9 +153,7 @@ class NullAppContext(AppContext):
class ExecutionContextBuilder:
"""
Builder for creating ExecutionContext instances.
This provides a fluent API for building execution contexts.
Builder for creating `ExecutionContext` instances.
"""
def __init__(self) -> None:
@@ -211,63 +186,42 @@ class ExecutionContextBuilder:
_capturer: Callable[[], IExecutionContext] | None = None
# Tenant-scoped providers using tuple keys for clarity and constant-time lookup.
# Key mapping:
# (name, tenant_id) -> provider
# - name: namespaced identifier (recommend prefixing, e.g. "workflow.sandbox")
# - tenant_id: tenant identifier string
# Value:
# provider: Callable[[], BaseModel] returning the typed context value
# Type-safety note:
# - This registry cannot enforce that all providers for a given name return the same BaseModel type.
# - Implementors SHOULD provide typed wrappers around register/read (like Go's context best practice),
# e.g. def register_sandbox_ctx(tenant_id: str, p: Callable[[], SandboxContext]) and
# def read_sandbox_ctx(tenant_id: str) -> SandboxContext.
_tenant_context_providers: dict[tuple[str, str], Callable[[], BaseModel]] = {}
T = TypeVar("T", bound=BaseModel)
class ContextProviderNotFoundError(KeyError):
"""Raised when a tenant-scoped context provider is missing for a given (name, tenant_id)."""
"""Raised when a tenant-scoped context provider is missing."""
pass
def register_context_capturer(capturer: Callable[[], IExecutionContext]) -> None:
"""Register a single enterable execution context capturer (e.g., Flask)."""
"""Register an enterable execution context capturer."""
global _capturer
_capturer = capturer
def register_context(name: str, tenant_id: str, provider: Callable[[], BaseModel]) -> None:
"""Register a tenant-specific provider for a named context.
Tip: use a namespaced "name" (e.g., "workflow.sandbox") to avoid key collisions.
Consider adding a typed wrapper for this registration in your feature module.
"""
"""Register a tenant-specific provider for a named context."""
_tenant_context_providers[(name, tenant_id)] = provider
def read_context(name: str, *, tenant_id: str) -> BaseModel:
"""
Read a context value for a specific tenant.
Raises KeyError if the provider for (name, tenant_id) is not registered.
"""
prov = _tenant_context_providers.get((name, tenant_id))
if prov is None:
"""Read a context value for a specific tenant."""
provider = _tenant_context_providers.get((name, tenant_id))
if provider is None:
raise ContextProviderNotFoundError(f"Context provider '{name}' not registered for tenant '{tenant_id}'")
return prov()
return provider()
def capture_current_context() -> IExecutionContext:
"""
Capture current execution context from the calling environment.
If a capturer is registered (e.g., Flask), use it. Otherwise, return a minimal
context with NullAppContext + copy of current contextvars.
If no framework adapter is registered, return a minimal context that only
restores `contextvars`.
"""
if _capturer is None:
return ExecutionContext(
@@ -278,7 +232,22 @@ def capture_current_context() -> IExecutionContext:
def reset_context_provider() -> None:
"""Reset the capturer and all tenant-scoped context providers (primarily for tests)."""
"""Reset the capturer and tenant-scoped providers."""
global _capturer
_capturer = None
_tenant_context_providers.clear()
__all__ = [
"AppContext",
"ContextProviderNotFoundError",
"ExecutionContext",
"ExecutionContextBuilder",
"IExecutionContext",
"NullAppContext",
"capture_current_context",
"read_context",
"register_context",
"register_context_capturer",
"reset_context_provider",
]

View File

@@ -10,11 +10,7 @@ from typing import Any, final
from flask import Flask, current_app, g
from dify_graph.context import register_context_capturer
from dify_graph.context.execution_context import (
AppContext,
IExecutionContext,
)
from context.execution_context import AppContext, IExecutionContext, register_context_capturer
@final

View File

@@ -6,7 +6,6 @@ from contexts.wrapper import RecyclableContextVar
if TYPE_CHECKING:
from core.datasource.__base.datasource_provider import DatasourcePluginProviderController
from core.plugin.entities.plugin_daemon import PluginModelProviderEntity
from core.tools.plugin_tool.provider import PluginToolProviderController
from core.trigger.provider import PluginTriggerProviderController
@@ -20,14 +19,6 @@ plugin_tool_providers: RecyclableContextVar[dict[str, "PluginToolProviderControl
plugin_tool_providers_lock: RecyclableContextVar[Lock] = RecyclableContextVar(ContextVar("plugin_tool_providers_lock"))
plugin_model_providers: RecyclableContextVar[list["PluginModelProviderEntity"] | None] = RecyclableContextVar(
ContextVar("plugin_model_providers")
)
plugin_model_providers_lock: RecyclableContextVar[Lock] = RecyclableContextVar(
ContextVar("plugin_model_providers_lock")
)
datasource_plugin_providers: RecyclableContextVar[dict[str, "DatasourcePluginProviderController"]] = (
RecyclableContextVar(ContextVar("datasource_plugin_providers"))
)

View File

@@ -4,7 +4,7 @@ from typing import Any, TypeAlias
from pydantic import BaseModel, ConfigDict, computed_field
from dify_graph.file import helpers as file_helpers
from graphon.file import helpers as file_helpers
from models.model import IconType
JSONValue: TypeAlias = str | int | float | bool | None | dict[str, Any] | list[Any]

View File

@@ -26,9 +26,9 @@ from controllers.console.wraps import (
from core.ops.ops_trace_manager import OpsTraceManager
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.trigger.constants import TRIGGER_NODE_TYPES
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.file import helpers as file_helpers
from extensions.ext_database import db
from graphon.enums import WorkflowExecutionStatus
from graphon.file import helpers as file_helpers
from libs.login import current_account_with_tenant, login_required
from models import App, DatasetPermissionEnum, Workflow
from models.model import IconType

View File

@@ -22,7 +22,7 @@ from controllers.console.app.error import (
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from dify_graph.model_runtime.errors.invoke import InvokeError
from graphon.model_runtime.errors.invoke import InvokeError
from libs.login import login_required
from models import App, AppMode
from services.audio_service import AudioService

View File

@@ -26,7 +26,7 @@ from core.errors.error import (
QuotaExceededError,
)
from core.helper.trace_id_helper import get_external_trace_id
from dify_graph.model_runtime.errors.invoke import InvokeError
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import uuid_value
from libs.login import current_user, login_required

View File

@@ -18,8 +18,8 @@ from core.helper.code_executor.javascript.javascript_code_provider import Javasc
from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider
from core.llm_generator.entities import RuleCodeGeneratePayload, RuleGeneratePayload, RuleStructuredOutputPayload
from core.llm_generator.llm_generator import LLMGenerator
from dify_graph.model_runtime.errors.invoke import InvokeError
from extensions.ext_database import db
from graphon.model_runtime.errors.invoke import InvokeError
from libs.login import current_account_with_tenant, login_required
from models import App
from services.workflow_service import WorkflowService

View File

@@ -24,9 +24,9 @@ from controllers.console.wraps import (
)
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from dify_graph.model_runtime.errors.invoke import InvokeError
from extensions.ext_database import db
from fields.raws import FilesContainedField
from graphon.model_runtime.errors.invoke import InvokeError
from libs.helper import TimestampField, uuid_value
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.login import current_account_with_tenant, login_required

View File

@@ -88,6 +88,7 @@ class ModelConfigResource(Resource):
tenant_id=current_tenant_id,
app_id=app_model.id,
agent_tool=agent_tool_entity,
user_id=current_user.id,
)
manager = ToolParameterConfigurationManager(
tenant_id=current_tenant_id,
@@ -127,6 +128,7 @@ class ModelConfigResource(Resource):
tenant_id=current_tenant_id,
app_id=app_model.id,
agent_tool=agent_tool_entity,
user_id=current_user.id,
)
except Exception:
continue

View File

@@ -20,6 +20,7 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.file_access import DatabaseFileAccessController
from core.helper.trace_id_helper import get_external_trace_id
from core.plugin.impl.exc import PluginInvokeError
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
@@ -29,15 +30,15 @@ from core.trigger.debug.event_selectors import (
create_event_poller,
select_trigger_debug_events,
)
from dify_graph.enums import NodeType
from dify_graph.file.models import File
from dify_graph.graph_engine.manager import GraphEngineManager
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from factories import file_factory, variable_factory
from fields.member_fields import simple_account_fields
from fields.workflow_fields import workflow_fields, workflow_pagination_fields
from graphon.enums import NodeType
from graphon.file.models import File
from graphon.graph_engine.manager import GraphEngineManager
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs import helper
from libs.datetime_utils import naive_utc_now
from libs.helper import TimestampField, uuid_value
@@ -51,6 +52,7 @@ from services.errors.llm import InvokeRateLimitError
from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService
logger = logging.getLogger(__name__)
_file_access_controller = DatabaseFileAccessController()
LISTENING_RETRY_IN = 2000
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE = "source workflow must be published"
@@ -204,6 +206,7 @@ def _parse_file(workflow: Workflow, files: list[dict] | None = None) -> Sequence
mappings=files,
tenant_id=workflow.tenant_id,
config=file_extra_config,
access_controller=_file_access_controller,
)
return file_objs

View File

@@ -9,12 +9,12 @@ from sqlalchemy.orm import Session
from controllers.console import console_ns
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from dify_graph.enums import WorkflowExecutionStatus
from extensions.ext_database import db
from fields.workflow_app_log_fields import (
build_workflow_app_log_pagination_model,
build_workflow_archived_log_pagination_model,
)
from graphon.enums import WorkflowExecutionStatus
from libs.login import login_required
from models import App
from models.model import AppMode

View File

@@ -15,14 +15,15 @@ from controllers.console.app.error import (
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required
from controllers.web.error import InvalidArgumentError, NotFoundError
from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from dify_graph.file import helpers as file_helpers
from dify_graph.variables.segment_group import SegmentGroup
from dify_graph.variables.segments import ArrayFileSegment, FileSegment, Segment
from dify_graph.variables.types import SegmentType
from core.app.file_access import DatabaseFileAccessController
from core.workflow.variable_prefixes import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from extensions.ext_database import db
from factories.file_factory import build_from_mapping, build_from_mappings
from factories.variable_factory import build_segment_with_type
from graphon.file import helpers as file_helpers
from graphon.variables.segment_group import SegmentGroup
from graphon.variables.segments import ArrayFileSegment, FileSegment, Segment
from graphon.variables.types import SegmentType
from libs.login import current_user, login_required
from models import App, AppMode
from models.workflow import WorkflowDraftVariable
@@ -30,6 +31,7 @@ from services.workflow_draft_variable_service import WorkflowDraftVariableList,
from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
_file_access_controller = DatabaseFileAccessController()
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
@@ -389,13 +391,21 @@ class VariableApi(Resource):
if variable.value_type == SegmentType.FILE:
if not isinstance(raw_value, dict):
raise InvalidArgumentError(description=f"expected dict for file, got {type(raw_value)}")
raw_value = build_from_mapping(mapping=raw_value, tenant_id=app_model.tenant_id)
raw_value = build_from_mapping(
mapping=raw_value,
tenant_id=app_model.tenant_id,
access_controller=_file_access_controller,
)
elif variable.value_type == SegmentType.ARRAY_FILE:
if not isinstance(raw_value, list):
raise InvalidArgumentError(description=f"expected list for files, got {type(raw_value)}")
if len(raw_value) > 0 and not isinstance(raw_value[0], dict):
raise InvalidArgumentError(description=f"expected dict for files[0], got {type(raw_value)}")
raw_value = build_from_mappings(mappings=raw_value, tenant_id=app_model.tenant_id)
raw_value = build_from_mappings(
mappings=raw_value,
tenant_id=app_model.tenant_id,
access_controller=_file_access_controller,
)
new_value = build_segment_with_type(variable.value_type, raw_value)
draft_var_srv.update_variable(variable, name=new_name, value=new_value)
db.session.commit()

View File

@@ -1,5 +1,5 @@
from datetime import UTC, datetime, timedelta
from typing import Literal, cast
from typing import Literal, TypedDict, cast
from flask import request
from flask_restx import Resource, fields, marshal_with
@@ -12,8 +12,7 @@ from controllers.console import console_ns
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.web.error import NotFoundError
from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.enums import WorkflowExecutionStatus
from core.workflow.human_input_forms import load_form_tokens_by_form_id as _load_form_tokens_by_form_id
from extensions.ext_database import db
from fields.end_user_fields import simple_end_user_fields
from fields.member_fields import simple_account_fields
@@ -27,6 +26,8 @@ from fields.workflow_run_fields import (
workflow_run_node_execution_list_fields,
workflow_run_pagination_fields,
)
from graphon.entities.pause_reason import HumanInputRequired
from graphon.enums import WorkflowExecutionStatus
from libs.archive_storage import ArchiveStorageNotConfiguredError, get_archive_storage
from libs.custom_inputs import time_duration
from libs.helper import uuid_value
@@ -172,6 +173,23 @@ console_ns.schema_model(
)
class HumanInputPauseTypeResponse(TypedDict):
type: Literal["human_input"]
form_id: str
backstage_input_url: str | None
class PausedNodeResponse(TypedDict):
node_id: str
node_title: str
pause_type: HumanInputPauseTypeResponse
class WorkflowPauseDetailsResponse(TypedDict):
paused_at: str | None
paused_nodes: list[PausedNodeResponse]
@console_ns.route("/apps/<uuid:app_id>/advanced-chat/workflow-runs")
class AdvancedChatAppWorkflowRunListApi(Resource):
@console_ns.doc("get_advanced_chat_workflow_runs")
@@ -489,18 +507,22 @@ class ConsoleWorkflowPauseDetailsApi(Resource):
# Check if workflow is suspended
is_paused = workflow_run.status == WorkflowExecutionStatus.PAUSED
if not is_paused:
return {
empty_response: WorkflowPauseDetailsResponse = {
"paused_at": None,
"paused_nodes": [],
}, 200
}
return empty_response, 200
pause_entity = workflow_run_repo.get_workflow_pause(workflow_run_id)
pause_reasons = pause_entity.get_pause_reasons() if pause_entity else []
form_tokens_by_form_id = _load_form_tokens_by_form_id(
[reason.form_id for reason in pause_reasons if isinstance(reason, HumanInputRequired)]
)
# Build response
paused_at = pause_entity.paused_at if pause_entity else None
paused_nodes = []
response = {
paused_nodes: list[PausedNodeResponse] = []
response: WorkflowPauseDetailsResponse = {
"paused_at": paused_at.isoformat() + "Z" if paused_at else None,
"paused_nodes": paused_nodes,
}
@@ -514,7 +536,9 @@ class ConsoleWorkflowPauseDetailsApi(Resource):
"pause_type": {
"type": "human_input",
"form_id": reason.form_id,
"backstage_input_url": _build_backstage_input_url(reason.form_token),
"backstage_input_url": _build_backstage_input_url(
form_tokens_by_form_id.get(reason.form_id)
),
},
}
)

View File

@@ -8,7 +8,7 @@ from pydantic import BaseModel
from werkzeug.exceptions import BadRequest, NotFound
from controllers.console.wraps import account_initialization_required, setup_required
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.login import current_account_with_tenant, login_required
from models import Account
from models.model import OAuthProviderApp

View File

@@ -25,13 +25,12 @@ from controllers.console.wraps import (
)
from core.errors.error import LLMBadRequestError, ProviderTokenNotInitError
from core.indexing_runner import IndexingRunner
from core.provider_manager import ProviderManager
from core.plugin.impl.model_runtime_factory import create_plugin_provider_manager
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.extractor.entity.datasource_type import DatasourceType
from core.rag.extractor.entity.extract_setting import ExtractSetting, NotionInfo, WebsiteInfo
from core.rag.index_processor.constant.index_type import IndexTechniqueType
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from dify_graph.model_runtime.entities.model_entities import ModelType
from extensions.ext_database import db
from fields.app_fields import app_detail_kernel_fields, related_app_list
from fields.dataset_fields import (
@@ -52,6 +51,7 @@ from fields.dataset_fields import (
weighted_score_fields,
)
from fields.document_fields import document_status_fields
from graphon.model_runtime.entities.model_entities import ModelType
from libs.login import current_account_with_tenant, login_required
from models import ApiToken, Dataset, Document, DocumentSegment, UploadFile
from models.dataset import DatasetPermission, DatasetPermissionEnum
@@ -332,7 +332,7 @@ class DatasetListApi(Resource):
)
# check embedding setting
provider_manager = ProviderManager()
provider_manager = create_plugin_provider_manager(tenant_id=current_tenant_id)
configurations = provider_manager.get_configurations(tenant_id=current_tenant_id)
embedding_models = configurations.get_models(model_type=ModelType.TEXT_EMBEDDING, only_active=True)
@@ -446,7 +446,7 @@ class DatasetApi(Resource):
data.update({"partial_member_list": part_users_list})
# check embedding setting
provider_manager = ProviderManager()
provider_manager = create_plugin_provider_manager(tenant_id=current_tenant_id)
configurations = provider_manager.get_configurations(tenant_id=current_tenant_id)
embedding_models = configurations.get_models(model_type=ModelType.TEXT_EMBEDDING, only_active=True)

View File

@@ -28,8 +28,6 @@ from core.plugin.impl.exc import PluginDaemonClientSideError
from core.rag.extractor.entity.datasource_type import DatasourceType
from core.rag.extractor.entity.extract_setting import ExtractSetting, NotionInfo, WebsiteInfo
from core.rag.index_processor.constant.index_type import IndexTechniqueType
from dify_graph.model_runtime.entities.model_entities import ModelType
from dify_graph.model_runtime.errors.invoke import InvokeAuthorizationError
from extensions.ext_database import db
from fields.dataset_fields import dataset_fields
from fields.document_fields import (
@@ -39,6 +37,8 @@ from fields.document_fields import (
document_status_fields,
document_with_segments_fields,
)
from graphon.model_runtime.entities.model_entities import ModelType
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
from libs.datetime_utils import naive_utc_now
from libs.login import current_account_with_tenant, login_required
from models import DatasetProcessRule, Document, DocumentSegment, UploadFile
@@ -454,7 +454,7 @@ class DatasetInitApi(Resource):
if knowledge_config.embedding_model is None or knowledge_config.embedding_model_provider is None:
raise ValueError("embedding model and embedding model provider are required for high quality indexing.")
try:
model_manager = ModelManager()
model_manager = ModelManager.for_tenant(tenant_id=current_tenant_id)
model_manager.get_model_instance(
tenant_id=current_tenant_id,
provider=knowledge_config.embedding_model_provider,

View File

@@ -27,10 +27,10 @@ from controllers.console.wraps import (
from core.errors.error import LLMBadRequestError, ProviderTokenNotInitError
from core.model_manager import ModelManager
from core.rag.index_processor.constant.index_type import IndexTechniqueType
from dify_graph.model_runtime.entities.model_entities import ModelType
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from fields.segment_fields import child_chunk_fields, segment_fields
from graphon.model_runtime.entities.model_entities import ModelType
from libs.helper import escape_like_pattern
from libs.login import current_account_with_tenant, login_required
from models.dataset import ChildChunk, DocumentSegment
@@ -283,7 +283,7 @@ class DatasetDocumentSegmentApi(Resource):
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
# check embedding model setting
try:
model_manager = ModelManager()
model_manager = ModelManager.for_tenant(tenant_id=current_tenant_id)
model_manager.get_model_instance(
tenant_id=current_tenant_id,
provider=dataset.embedding_model_provider,
@@ -336,7 +336,7 @@ class DatasetDocumentSegmentAddApi(Resource):
# check embedding model setting
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
try:
model_manager = ModelManager()
model_manager = ModelManager.for_tenant(tenant_id=current_tenant_id)
model_manager.get_model_instance(
tenant_id=current_tenant_id,
provider=dataset.embedding_model_provider,
@@ -387,7 +387,7 @@ class DatasetDocumentSegmentUpdateApi(Resource):
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
# check embedding model setting
try:
model_manager = ModelManager()
model_manager = ModelManager.for_tenant(tenant_id=current_tenant_id)
model_manager.get_model_instance(
tenant_id=current_tenant_id,
provider=dataset.embedding_model_provider,
@@ -572,7 +572,7 @@ class ChildChunkAddApi(Resource):
# check embedding model setting
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
try:
model_manager = ModelManager()
model_manager = ModelManager.for_tenant(tenant_id=current_tenant_id)
model_manager.get_model_instance(
tenant_id=current_tenant_id,
provider=dataset.embedding_model_provider,

View File

@@ -25,7 +25,7 @@ from libs.login import current_account_with_tenant, login_required
from services.dataset_service import DatasetService
from services.external_knowledge_service import ExternalDatasetService
from services.hit_testing_service import HitTestingService
from services.knowledge_service import ExternalDatasetTestService
from services.knowledge_service import BedrockRetrievalSetting, ExternalDatasetTestService
def _build_dataset_detail_model():
@@ -86,7 +86,7 @@ class ExternalHitTestingPayload(BaseModel):
class BedrockRetrievalPayload(BaseModel):
retrieval_setting: dict[str, object]
retrieval_setting: "BedrockRetrievalSetting"
query: str
knowledge_id: str

View File

@@ -19,8 +19,8 @@ from core.errors.error import (
ProviderTokenNotInitError,
QuotaExceededError,
)
from dify_graph.model_runtime.errors.invoke import InvokeError
from fields.hit_testing_fields import hit_testing_record_fields
from graphon.model_runtime.errors.invoke import InvokeError
from libs.login import current_user
from models.account import Account
from services.dataset_service import DatasetService

View File

@@ -10,8 +10,8 @@ from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required
from core.plugin.impl.oauth import OAuthHandler
from dify_graph.model_runtime.errors.validate import CredentialsValidateFailedError
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.errors.validate import CredentialsValidateFailedError
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.login import current_account_with_tenant, login_required
from models.provider_ids import DatasourceProviderID
from services.datasource_provider_service import DatasourceProviderService

View File

@@ -21,11 +21,12 @@ from controllers.console.app.workflow_draft_variable import (
from controllers.console.datasets.wraps import get_rag_pipeline
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.web.error import InvalidArgumentError, NotFoundError
from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from dify_graph.variables.types import SegmentType
from core.app.file_access import DatabaseFileAccessController
from core.workflow.variable_prefixes import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from extensions.ext_database import db
from factories.file_factory import build_from_mapping, build_from_mappings
from factories.variable_factory import build_segment_with_type
from graphon.variables.types import SegmentType
from libs.login import current_user, login_required
from models import Account
from models.dataset import Pipeline
@@ -33,6 +34,7 @@ from services.rag_pipeline.rag_pipeline import RagPipelineService
from services.workflow_draft_variable_service import WorkflowDraftVariableList, WorkflowDraftVariableService
logger = logging.getLogger(__name__)
_file_access_controller = DatabaseFileAccessController()
def _create_pagination_parser():
@@ -223,13 +225,21 @@ class RagPipelineVariableApi(Resource):
if variable.value_type == SegmentType.FILE:
if not isinstance(raw_value, dict):
raise InvalidArgumentError(description=f"expected dict for file, got {type(raw_value)}")
raw_value = build_from_mapping(mapping=raw_value, tenant_id=pipeline.tenant_id)
raw_value = build_from_mapping(
mapping=raw_value,
tenant_id=pipeline.tenant_id,
access_controller=_file_access_controller,
)
elif variable.value_type == SegmentType.ARRAY_FILE:
if not isinstance(raw_value, list):
raise InvalidArgumentError(description=f"expected list for files, got {type(raw_value)}")
if len(raw_value) > 0 and not isinstance(raw_value[0], dict):
raise InvalidArgumentError(description=f"expected dict for files[0], got {type(raw_value)}")
raw_value = build_from_mappings(mappings=raw_value, tenant_id=pipeline.tenant_id)
raw_value = build_from_mappings(
mappings=raw_value,
tenant_id=pipeline.tenant_id,
access_controller=_file_access_controller,
)
new_value = build_segment_with_type(variable.value_type, raw_value)
draft_var_srv.update_variable(variable, name=new_name, value=new_value)
db.session.commit()

View File

@@ -37,9 +37,9 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.pipeline.pipeline_generator import PipelineGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from extensions.ext_database import db
from factories import variable_factory
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs import helper
from libs.helper import TimestampField, UUIDStrOrEmpty
from libs.login import current_account_with_tenant, current_user, login_required

View File

@@ -19,7 +19,7 @@ from controllers.console.app.error import (
)
from controllers.console.explore.wraps import InstalledAppResource
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from dify_graph.model_runtime.errors.invoke import InvokeError
from graphon.model_runtime.errors.invoke import InvokeError
from services.audio_service import AudioService
from services.errors.audio import (
AudioTooLargeServiceError,

View File

@@ -24,8 +24,8 @@ from core.errors.error import (
ProviderTokenNotInitError,
QuotaExceededError,
)
from dify_graph.model_runtime.errors.invoke import InvokeError
from extensions.ext_database import db
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.datetime_utils import naive_utc_now
from libs.login import current_user

View File

@@ -21,9 +21,9 @@ from controllers.console.explore.error import (
from controllers.console.explore.wraps import InstalledAppResource
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from dify_graph.model_runtime.errors.invoke import InvokeError
from fields.conversation_fields import ResultResponse
from fields.message_fields import MessageInfiniteScrollPagination, MessageListItem, SuggestedQuestionsResponse
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import UUIDStrOrEmpty
from libs.login import current_account_with_tenant

View File

@@ -42,8 +42,6 @@ from core.errors.error import (
ProviderTokenNotInitError,
QuotaExceededError,
)
from dify_graph.graph_engine.manager import GraphEngineManager
from dify_graph.model_runtime.errors.invoke import InvokeError
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from fields.app_fields import (
@@ -61,6 +59,8 @@ from fields.workflow_fields import (
workflow_fields,
workflow_partial_fields,
)
from graphon.graph_engine.manager import GraphEngineManager
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import uuid_value
from libs.login import current_user

View File

@@ -21,9 +21,9 @@ from core.errors.error import (
ProviderTokenNotInitError,
QuotaExceededError,
)
from dify_graph.graph_engine.manager import GraphEngineManager
from dify_graph.model_runtime.errors.invoke import InvokeError
from extensions.ext_redis import redis_client
from graphon.graph_engine.manager import GraphEngineManager
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.login import current_account_with_tenant
from models.model import AppMode, InstalledApp

View File

@@ -15,6 +15,7 @@ from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.web.error import InvalidArgumentError, NotFoundError
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.message_generator import MessageGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
@@ -166,6 +167,7 @@ class ConsoleWorkflowEventsApi(Resource):
else:
msg_generator = MessageGenerator()
generator: BaseAppGenerator
if app.mode == AppMode.ADVANCED_CHAT:
generator = AdvancedChatAppGenerator()
elif app.mode == AppMode.WORKFLOW:
@@ -202,7 +204,7 @@ class ConsoleWorkflowEventsApi(Resource):
)
def _retrieve_app_for_workflow_run(session: Session, workflow_run: WorkflowRun):
def _retrieve_app_for_workflow_run(session: Session, workflow_run: WorkflowRun) -> App:
query = select(App).where(
App.id == workflow_run.app_id,
App.tenant_id == workflow_run.tenant_id,

View File

@@ -13,9 +13,9 @@ from controllers.common.errors import (
)
from controllers.console import console_ns
from core.helper import ssrf_proxy
from dify_graph.file import helpers as file_helpers
from extensions.ext_database import db
from fields.file_fields import FileWithSignedUrl, RemoteFileInfo
from graphon.file import helpers as file_helpers
from libs.login import current_account_with_tenant, login_required
from services.file_service import FileService

View File

@@ -2,7 +2,7 @@ from flask_restx import Resource, fields
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.login import current_account_with_tenant, login_required
from services.agent_service import AgentService

View File

@@ -8,7 +8,7 @@ from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, is_admin_or_owner_required, setup_required
from core.plugin.impl.exc import PluginPermissionDeniedError
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.login import current_account_with_tenant, login_required
from services.plugin.endpoint_service import EndpointService

View File

@@ -5,8 +5,8 @@ from werkzeug.exceptions import Forbidden
from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from dify_graph.model_runtime.entities.model_entities import ModelType
from dify_graph.model_runtime.errors.validate import CredentialsValidateFailedError
from graphon.model_runtime.entities.model_entities import ModelType
from graphon.model_runtime.errors.validate import CredentialsValidateFailedError
from libs.login import current_account_with_tenant, login_required
from models import TenantAccountRole
from services.model_load_balancing_service import ModelLoadBalancingService

View File

@@ -7,9 +7,9 @@ from pydantic import BaseModel, Field, field_validator
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, is_admin_or_owner_required, setup_required
from dify_graph.model_runtime.entities.model_entities import ModelType
from dify_graph.model_runtime.errors.validate import CredentialsValidateFailedError
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.entities.model_entities import ModelType
from graphon.model_runtime.errors.validate import CredentialsValidateFailedError
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.helper import uuid_value
from libs.login import current_account_with_tenant, login_required
from services.billing_service import BillingService

View File

@@ -8,9 +8,9 @@ from pydantic import BaseModel, Field, field_validator
from controllers.common.schema import register_enum_models, register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, is_admin_or_owner_required, setup_required
from dify_graph.model_runtime.entities.model_entities import ModelType
from dify_graph.model_runtime.errors.validate import CredentialsValidateFailedError
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.entities.model_entities import ModelType
from graphon.model_runtime.errors.validate import CredentialsValidateFailedError
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.helper import uuid_value
from libs.login import current_account_with_tenant, login_required
from services.model_load_balancing_service import ModelLoadBalancingService
@@ -282,14 +282,18 @@ class ModelProviderModelCredentialApi(Resource):
)
if args.config_from == "predefined-model":
available_credentials = model_provider_service.provider_manager.get_provider_available_credentials(
tenant_id=tenant_id, provider_name=provider
available_credentials = model_provider_service.get_provider_available_credentials(
tenant_id=tenant_id,
provider=provider,
)
else:
# Normalize model_type to the origin value stored in DB (e.g., "text-generation" for LLM)
normalized_model_type = args.model_type.to_origin_model_type()
available_credentials = model_provider_service.provider_manager.get_provider_model_available_credentials(
tenant_id=tenant_id, provider_name=provider, model_type=normalized_model_type, model_name=args.model
available_credentials = model_provider_service.get_provider_model_available_credentials(
tenant_id=tenant_id,
provider=provider,
model_type=normalized_model_type,
model=args.model,
)
return jsonable_encoder(

View File

@@ -14,7 +14,7 @@ from controllers.console import console_ns
from controllers.console.workspace import plugin_permission_required
from controllers.console.wraps import account_initialization_required, is_admin_or_owner_required, setup_required
from core.plugin.impl.exc import PluginDaemonClientSideError
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.login import current_account_with_tenant, login_required
from models.account import TenantPluginAutoUpgradeStrategy, TenantPluginPermission
from services.plugin.plugin_auto_upgrade_service import PluginAutoUpgradeService
@@ -200,7 +200,7 @@ class PluginDebuggingKeyApi(Resource):
"port": dify_config.PLUGIN_REMOTE_INSTALL_PORT,
}
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/list")
@@ -215,7 +215,7 @@ class PluginListApi(Resource):
try:
plugins_with_total = PluginService.list_with_total(tenant_id, args.page, args.page_size)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder({"plugins": plugins_with_total.list, "total": plugins_with_total.total})
@@ -232,7 +232,7 @@ class PluginListLatestVersionsApi(Resource):
try:
versions = PluginService.list_latest_versions(args.plugin_ids)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder({"versions": versions})
@@ -251,7 +251,7 @@ class PluginListInstallationsFromIdsApi(Resource):
try:
plugins = PluginService.list_installations_from_ids(tenant_id, args.plugin_ids)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder({"plugins": plugins})
@@ -266,7 +266,7 @@ class PluginIconApi(Resource):
try:
icon_bytes, mimetype = PluginService.get_asset(args.tenant_id, args.filename)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
icon_cache_max_age = dify_config.TOOL_ICON_CACHE_MAX_AGE
return send_file(io.BytesIO(icon_bytes), mimetype=mimetype, max_age=icon_cache_max_age)
@@ -286,7 +286,7 @@ class PluginAssetApi(Resource):
binary = PluginService.extract_asset(tenant_id, args.plugin_unique_identifier, args.file_name)
return send_file(io.BytesIO(binary), mimetype="application/octet-stream")
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/upload/pkg")
@@ -303,7 +303,7 @@ class PluginUploadFromPkgApi(Resource):
try:
response = PluginService.upload_pkg(tenant_id, content)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder(response)
@@ -323,7 +323,7 @@ class PluginUploadFromGithubApi(Resource):
try:
response = PluginService.upload_pkg_from_github(tenant_id, args.repo, args.version, args.package)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder(response)
@@ -361,7 +361,7 @@ class PluginInstallFromPkgApi(Resource):
try:
response = PluginService.install_from_local_pkg(tenant_id, args.plugin_unique_identifiers)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder(response)
@@ -387,7 +387,7 @@ class PluginInstallFromGithubApi(Resource):
args.package,
)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder(response)
@@ -407,7 +407,7 @@ class PluginInstallFromMarketplaceApi(Resource):
try:
response = PluginService.install_from_marketplace_pkg(tenant_id, args.plugin_unique_identifiers)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder(response)
@@ -433,7 +433,7 @@ class PluginFetchMarketplacePkgApi(Resource):
}
)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/fetch-manifest")
@@ -453,7 +453,7 @@ class PluginFetchManifestApi(Resource):
{"manifest": PluginService.fetch_plugin_manifest(tenant_id, args.plugin_unique_identifier).model_dump()}
)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/tasks")
@@ -471,7 +471,7 @@ class PluginFetchInstallTasksApi(Resource):
try:
return jsonable_encoder({"tasks": PluginService.fetch_install_tasks(tenant_id, args.page, args.page_size)})
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/tasks/<task_id>")
@@ -486,7 +486,7 @@ class PluginFetchInstallTaskApi(Resource):
try:
return jsonable_encoder({"task": PluginService.fetch_install_task(tenant_id, task_id)})
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/tasks/<task_id>/delete")
@@ -501,7 +501,7 @@ class PluginDeleteInstallTaskApi(Resource):
try:
return {"success": PluginService.delete_install_task(tenant_id, task_id)}
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/tasks/delete_all")
@@ -516,7 +516,7 @@ class PluginDeleteAllInstallTaskItemsApi(Resource):
try:
return {"success": PluginService.delete_all_install_task_items(tenant_id)}
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/tasks/<task_id>/delete/<path:identifier>")
@@ -531,7 +531,7 @@ class PluginDeleteInstallTaskItemApi(Resource):
try:
return {"success": PluginService.delete_install_task_item(tenant_id, task_id, identifier)}
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/upgrade/marketplace")
@@ -553,7 +553,7 @@ class PluginUpgradeFromMarketplaceApi(Resource):
)
)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/upgrade/github")
@@ -580,7 +580,7 @@ class PluginUpgradeFromGithubApi(Resource):
)
)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/uninstall")
@@ -598,7 +598,7 @@ class PluginUninstallApi(Resource):
try:
return {"success": PluginService.uninstall(tenant_id, args.plugin_installation_id)}
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
@console_ns.route("/workspaces/current/plugin/permission/change")
@@ -674,7 +674,7 @@ class PluginFetchDynamicSelectOptionsApi(Resource):
provider_type=args.provider_type,
)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder({"options": options})
@@ -705,7 +705,7 @@ class PluginFetchDynamicSelectOptionsWithCredentialsApi(Resource):
credentials=args.credentials,
)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return {"code": "plugin_error", "message": e.description}, 400
return jsonable_encoder({"options": options})

View File

@@ -26,8 +26,8 @@ from core.mcp.mcp_client import MCPClient
from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.impl.oauth import OAuthHandler
from core.tools.entities.tool_entities import ApiProviderSchemaType, WorkflowToolParameterConfiguration
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from extensions.ext_database import db
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.helper import alphanumeric, uuid_value
from libs.login import current_account_with_tenant, login_required
from models.provider_ids import ToolProviderID

View File

@@ -14,8 +14,8 @@ from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.impl.oauth import OAuthHandler
from core.trigger.entities.entities import SubscriptionBuilderUpdater
from core.trigger.trigger_manager import TriggerManager
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from extensions.ext_database import db
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.login import current_user, login_required
from models.account import Account
from models.provider_ids import TriggerProviderID

View File

@@ -70,22 +70,25 @@ class ToolFileApi(Resource):
except Exception:
raise UnsupportedFileTypeError()
mime_type = tool_file.mime_type
filename = tool_file.filename
response = Response(
stream,
mimetype=tool_file.mimetype,
mimetype=mime_type,
direct_passthrough=True,
headers={},
)
if tool_file.size > 0:
response.headers["Content-Length"] = str(tool_file.size)
if args.as_attachment:
encoded_filename = quote(tool_file.name)
if args.as_attachment and filename:
encoded_filename = quote(filename)
response.headers["Content-Disposition"] = f"attachment; filename*=UTF-8''{encoded_filename}"
enforce_download_for_html(
response,
mime_type=tool_file.mimetype,
filename=tool_file.name,
mime_type=mime_type,
filename=filename,
extension=extension,
)

View File

@@ -7,8 +7,8 @@ from pydantic import BaseModel, Field
from werkzeug.exceptions import Forbidden
import services
from core.tools.signature import verify_plugin_file_signature
from core.tools.tool_file_manager import ToolFileManager
from dify_graph.file.helpers import verify_plugin_file_signature
from fields.file_fields import FileResponse
from ..common.errors import (

View File

@@ -16,12 +16,14 @@ api = ExternalApi(
inner_api_ns = Namespace("inner_api", description="Internal API operations", path="/")
from . import mail as _mail
from .app import dsl as _app_dsl
from .plugin import plugin as _plugin
from .workspace import workspace as _workspace
api.add_namespace(inner_api_ns)
__all__ = [
"_app_dsl",
"_mail",
"_plugin",
"_workspace",

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,110 @@
"""Inner API endpoints for app DSL import/export.
Called by the enterprise admin-api service. Import requires ``creator_email``
to attribute the created app; workspace/membership validation is done by the
Go admin-api caller.
"""
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from controllers.common.schema import register_schema_model
from controllers.console.wraps import setup_required
from controllers.inner_api import inner_api_ns
from controllers.inner_api.wraps import enterprise_inner_api_only
from extensions.ext_database import db
from models import Account, App
from models.account import AccountStatus
from services.app_dsl_service import AppDslService, ImportMode, ImportStatus
class InnerAppDSLImportPayload(BaseModel):
yaml_content: str = Field(description="YAML DSL content")
creator_email: str = Field(description="Email of the workspace member who will own the imported app")
name: str | None = Field(default=None, description="Override app name from DSL")
description: str | None = Field(default=None, description="Override app description from DSL")
register_schema_model(inner_api_ns, InnerAppDSLImportPayload)
@inner_api_ns.route("/enterprise/workspaces/<string:workspace_id>/dsl/import")
class EnterpriseAppDSLImport(Resource):
@setup_required
@enterprise_inner_api_only
@inner_api_ns.doc("enterprise_app_dsl_import")
@inner_api_ns.expect(inner_api_ns.models[InnerAppDSLImportPayload.__name__])
@inner_api_ns.doc(
responses={
200: "Import completed",
202: "Import pending (DSL version mismatch requires confirmation)",
400: "Import failed (business error)",
404: "Creator account not found or inactive",
}
)
def post(self, workspace_id: str):
"""Import a DSL into a workspace on behalf of a specified creator."""
args = InnerAppDSLImportPayload.model_validate(inner_api_ns.payload or {})
account = _get_active_account(args.creator_email)
if account is None:
return {"message": f"account '{args.creator_email}' not found or inactive"}, 404
account.set_tenant_id(workspace_id)
with Session(db.engine) as session:
dsl_service = AppDslService(session)
result = dsl_service.import_app(
account=account,
import_mode=ImportMode.YAML_CONTENT,
yaml_content=args.yaml_content,
name=args.name,
description=args.description,
)
session.commit()
if result.status == ImportStatus.FAILED:
return result.model_dump(mode="json"), 400
if result.status == ImportStatus.PENDING:
return result.model_dump(mode="json"), 202
return result.model_dump(mode="json"), 200
@inner_api_ns.route("/enterprise/apps/<string:app_id>/dsl")
class EnterpriseAppDSLExport(Resource):
@setup_required
@enterprise_inner_api_only
@inner_api_ns.doc(
"enterprise_app_dsl_export",
responses={
200: "Export successful",
404: "App not found",
},
)
def get(self, app_id: str):
"""Export an app's DSL as YAML."""
include_secret = request.args.get("include_secret", "false").lower() == "true"
app_model = db.session.query(App).filter_by(id=app_id).first()
if not app_model:
return {"message": "app not found"}, 404
data = AppDslService.export_dsl(
app_model=app_model,
include_secret=include_secret,
)
return {"data": data}, 200
def _get_active_account(email: str) -> Account | None:
"""Look up an active account by email.
Workspace membership is already validated by the Go admin-api caller.
"""
account = db.session.query(Account).filter_by(email=email).first()
if account is None or account.status != AccountStatus.ACTIVE:
return None
return account

View File

@@ -28,8 +28,8 @@ from core.plugin.entities.request import (
RequestRequestUploadFile,
)
from core.tools.entities.tool_entities import ToolProviderType
from dify_graph.file.helpers import get_signed_file_url_for_plugin
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from core.tools.signature import get_signed_file_url_for_plugin
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.helper import length_prefixed_response
from models import Account, Tenant
from models.model import EndUser

View File

@@ -9,8 +9,8 @@ from controllers.common.schema import register_schema_model
from controllers.mcp import mcp_ns
from core.mcp import types as mcp_types
from core.mcp.server.streamable_http import handle_mcp_request
from dify_graph.variables.input_entities import VariableEntity
from extensions.ext_database import db
from graphon.variables.input_entities import VariableEntity
from libs import helper
from models.enums import AppMCPServerStatus
from models.model import App, AppMCPServer, AppMode, EndUser

View File

@@ -21,7 +21,7 @@ from controllers.service_api.app.error import (
)
from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from dify_graph.model_runtime.errors.invoke import InvokeError
from graphon.model_runtime.errors.invoke import InvokeError
from models.model import App, EndUser
from services.audio_service import AudioService
from services.errors.audio import (

View File

@@ -28,7 +28,7 @@ from core.errors.error import (
QuotaExceededError,
)
from core.helper.trace_id_helper import get_external_trace_id
from dify_graph.model_runtime.errors.invoke import InvokeError
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import UUIDStrOrEmpty
from models.model import App, AppMode, EndUser

View File

@@ -4,6 +4,7 @@ from urllib.parse import quote
from flask import Response, request
from flask_restx import Resource
from pydantic import BaseModel, Field
from sqlalchemy import select
from controllers.common.file_response import enforce_download_for_html
from controllers.common.schema import register_schema_model
@@ -102,27 +103,27 @@ class FilePreviewApi(Resource):
raise FileAccessDeniedError("Invalid file or app identifier")
# First, find the MessageFile that references this upload file
message_file = db.session.query(MessageFile).where(MessageFile.upload_file_id == file_id).first()
message_file = db.session.scalar(select(MessageFile).where(MessageFile.upload_file_id == file_id).limit(1))
if not message_file:
raise FileNotFoundError("File not found in message context")
# Get the message and verify it belongs to the requesting app
message = (
db.session.query(Message).where(Message.id == message_file.message_id, Message.app_id == app_id).first()
message = db.session.scalar(
select(Message).where(Message.id == message_file.message_id, Message.app_id == app_id).limit(1)
)
if not message:
raise FileAccessDeniedError("File access denied: not owned by requesting app")
# Get the actual upload file record
upload_file = db.session.query(UploadFile).where(UploadFile.id == file_id).first()
upload_file = db.session.get(UploadFile, file_id)
if not upload_file:
raise FileNotFoundError("Upload file record not found")
# Additional security: verify tenant isolation
app = db.session.query(App).where(App.id == app_id).first()
app = db.session.get(App, app_id)
if app and upload_file.tenant_id != app.tenant_id:
raise FileAccessDeniedError("File access denied: tenant mismatch")

View File

@@ -1,4 +1,5 @@
from flask_restx import Resource
from sqlalchemy import select
from werkzeug.exceptions import Forbidden
from controllers.common.fields import Site as SiteResponse
@@ -28,7 +29,7 @@ class AppSiteApi(Resource):
Returns the site configuration for the application including theme, icons, and text.
"""
site = db.session.query(Site).where(Site.app_id == app_model.id).first()
site = db.session.scalar(select(Site).where(Site.app_id == app_model.id).limit(1))
if not site:
raise Forbidden()

View File

@@ -27,12 +27,12 @@ from core.errors.error import (
QuotaExceededError,
)
from core.helper.trace_id_helper import get_external_trace_id
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.graph_engine.manager import GraphEngineManager
from dify_graph.model_runtime.errors.invoke import InvokeError
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from fields.workflow_app_log_fields import build_workflow_app_log_pagination_model
from graphon.enums import WorkflowExecutionStatus
from graphon.graph_engine.manager import GraphEngineManager
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import OptionalTimestampField, TimestampField
from models.model import App, AppMode, EndUser

View File

@@ -14,11 +14,11 @@ from controllers.service_api.wraps import (
DatasetApiResource,
cloud_edition_billing_rate_limit_check,
)
from core.provider_manager import ProviderManager
from core.plugin.impl.model_runtime_factory import create_plugin_provider_manager
from core.rag.index_processor.constant.index_type import IndexTechniqueType
from dify_graph.model_runtime.entities.model_entities import ModelType
from fields.dataset_fields import dataset_detail_fields
from fields.tag_fields import DataSetTag
from graphon.model_runtime.entities.model_entities import ModelType
from libs.login import current_user
from models.account import Account
from models.dataset import DatasetPermissionEnum
@@ -140,10 +140,10 @@ class DatasetListApi(DatasetApiResource):
query.page, query.limit, tenant_id, current_user, query.keyword, query.tag_ids, query.include_all
)
# check embedding setting
provider_manager = ProviderManager()
assert isinstance(current_user, Account)
cid = current_user.current_tenant_id
assert cid is not None
provider_manager = create_plugin_provider_manager(tenant_id=cid)
configurations = provider_manager.get_configurations(tenant_id=cid)
embedding_models = configurations.get_models(model_type=ModelType.TEXT_EMBEDDING, only_active=True)
@@ -259,10 +259,10 @@ class DatasetApi(DatasetApiResource):
raise Forbidden(str(e))
data = cast(dict[str, Any], marshal(dataset, dataset_detail_fields))
# check embedding setting
provider_manager = ProviderManager()
assert isinstance(current_user, Account)
cid = current_user.current_tenant_id
assert cid is not None
provider_manager = create_plugin_provider_manager(tenant_id=cid)
configurations = provider_manager.get_configurations(tenant_id=cid)
embedding_models = configurations.get_models(model_type=ModelType.TEXT_EMBEDDING, only_active=True)

View File

@@ -6,7 +6,7 @@ from uuid import UUID
from flask import request, send_file
from flask_restx import marshal
from pydantic import BaseModel, Field, field_validator, model_validator
from sqlalchemy import desc, select
from sqlalchemy import desc, func, select
from werkzeug.exceptions import Forbidden, NotFound
import services
@@ -155,7 +155,9 @@ class DocumentAddByTextApi(DatasetApiResource):
dataset_id = str(dataset_id)
tenant_id = str(tenant_id)
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise ValueError("Dataset does not exist.")
@@ -238,7 +240,9 @@ class DocumentUpdateByTextApi(DatasetApiResource):
def post(self, tenant_id: str, dataset_id: UUID, document_id: UUID):
"""Update document by text."""
payload = DocumentTextUpdate.model_validate(service_api_ns.payload or {})
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == str(dataset_id)).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == str(dataset_id)).limit(1)
)
args = payload.model_dump(exclude_none=True)
if not dataset:
raise ValueError("Dataset does not exist.")
@@ -315,7 +319,9 @@ class DocumentAddByFileApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id):
"""Create document by upload file."""
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise ValueError("Dataset does not exist.")
@@ -425,7 +431,9 @@ class DocumentUpdateByFileApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id, document_id):
"""Update document by upload file."""
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise ValueError("Dataset does not exist.")
@@ -515,7 +523,9 @@ class DocumentListApi(DatasetApiResource):
dataset_id = str(dataset_id)
tenant_id = str(tenant_id)
query_params = DocumentListQuery.model_validate(request.args.to_dict())
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
@@ -609,7 +619,9 @@ class DocumentIndexingStatusApi(DatasetApiResource):
batch = str(batch)
tenant_id = str(tenant_id)
# get dataset
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
# get documents
@@ -619,20 +631,23 @@ class DocumentIndexingStatusApi(DatasetApiResource):
documents_status = []
for document in documents:
completed_segments = (
db.session.query(DocumentSegment)
.where(
DocumentSegment.completed_at.isnot(None),
DocumentSegment.document_id == str(document.id),
DocumentSegment.status != SegmentStatus.RE_SEGMENT,
db.session.scalar(
select(func.count(DocumentSegment.id)).where(
DocumentSegment.completed_at.isnot(None),
DocumentSegment.document_id == str(document.id),
DocumentSegment.status != SegmentStatus.RE_SEGMENT,
)
)
.count()
or 0
)
total_segments = (
db.session.query(DocumentSegment)
.where(
DocumentSegment.document_id == str(document.id), DocumentSegment.status != SegmentStatus.RE_SEGMENT
db.session.scalar(
select(func.count(DocumentSegment.id)).where(
DocumentSegment.document_id == str(document.id),
DocumentSegment.status != SegmentStatus.RE_SEGMENT,
)
)
.count()
or 0
)
# Create a dictionary with document attributes and additional fields
document_dict = {
@@ -822,7 +837,9 @@ class DocumentApi(DatasetApiResource):
tenant_id = str(tenant_id)
# get dataset info
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise ValueError("Dataset does not exist.")

View File

@@ -3,6 +3,7 @@ from typing import Any
from flask import request
from flask_restx import marshal
from pydantic import BaseModel, Field
from sqlalchemy import select
from werkzeug.exceptions import NotFound
from configs import dify_config
@@ -18,9 +19,9 @@ from controllers.service_api.wraps import (
from core.errors.error import LLMBadRequestError, ProviderTokenNotInitError
from core.model_manager import ModelManager
from core.rag.index_processor.constant.index_type import IndexTechniqueType
from dify_graph.model_runtime.entities.model_entities import ModelType
from extensions.ext_database import db
from fields.segment_fields import child_chunk_fields, segment_fields
from graphon.model_runtime.entities.model_entities import ModelType
from libs.login import current_account_with_tenant
from models.dataset import Dataset
from services.dataset_service import DatasetService, DocumentService, SegmentService
@@ -92,7 +93,9 @@ class SegmentApi(DatasetApiResource):
_, current_tenant_id = current_account_with_tenant()
"""Create single segment."""
# check dataset
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
# check document
@@ -106,7 +109,7 @@ class SegmentApi(DatasetApiResource):
# check embedding model setting
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
try:
model_manager = ModelManager()
model_manager = ModelManager.for_tenant(tenant_id=current_tenant_id)
model_manager.get_model_instance(
tenant_id=current_tenant_id,
provider=dataset.embedding_model_provider,
@@ -150,7 +153,9 @@ class SegmentApi(DatasetApiResource):
# check dataset
page = request.args.get("page", default=1, type=int)
limit = request.args.get("limit", default=20, type=int)
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
# check document
@@ -160,7 +165,7 @@ class SegmentApi(DatasetApiResource):
# check embedding model setting
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
try:
model_manager = ModelManager()
model_manager = ModelManager.for_tenant(tenant_id=current_tenant_id)
model_manager.get_model_instance(
tenant_id=current_tenant_id,
provider=dataset.embedding_model_provider,
@@ -220,7 +225,9 @@ class DatasetSegmentApi(DatasetApiResource):
def delete(self, tenant_id: str, dataset_id: str, document_id: str, segment_id: str):
_, current_tenant_id = current_account_with_tenant()
# check dataset
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
# check user's model setting
@@ -254,7 +261,9 @@ class DatasetSegmentApi(DatasetApiResource):
def post(self, tenant_id: str, dataset_id: str, document_id: str, segment_id: str):
_, current_tenant_id = current_account_with_tenant()
# check dataset
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
# check user's model setting
@@ -266,7 +275,7 @@ class DatasetSegmentApi(DatasetApiResource):
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
# check embedding model setting
try:
model_manager = ModelManager()
model_manager = ModelManager.for_tenant(tenant_id=current_tenant_id)
model_manager.get_model_instance(
tenant_id=current_tenant_id,
provider=dataset.embedding_model_provider,
@@ -301,7 +310,9 @@ class DatasetSegmentApi(DatasetApiResource):
def get(self, tenant_id: str, dataset_id: str, document_id: str, segment_id: str):
_, current_tenant_id = current_account_with_tenant()
# check dataset
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
# check user's model setting
@@ -344,7 +355,9 @@ class ChildChunkApi(DatasetApiResource):
_, current_tenant_id = current_account_with_tenant()
"""Create child chunk."""
# check dataset
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
@@ -361,7 +374,7 @@ class ChildChunkApi(DatasetApiResource):
# check embedding model setting
if dataset.indexing_technique == IndexTechniqueType.HIGH_QUALITY:
try:
model_manager = ModelManager()
model_manager = ModelManager.for_tenant(tenant_id=current_tenant_id)
model_manager.get_model_instance(
tenant_id=current_tenant_id,
provider=dataset.embedding_model_provider,
@@ -402,7 +415,9 @@ class ChildChunkApi(DatasetApiResource):
_, current_tenant_id = current_account_with_tenant()
"""Get child chunks."""
# check dataset
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
@@ -468,7 +483,9 @@ class DatasetChildChunkApi(DatasetApiResource):
_, current_tenant_id = current_account_with_tenant()
"""Delete child chunk."""
# check dataset
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
@@ -527,7 +544,9 @@ class DatasetChildChunkApi(DatasetApiResource):
_, current_tenant_id = current_account_with_tenant()
"""Update child chunk."""
# check dataset
dataset = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")

View File

@@ -3,7 +3,7 @@ from flask_restx import Resource
from controllers.service_api import service_api_ns
from controllers.service_api.wraps import validate_dataset_token
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.utils.encoders import jsonable_encoder
from services.model_provider_service import ModelProviderService

View File

@@ -9,6 +9,7 @@ from flask import current_app, request
from flask_login import user_logged_in
from flask_restx import Resource
from pydantic import BaseModel
from sqlalchemy import select
from werkzeug.exceptions import Forbidden, NotFound, Unauthorized
from enums.cloud_plan import CloudPlan
@@ -62,7 +63,7 @@ def validate_app_token(
def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R:
api_token = validate_and_get_api_token("app")
app_model = db.session.query(App).where(App.id == api_token.app_id).first()
app_model = db.session.get(App, api_token.app_id)
if not app_model:
raise Forbidden("The app no longer exists.")
@@ -72,7 +73,7 @@ def validate_app_token(
if not app_model.enable_api:
raise Forbidden("The app's API service has been disabled.")
tenant = db.session.query(Tenant).where(Tenant.id == app_model.tenant_id).first()
tenant = db.session.get(Tenant, app_model.tenant_id)
if tenant is None:
raise ValueError("Tenant does not exist.")
if tenant.status == TenantStatus.ARCHIVE:
@@ -106,8 +107,8 @@ def validate_app_token(
else:
# For service API without end-user context, ensure an Account is logged in
# so services relying on current_account_with_tenant() work correctly.
tenant_owner_info = (
db.session.query(Tenant, Account)
tenant_owner_info = db.session.execute(
select(Tenant, Account)
.join(TenantAccountJoin, Tenant.id == TenantAccountJoin.tenant_id)
.join(Account, TenantAccountJoin.account_id == Account.id)
.where(
@@ -115,8 +116,7 @@ def validate_app_token(
TenantAccountJoin.role == "owner",
Tenant.status == TenantStatus.NORMAL,
)
.one_or_none()
)
).one_or_none()
if tenant_owner_info:
tenant_model, account = tenant_owner_info
@@ -277,29 +277,28 @@ def validate_dataset_token(
# Validate dataset if dataset_id is provided
if dataset_id:
dataset_id = str(dataset_id)
dataset = (
db.session.query(Dataset)
dataset = db.session.scalar(
select(Dataset)
.where(
Dataset.id == dataset_id,
Dataset.tenant_id == api_token.tenant_id,
)
.first()
.limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
if not dataset.enable_api:
raise Forbidden("Dataset api access is not enabled.")
tenant_account_join = (
db.session.query(Tenant, TenantAccountJoin)
tenant_account_join = db.session.execute(
select(Tenant, TenantAccountJoin)
.where(Tenant.id == api_token.tenant_id)
.where(TenantAccountJoin.tenant_id == Tenant.id)
.where(TenantAccountJoin.role.in_(["owner"]))
.where(Tenant.status == TenantStatus.NORMAL)
.one_or_none()
) # TODO: only owner information is required, so only one is returned.
).one_or_none() # TODO: only owner information is required, so only one is returned.
if tenant_account_join:
tenant, ta = tenant_account_join
account = db.session.query(Account).where(Account.id == ta.account_id).first()
account = db.session.get(Account, ta.account_id)
# Login admin
if account:
account.current_tenant = tenant
@@ -360,7 +359,9 @@ class DatasetApiResource(Resource):
method_decorators = [validate_dataset_token]
def get_dataset(self, dataset_id: str, tenant_id: str) -> Dataset:
dataset = db.session.query(Dataset).where(Dataset.id == dataset_id, Dataset.tenant_id == tenant_id).first()
dataset = db.session.scalar(
select(Dataset).where(Dataset.id == dataset_id, Dataset.tenant_id == tenant_id).limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")

View File

@@ -20,7 +20,7 @@ from controllers.web.error import (
)
from controllers.web.wraps import WebApiResource
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from dify_graph.model_runtime.errors.invoke import InvokeError
from graphon.model_runtime.errors.invoke import InvokeError
from libs.helper import uuid_value
from models.model import App
from services.audio_service import AudioService

View File

@@ -25,7 +25,7 @@ from core.errors.error import (
ProviderTokenNotInitError,
QuotaExceededError,
)
from dify_graph.model_runtime.errors.invoke import InvokeError
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import uuid_value
from models.model import AppMode

View File

@@ -20,9 +20,9 @@ from controllers.web.error import (
from controllers.web.wraps import WebApiResource
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from dify_graph.model_runtime.errors.invoke import InvokeError
from fields.conversation_fields import ResultResponse
from fields.message_fields import SuggestedQuestionsResponse, WebMessageInfiniteScrollPagination, WebMessageListItem
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import uuid_value
from models.enums import FeedbackRating

View File

@@ -11,9 +11,9 @@ from controllers.common.errors import (
UnsupportedFileTypeError,
)
from core.helper import ssrf_proxy
from dify_graph.file import helpers as file_helpers
from extensions.ext_database import db
from fields.file_fields import FileWithSignedUrl, RemoteFileInfo
from graphon.file import helpers as file_helpers
from services.file_service import FileService
from ..common.schema import register_schema_models

View File

@@ -22,9 +22,9 @@ from core.errors.error import (
ProviderTokenNotInitError,
QuotaExceededError,
)
from dify_graph.graph_engine.manager import GraphEngineManager
from dify_graph.model_runtime.errors.invoke import InvokeError
from extensions.ext_redis import redis_client
from graphon.graph_engine.manager import GraphEngineManager
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from models.model import App, AppMode, EndUser
from services.app_generate_service import AppGenerateService

View File

@@ -15,6 +15,7 @@ from core.app.entities.app_invoke_entities import (
AgentChatAppGenerateEntity,
ModelConfigWithCredentialsEntity,
)
from core.app.file_access import DatabaseFileAccessController
from core.callback_handler.agent_tool_callback_handler import DifyAgentCallbackHandler
from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
from core.memory.token_buffer_memory import TokenBufferMemory
@@ -26,8 +27,10 @@ from core.tools.entities.tool_entities import (
)
from core.tools.tool_manager import ToolManager
from core.tools.utils.dataset_retriever_tool import DatasetRetrieverTool
from dify_graph.file import file_manager
from dify_graph.model_runtime.entities import (
from extensions.ext_database import db
from factories import file_factory
from graphon.file import file_manager
from graphon.model_runtime.entities import (
AssistantPromptMessage,
LLMUsage,
PromptMessage,
@@ -37,15 +40,14 @@ from dify_graph.model_runtime.entities import (
ToolPromptMessage,
UserPromptMessage,
)
from dify_graph.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from dify_graph.model_runtime.entities.model_entities import ModelFeature
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from extensions.ext_database import db
from factories import file_factory
from graphon.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from graphon.model_runtime.entities.model_entities import ModelFeature
from graphon.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from models.enums import CreatorUserRole
from models.model import Conversation, Message, MessageAgentThought, MessageFile
logger = logging.getLogger(__name__)
_file_access_controller = DatabaseFileAccessController()
class BaseAgentRunner(AppRunner):
@@ -138,6 +140,7 @@ class BaseAgentRunner(AppRunner):
tenant_id=self.tenant_id,
app_id=self.app_config.app_id,
agent_tool=tool,
user_id=self.user_id,
invoke_from=self.application_generate_entity.invoke_from,
)
assert tool_entity.entity.description
@@ -524,7 +527,10 @@ class BaseAgentRunner(AppRunner):
image_detail_config = image_detail_config or ImagePromptMessageContent.DETAIL.LOW
file_objs = file_factory.build_from_message_files(
message_files=files, tenant_id=self.tenant_id, config=file_extra_config
message_files=files,
tenant_id=self.tenant_id,
config=file_extra_config,
access_controller=_file_access_controller,
)
if not file_objs:
return UserPromptMessage(content=message.query)

View File

@@ -15,8 +15,8 @@ from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransfo
from core.tools.__base.tool import Tool
from core.tools.entities.tool_entities import ToolInvokeMeta
from core.tools.tool_engine import ToolEngine
from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from dify_graph.model_runtime.entities.message_entities import (
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from graphon.model_runtime.entities.message_entities import (
AssistantPromptMessage,
PromptMessage,
PromptMessageTool,
@@ -122,7 +122,6 @@ class CotAgentRunner(BaseAgentRunner, ABC):
tools=[],
stop=app_generate_entity.model_conf.stop,
stream=True,
user=self.user_id,
callbacks=[],
)

View File

@@ -1,16 +1,16 @@
import json
from core.agent.cot_agent_runner import CotAgentRunner
from dify_graph.file import file_manager
from dify_graph.model_runtime.entities import (
from graphon.file import file_manager
from graphon.model_runtime.entities import (
AssistantPromptMessage,
PromptMessage,
SystemPromptMessage,
TextPromptMessageContent,
UserPromptMessage,
)
from dify_graph.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from graphon.model_runtime.utils.encoders import jsonable_encoder
class CotChatAgentRunner(CotAgentRunner):

View File

@@ -1,13 +1,13 @@
import json
from core.agent.cot_agent_runner import CotAgentRunner
from dify_graph.model_runtime.entities.message_entities import (
from graphon.model_runtime.entities.message_entities import (
AssistantPromptMessage,
PromptMessage,
TextPromptMessageContent,
UserPromptMessage,
)
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from graphon.model_runtime.utils.encoders import jsonable_encoder
class CotCompletionAgentRunner(CotAgentRunner):

View File

@@ -11,8 +11,8 @@ from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessag
from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform
from core.tools.entities.tool_entities import ToolInvokeMeta
from core.tools.tool_engine import ToolEngine
from dify_graph.file import file_manager
from dify_graph.model_runtime.entities import (
from graphon.file import file_manager
from graphon.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
@@ -25,7 +25,7 @@ from dify_graph.model_runtime.entities import (
ToolPromptMessage,
UserPromptMessage,
)
from dify_graph.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from graphon.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from models.model import Message
logger = logging.getLogger(__name__)
@@ -96,7 +96,6 @@ class FunctionCallAgentRunner(BaseAgentRunner):
tools=prompt_messages_tools,
stop=app_generate_entity.model_conf.stop,
stream=self.stream_tool_call,
user=self.user_id,
callbacks=[],
)

View File

@@ -4,7 +4,7 @@ from collections.abc import Generator
from typing import Union
from core.agent.entities import AgentScratchpadUnit
from dify_graph.model_runtime.entities.llm_entities import LLMResultChunk
from graphon.model_runtime.entities.llm_entities import LLMResultChunk
class CotAgentOutputParser:

View File

@@ -4,10 +4,10 @@ from core.app.app_config.entities import EasyUIBasedAppConfig
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.entities.model_entities import ModelStatus
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.provider_manager import ProviderManager
from dify_graph.model_runtime.entities.llm_entities import LLMMode
from dify_graph.model_runtime.entities.model_entities import ModelPropertyKey, ModelType
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.plugin.impl.model_runtime_factory import create_plugin_provider_manager
from graphon.model_runtime.entities.llm_entities import LLMMode
from graphon.model_runtime.entities.model_entities import ModelPropertyKey, ModelType
from graphon.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
class ModelConfigConverter:
@@ -21,7 +21,7 @@ class ModelConfigConverter:
"""
model_config = app_config.model
provider_manager = ProviderManager()
provider_manager = create_plugin_provider_manager(tenant_id=app_config.tenant_id)
provider_model_bundle = provider_manager.get_provider_model_bundle(
tenant_id=app_config.tenant_id, provider=model_config.provider, model_type=ModelType.LLM
)

View File

@@ -2,9 +2,8 @@ from collections.abc import Mapping
from typing import Any
from core.app.app_config.entities import ModelConfigEntity
from core.provider_manager import ProviderManager
from dify_graph.model_runtime.entities.model_entities import ModelPropertyKey, ModelType
from dify_graph.model_runtime.model_providers.model_provider_factory import ModelProviderFactory
from core.plugin.impl.model_runtime_factory import create_plugin_model_assembly
from graphon.model_runtime.entities.model_entities import ModelPropertyKey, ModelType
from models.model import AppModelConfigDict
from models.provider_ids import ModelProviderID
@@ -54,9 +53,12 @@ class ModelConfigManager:
if not isinstance(config["model"], dict):
raise ValueError("model must be of object type")
# Keep provider discovery and provider-backed model listing on the same
# request-scoped runtime so caller scope and provider caches stay aligned.
assembly = create_plugin_model_assembly(tenant_id=tenant_id)
# model.provider
model_provider_factory = ModelProviderFactory(tenant_id)
provider_entities = model_provider_factory.get_providers()
provider_entities = assembly.model_provider_factory.get_providers()
model_provider_names = [provider.provider for provider in provider_entities]
if "provider" not in config["model"]:
raise ValueError(f"model.provider is required and must be in {str(model_provider_names)}")
@@ -71,8 +73,7 @@ class ModelConfigManager:
if "name" not in config["model"]:
raise ValueError("model.name is required")
provider_manager = ProviderManager()
models = provider_manager.get_configurations(tenant_id).get_models(
models = assembly.provider_manager.get_configurations(tenant_id).get_models(
provider=config["model"]["provider"], model_type=ModelType.LLM
)

View File

@@ -7,7 +7,7 @@ from core.app.app_config.entities import (
PromptTemplateEntity,
)
from core.prompt.simple_prompt_transform import ModelMode
from dify_graph.model_runtime.entities.message_entities import PromptMessageRole
from graphon.model_runtime.entities.message_entities import PromptMessageRole
from models.model import AppMode, AppModelConfigDict

View File

@@ -3,7 +3,7 @@ from typing import cast
from core.app.app_config.entities import ExternalDataVariableEntity
from core.external_data_tool.factory import ExternalDataToolFactory
from dify_graph.variables.input_entities import VariableEntity, VariableEntityType
from graphon.variables.input_entities import VariableEntity, VariableEntityType
from models.model import AppModelConfigDict
_ALLOWED_VARIABLE_ENTITY_TYPE = frozenset(

View File

@@ -5,10 +5,10 @@ from typing import Any, Literal
from pydantic import BaseModel, Field
from core.rag.data_post_processor.data_post_processor import RerankingModelDict, WeightsDict
from dify_graph.file import FileUploadConfig
from dify_graph.model_runtime.entities.llm_entities import LLMMode
from dify_graph.model_runtime.entities.message_entities import PromptMessageRole
from dify_graph.variables.input_entities import VariableEntity as WorkflowVariableEntity
from graphon.file import FileUploadConfig
from graphon.model_runtime.entities.llm_entities import LLMMode
from graphon.model_runtime.entities.message_entities import PromptMessageRole
from graphon.variables.input_entities import VariableEntity as WorkflowVariableEntity
from models.model import AppMode

View File

@@ -2,7 +2,7 @@ from collections.abc import Mapping
from typing import Any
from constants import DEFAULT_FILE_NUMBER_LIMITS
from dify_graph.file import FileUploadConfig
from graphon.file import FileUploadConfig
class FileUploadConfigManager:

View File

@@ -1,7 +1,7 @@
import re
from core.app.app_config.entities import RagPipelineVariableEntity
from dify_graph.variables.input_entities import VariableEntity
from graphon.variables.input_entities import VariableEntity
from models.workflow import Workflow

View File

@@ -22,8 +22,14 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.advanced_chat.app_runner import AdvancedChatAppRunner
from core.app.apps.advanced_chat.generate_response_converter import AdvancedChatAppGenerateResponseConverter
from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline
from core.app.apps.advanced_chat.generate_task_pipeline import (
AdvancedChatAppGenerateTaskPipeline,
ConversationSnapshot,
MessageSnapshot,
WorkflowSnapshot,
)
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.draft_variable_saver import DraftVariableSaverFactory
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
@@ -34,17 +40,13 @@ from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.model_runtime.errors.invoke import InvokeAuthorizationError
from dify_graph.repositories.draft_variable_repository import (
DraftVariableSaverFactory,
)
from dify_graph.repositories.workflow_execution_repository import WorkflowExecutionRepository
from dify_graph.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from dify_graph.runtime import GraphRuntimeState
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from extensions.ext_database import db
from factories import file_factory
from graphon.graph_engine.layers.base import GraphEngineLayer
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
from graphon.runtime import GraphRuntimeState
from graphon.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
from libs.flask_utils import preserve_flask_contexts
from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.enums import WorkflowRunTriggeredFrom
@@ -149,85 +151,87 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
access_controller=self._file_access_controller,
)
else:
file_objs = []
# convert to app config
app_config = AdvancedChatAppConfigManager.get_app_config(app_model=app_model, workflow=workflow)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
else:
file_objs = []
# convert to app config
app_config = AdvancedChatAppConfigManager.get_app_config(app_model=app_model, workflow=workflow)
if invoke_from == InvokeFrom.DEBUGGER:
# always enable retriever resource in debugger mode
app_config.additional_features.show_retrieve_source = True # type: ignore
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# init application generate entity
application_generate_entity = AdvancedChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
workflow_run_id=str(workflow_run_id),
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
if invoke_from == InvokeFrom.DEBUGGER:
# always enable retriever resource in debugger mode
app_config.additional_features.show_retrieve_source = True # type: ignore
# Create repositories
#
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
if invoke_from == InvokeFrom.DEBUGGER:
workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING
else:
workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=workflow_triggered_from,
)
# Create workflow node execution repository
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
)
# init application generate entity
application_generate_entity = AdvancedChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
workflow_run_id=str(workflow_run_id),
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
# Create repositories
#
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
if invoke_from == InvokeFrom.DEBUGGER:
workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING
else:
workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=workflow_triggered_from,
)
# Create workflow node execution repository
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
)
return self._generate(
workflow=workflow,
user=user,
invoke_from=invoke_from,
application_generate_entity=application_generate_entity,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
conversation=conversation,
stream=streaming,
pause_state_config=pause_state_config,
)
return self._generate(
workflow=workflow,
user=user,
invoke_from=invoke_from,
application_generate_entity=application_generate_entity,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
conversation=conversation,
stream=streaming,
pause_state_config=pause_state_config,
)
def resume(
self,
@@ -459,96 +463,91 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
:param conversation: conversation
:param stream: is stream
"""
is_first_conversation = conversation is None
with self._bind_file_access_scope(
tenant_id=application_generate_entity.app_config.tenant_id,
user=user,
invoke_from=invoke_from,
):
is_first_conversation = conversation is None
if conversation is not None and message is not None:
pass
else:
conversation, message = self._init_generate_records(application_generate_entity, conversation)
if conversation is not None and message is not None:
pass
else:
conversation, message = self._init_generate_records(application_generate_entity, conversation)
if is_first_conversation:
# update conversation features
conversation.override_model_configs = workflow.features
db.session.commit()
db.session.refresh(conversation)
if is_first_conversation:
# update conversation features
conversation.override_model_configs = workflow.features
db.session.commit()
db.session.refresh(conversation)
# get conversation dialogue count
# NOTE: dialogue_count should not start from 0,
# because during the first conversation, dialogue_count should be 1.
self._dialogue_count = get_thread_messages_length(conversation.id) + 1
# get conversation dialogue count
# NOTE: dialogue_count should not start from 0,
# because during the first conversation, dialogue_count should be 1.
self._dialogue_count = get_thread_messages_length(conversation.id) + 1
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
graph_layers: list[GraphEngineLayer] = list(graph_engine_layers)
if pause_state_config is not None:
graph_layers.append(
PauseStatePersistenceLayer(
session_factory=pause_state_config.session_factory,
generate_entity=application_generate_entity,
state_owner_user_id=pause_state_config.state_owner_user_id,
)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# new thread with request context and contextvars
context = contextvars.copy_context()
graph_layers: list[GraphEngineLayer] = list(graph_engine_layers)
if pause_state_config is not None:
graph_layers.append(
PauseStatePersistenceLayer(
session_factory=pause_state_config.session_factory,
generate_entity=application_generate_entity,
state_owner_user_id=pause_state_config.state_owner_user_id,
)
)
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
"context": context,
"variable_loader": variable_loader,
"workflow_execution_repository": workflow_execution_repository,
"workflow_node_execution_repository": workflow_node_execution_repository,
"graph_engine_layers": tuple(graph_layers),
"graph_runtime_state": graph_runtime_state,
},
)
# new thread with request context and contextvars
context = contextvars.copy_context()
worker_thread.start()
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
"context": context,
"variable_loader": variable_loader,
"workflow_execution_repository": workflow_execution_repository,
"workflow_node_execution_repository": workflow_node_execution_repository,
"graph_engine_layers": tuple(graph_layers),
"graph_runtime_state": graph_runtime_state,
},
)
# release database connection, because the following new thread operations may take a long time
with Session(bind=db.engine, expire_on_commit=False) as session:
workflow = _refresh_model(session=session, model=workflow)
message = _refresh_model(session=session, model=message)
if message is None:
raise RuntimeError("Failed to refresh Message; _refresh_model returned None.")
# workflow_ = session.get(Workflow, workflow.id)
# assert workflow_ is not None
# workflow = workflow_
# message_ = session.get(Message, message.id)
# assert message_ is not None
# message = message_
# db.session.refresh(workflow)
# db.session.refresh(message)
# db.session.refresh(user)
db.session.close()
worker_thread.start()
# return response or stream generator
response = self._handle_advanced_chat_response(
application_generate_entity=application_generate_entity,
workflow=workflow,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=stream,
draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from, account=user),
)
# Capture the scalar fields needed by the response pipeline before
# releasing the request-scoped SQLAlchemy session.
workflow_snapshot = WorkflowSnapshot.from_workflow(workflow)
conversation_snapshot = ConversationSnapshot.from_conversation(conversation)
message_snapshot = MessageSnapshot.from_message(message)
db.session.close()
return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
# return response or stream generator
response = self._handle_advanced_chat_response(
application_generate_entity=application_generate_entity,
workflow=workflow_snapshot,
queue_manager=queue_manager,
conversation=conversation_snapshot,
message=message_snapshot,
user=user,
stream=stream,
draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from, account=user),
)
return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def _generate_worker(
self,
@@ -649,10 +648,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
self,
*,
application_generate_entity: AdvancedChatAppGenerateEntity,
workflow: Workflow,
workflow: WorkflowSnapshot,
queue_manager: AppQueueManager,
conversation: Conversation,
message: Message,
conversation: ConversationSnapshot,
message: MessageSnapshot,
user: Union[Account, EndUser],
draft_var_saver_factory: DraftVariableSaverFactory,
stream: bool = False,
@@ -689,23 +688,3 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
else:
logger.exception("Failed to process generate task pipeline, conversation_id: %s", conversation.id)
raise e
@overload
def _refresh_model(*, session: Session | None = None, model: Workflow) -> Workflow: ...
@overload
def _refresh_model(*, session: Session | None = None, model: Message) -> Message: ...
def _refresh_model(*, session: Session | None = None, model: Any) -> Any:
if session is not None:
detached_model = session.get(type(model), model.id)
assert detached_model is not None
return detached_model
with Session(bind=db.engine, expire_on_commit=False) as refresh_session:
detached_model = refresh_session.get(type(model), model.id)
assert detached_model is not None
return detached_model

View File

@@ -25,19 +25,24 @@ from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, Workfl
from core.db.session_factory import session_factory
from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.system_variables import (
build_bootstrap_variables,
build_system_variables,
system_variables_to_mapping,
)
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.enums import WorkflowType
from dify_graph.graph_engine.command_channels.redis_channel import RedisChannel
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.repositories.workflow_execution_repository import WorkflowExecutionRepository
from dify_graph.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from dify_graph.runtime import GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from dify_graph.variable_loader import VariableLoader
from dify_graph.variables.variables import Variable
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.otel import WorkflowAppRunnerHandler, trace_span
from graphon.enums import WorkflowType
from graphon.graph_engine.command_channels.redis_channel import RedisChannel
from graphon.graph_engine.layers.base import GraphEngineLayer
from graphon.runtime import GraphRuntimeState, VariablePool
from graphon.variable_loader import VariableLoader
from graphon.variables.variables import Variable
from models import Workflow
from models.model import App, Conversation, Message, MessageAnnotation
from models.workflow import ConversationVariable
@@ -90,7 +95,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
app_config = self.application_generate_entity.app_config
app_config = cast(AdvancedChatAppConfig, app_config)
system_inputs = SystemVariable(
system_inputs = build_system_variables(
query=self.application_generate_entity.query,
files=self.application_generate_entity.files,
conversation_id=self.conversation.id,
@@ -132,6 +137,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
workflow=self._workflow,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
user_id=self.application_generate_entity.user_id,
)
else:
inputs = self.application_generate_entity.inputs
@@ -150,7 +156,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
self.application_generate_entity.inputs = new_inputs
self.application_generate_entity.query = new_query
system_inputs.query = new_query
system_inputs = build_system_variables(
system_variables_to_mapping(system_inputs),
query=new_query,
)
# annotation reply
if self.handle_annotation_reply(
@@ -166,14 +175,17 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# Create a variable pool.
# init variable pool
variable_pool = VariablePool(
system_variables=system_inputs,
user_inputs=new_inputs,
environment_variables=self._workflow.environment_variables,
# Based on the definition of `Variable`,
# `VariableBase` instances can be safely used as `Variable` since they are compatible.
conversation_variables=conversation_variables,
variable_pool = VariablePool()
add_variables_to_pool(
variable_pool,
build_bootstrap_variables(
system_variables=system_inputs,
environment_variables=self._workflow.environment_variables,
conversation_variables=conversation_variables,
),
)
root_node_id = get_default_root_node_id(self._workflow.graph_dict)
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=new_inputs)
# init graph
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time())
@@ -185,6 +197,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
user_id=self.application_generate_entity.user_id,
user_from=user_from,
invoke_from=invoke_from,
root_node_id=root_node_id,
)
db.session.close()

View File

@@ -1,4 +1,4 @@
from collections.abc import Generator, Iterator
from collections.abc import Generator
from typing import Any, cast
from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
@@ -56,8 +56,8 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod
def convert_stream_full_response(
cls, stream_response: Iterator[AppStreamResponse]
) -> Generator[dict | str, None, None]:
cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[dict | str, Any, None]:
"""
Convert stream full response.
:param stream_response: stream response
@@ -87,8 +87,8 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod
def convert_stream_simple_response(
cls, stream_response: Iterator[AppStreamResponse]
) -> Generator[dict | str, None, None]:
cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[dict | str, Any, None]:
"""
Convert stream simple response.
:param stream_response: stream response

View File

@@ -4,6 +4,8 @@ import re
import time
from collections.abc import Callable, Generator, Mapping
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from threading import Thread
from typing import Any, Union
@@ -14,6 +16,7 @@ from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.common.graph_runtime_state_support import GraphRuntimeStateSupport
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.draft_variable_saver import DraftVariableSaverFactory
from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity,
InvokeFrom,
@@ -65,24 +68,72 @@ from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories.human_input_repository import HumanInputFormRepositoryImpl
from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.model_runtime.entities.llm_entities import LLMUsage
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.nodes import BuiltinNodeTypes
from dify_graph.repositories.draft_variable_repository import DraftVariableSaverFactory
from dify_graph.runtime import GraphRuntimeState
from dify_graph.system_variable import SystemVariable
from core.workflow.file_reference import resolve_file_record_id
from core.workflow.system_variables import build_system_variables
from extensions.ext_database import db
from graphon.entities.pause_reason import HumanInputRequired
from graphon.enums import WorkflowExecutionStatus
from graphon.model_runtime.entities.llm_entities import LLMUsage
from graphon.model_runtime.utils.encoders import jsonable_encoder
from graphon.nodes import BuiltinNodeTypes
from graphon.runtime import GraphRuntimeState
from libs.datetime_utils import naive_utc_now
from models import Account, Conversation, EndUser, Message, MessageFile
from models.enums import CreatorUserRole, MessageFileBelongsTo, MessageStatus
from models.execution_extra_content import HumanInputContent
from models.model import AppMode
from models.workflow import Workflow
logger = logging.getLogger(__name__)
@dataclass(frozen=True, slots=True)
class WorkflowSnapshot:
id: str
tenant_id: str
features_dict: Mapping[str, Any]
@classmethod
def from_workflow(cls, workflow: Workflow) -> "WorkflowSnapshot":
return cls(
id=workflow.id,
tenant_id=workflow.tenant_id,
features_dict=dict(workflow.features_dict),
)
@dataclass(frozen=True, slots=True)
class ConversationSnapshot:
id: str
mode: AppMode
@classmethod
def from_conversation(cls, conversation: Conversation) -> "ConversationSnapshot":
return cls(
id=conversation.id,
mode=conversation.mode,
)
@dataclass(frozen=True, slots=True)
class MessageSnapshot:
id: str
query: str
created_at: datetime
status: MessageStatus
answer: str
@classmethod
def from_message(cls, message: Message) -> "MessageSnapshot":
return cls(
id=message.id,
query=message.query,
created_at=message.created_at,
status=message.status,
answer=message.answer,
)
class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
"""
AdvancedChatAppGenerateTaskPipeline is a class that generate stream output and state management for Application.
@@ -91,10 +142,10 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
def __init__(
self,
application_generate_entity: AdvancedChatAppGenerateEntity,
workflow: Workflow,
workflow: WorkflowSnapshot,
queue_manager: AppQueueManager,
conversation: Conversation,
message: Message,
conversation: ConversationSnapshot,
message: MessageSnapshot,
user: Union[Account, EndUser],
stream: bool,
dialogue_count: int,
@@ -117,7 +168,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
else:
raise NotImplementedError(f"User type not supported: {type(user)}")
self._workflow_system_variables = SystemVariable(
self._workflow_system_variables = build_system_variables(
query=message.query,
files=application_generate_entity.files,
conversation_id=conversation.id,
@@ -155,7 +206,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
self._message_saved_on_pause = False
self._seed_graph_runtime_state_from_queue_manager()
def _seed_task_state_from_message(self, message: Message) -> None:
def _seed_task_state_from_message(self, message: MessageSnapshot) -> None:
if message.status == MessageStatus.PAUSED and message.answer:
self._task_state.answer = message.answer
@@ -741,8 +792,9 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
def _load_human_input_form_id(self, *, node_id: str) -> str | None:
form_repository = HumanInputFormRepositoryImpl(
tenant_id=self._workflow_tenant_id,
workflow_execution_id=self._workflow_run_id,
)
form = form_repository.get_form(self._workflow_run_id, node_id)
form = form_repository.get_form(node_id)
if form is None:
return None
return form.id
@@ -933,21 +985,23 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
metadata = self._task_state.metadata.model_dump()
message.message_metadata = json.dumps(jsonable_encoder(metadata))
message_files = [
MessageFile(
message_id=message.id,
type=file["type"],
transfer_method=file["transfer_method"],
url=file["remote_url"],
belongs_to=MessageFileBelongsTo.ASSISTANT,
upload_file_id=file["related_id"],
created_by_role=CreatorUserRole.ACCOUNT
if message.invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER}
else CreatorUserRole.END_USER,
created_by=message.from_account_id or message.from_end_user_id or "",
message_files: list[MessageFile] = []
for file in self._recorded_files:
reference = file.get("reference") or file.get("related_id")
message_files.append(
MessageFile(
message_id=message.id,
type=file["type"],
transfer_method=file["transfer_method"],
url=file["remote_url"],
belongs_to=MessageFileBelongsTo.ASSISTANT,
upload_file_id=resolve_file_record_id(reference if isinstance(reference, str) else None),
created_by_role=CreatorUserRole.ACCOUNT
if message.invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER}
else CreatorUserRole.END_USER,
created_by=message.from_account_id or message.from_end_user_id or "",
)
)
for file in self._recorded_files
]
session.add_all(message_files)
def _seed_graph_runtime_state_from_queue_manager(self) -> None:
@@ -1003,13 +1057,11 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
return message
def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str):
with Session(db.engine) as session, session.begin():
saver = self._draft_var_saver_factory(
session=session,
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)
saver = self._draft_var_saver_factory(
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)

View File

@@ -21,9 +21,9 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, InvokeFrom
from core.ops.ops_trace_manager import TraceQueueManager
from dify_graph.model_runtime.errors.invoke import InvokeAuthorizationError
from extensions.ext_database import db
from factories import file_factory
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
from libs.flask_utils import preserve_flask_contexts
from models import Account, App, EndUser
from services.conversation_service import ConversationService
@@ -129,89 +129,93 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args.get("files") or []
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict or app_model_config.to_dict())
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
files = args.get("files") or []
file_extra_config = FileUploadConfigManager.convert(
override_model_config_dict or app_model_config.to_dict()
)
else:
file_objs = []
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
access_controller=self._file_access_controller,
)
else:
file_objs = []
# convert to app config
app_config = AgentChatAppConfigManager.get_app_config(
app_model=app_model,
app_model_config=app_model_config,
conversation=conversation,
override_config_dict=override_model_config_dict,
)
# convert to app config
app_config = AgentChatAppConfigManager.get_app_config(
app_model=app_model,
app_model_config=app_model_config,
conversation=conversation,
override_config_dict=override_model_config_dict,
)
# get tracing instance
trace_manager = TraceQueueManager(app_model.id, user.id if isinstance(user, Account) else user.session_id)
# get tracing instance
trace_manager = TraceQueueManager(app_model.id, user.id if isinstance(user, Account) else user.session_id)
# init application generate entity
application_generate_entity = AgentChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
call_depth=0,
trace_manager=trace_manager,
)
# init application generate entity
application_generate_entity = AgentChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
call_depth=0,
trace_manager=trace_manager,
)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# new thread with request context and contextvars
context = contextvars.copy_context()
# new thread with request context and contextvars
context = contextvars.copy_context()
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"context": context,
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
},
)
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"context": context,
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
},
)
worker_thread.start()
worker_thread.start()
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
return AgentChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
return AgentChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def _generate_worker(
self,

View File

@@ -15,10 +15,10 @@ from core.app.entities.queue_entities import QueueAnnotationReplyEvent
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.moderation.base import ModerationError
from dify_graph.model_runtime.entities.llm_entities import LLMMode
from dify_graph.model_runtime.entities.model_entities import ModelFeature, ModelPropertyKey
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from extensions.ext_database import db
from graphon.model_runtime.entities.llm_entities import LLMMode
from graphon.model_runtime.entities.model_entities import ModelFeature, ModelPropertyKey
from graphon.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from models.model import App, Conversation, Message
logger = logging.getLogger(__name__)

View File

@@ -1,4 +1,4 @@
from collections.abc import Generator, Iterator
from collections.abc import Generator
from typing import cast
from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
@@ -55,7 +55,7 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod
def convert_stream_full_response(
cls, stream_response: Iterator[AppStreamResponse]
cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[dict | str, None, None]:
"""
Convert stream full response.
@@ -86,7 +86,7 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod
def convert_stream_simple_response(
cls, stream_response: Iterator[AppStreamResponse]
cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[dict | str, None, None]:
"""
Convert stream simple response.

View File

@@ -1,12 +1,12 @@
import logging
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterator, Mapping
from typing import Any
from collections.abc import Generator, Mapping
from typing import Any, Union
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.task_entities import AppBlockingResponse, AppStreamResponse
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from dify_graph.model_runtime.errors.invoke import InvokeError
from graphon.model_runtime.errors.invoke import InvokeError
logger = logging.getLogger(__name__)
@@ -16,26 +16,24 @@ class AppGenerateResponseConverter(ABC):
@classmethod
def convert(
cls, response: AppBlockingResponse | Iterator[AppStreamResponse], invoke_from: InvokeFrom
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
cls, response: Union[AppBlockingResponse, Generator[AppStreamResponse, Any, None]], invoke_from: InvokeFrom
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], Any, None]:
if invoke_from in {InvokeFrom.DEBUGGER, InvokeFrom.SERVICE_API}:
if isinstance(response, AppBlockingResponse):
return cls.convert_blocking_full_response(response)
else:
stream_response = response
def _generate_full_response() -> Generator[dict[str, Any] | str, None, None]:
yield from cls.convert_stream_full_response(stream_response)
def _generate_full_response() -> Generator[dict | str, Any, None]:
yield from cls.convert_stream_full_response(response)
return _generate_full_response()
else:
if isinstance(response, AppBlockingResponse):
return cls.convert_blocking_simple_response(response)
else:
stream_response = response
def _generate_simple_response() -> Generator[dict[str, Any] | str, None, None]:
yield from cls.convert_stream_simple_response(stream_response)
def _generate_simple_response() -> Generator[dict | str, Any, None]:
yield from cls.convert_stream_simple_response(response)
return _generate_simple_response()
@@ -52,14 +50,14 @@ class AppGenerateResponseConverter(ABC):
@classmethod
@abstractmethod
def convert_stream_full_response(
cls, stream_response: Iterator[AppStreamResponse]
cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[dict | str, None, None]:
raise NotImplementedError
@classmethod
@abstractmethod
def convert_stream_simple_response(
cls, stream_response: Iterator[AppStreamResponse]
cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[dict | str, None, None]:
raise NotImplementedError

View File

@@ -1,27 +1,89 @@
from collections.abc import Generator, Mapping, Sequence
from contextlib import AbstractContextManager, nullcontext
from typing import TYPE_CHECKING, Any, Union, final
from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import InvokeFrom
from dify_graph.enums import NodeType
from dify_graph.file import File, FileUploadConfig
from dify_graph.repositories.draft_variable_repository import (
from core.app.apps.draft_variable_saver import (
DraftVariableSaver,
DraftVariableSaverFactory,
NoopDraftVariableSaver,
)
from dify_graph.variables.input_entities import VariableEntityType
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom
from core.app.file_access import DatabaseFileAccessController, FileAccessScope, bind_file_access_scope
from extensions.ext_database import db
from factories import file_factory
from graphon.enums import NodeType
from graphon.file import File, FileUploadConfig
from graphon.variables.input_entities import VariableEntityType
from libs.orjson import orjson_dumps
from models import Account, EndUser
from services.workflow_draft_variable_service import DraftVariableSaver as DraftVariableSaverImpl
if TYPE_CHECKING:
from dify_graph.variables.input_entities import VariableEntity
from graphon.variables.input_entities import VariableEntity
@final
class _DebuggerDraftVariableSaver:
"""Adapter that binds SQLAlchemy session setup outside the saver port."""
def __init__(
self,
*,
account: Account,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> None:
self._account = account
self._app_id = app_id
self._node_id = node_id
self._node_type = node_type
self._node_execution_id = node_execution_id
self._enclosing_node_id = enclosing_node_id
def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None) -> None:
with Session(db.engine) as session, session.begin():
DraftVariableSaverImpl(
session=session,
app_id=self._app_id,
node_id=self._node_id,
node_type=self._node_type,
node_execution_id=self._node_execution_id,
enclosing_node_id=self._enclosing_node_id,
user=self._account,
).save(process_data, outputs)
class BaseAppGenerator:
_file_access_controller: DatabaseFileAccessController = DatabaseFileAccessController()
@staticmethod
def _bind_file_access_scope(
*,
tenant_id: str,
user: Account | EndUser,
invoke_from: InvokeFrom,
) -> AbstractContextManager[None]:
"""Bind request-scoped file ownership markers for downstream file lookups."""
user_id = getattr(user, "id", None)
if not isinstance(user_id, str) or not user_id:
return nullcontext()
user_from = UserFrom.ACCOUNT if isinstance(user, Account) else UserFrom.END_USER
return bind_file_access_scope(
FileAccessScope(
tenant_id=tenant_id,
user_id=user_id,
user_from=user_from,
invoke_from=invoke_from,
)
)
def _prepare_user_inputs(
self,
*,
@@ -50,6 +112,7 @@ class BaseAppGenerator:
allowed_file_upload_methods=entity_dictionary[k].allowed_file_upload_methods or [],
),
strict_type_validation=strict_type_validation,
access_controller=self._file_access_controller,
)
for k, v in user_inputs.items()
if isinstance(v, dict) and entity_dictionary[k].type == VariableEntityType.FILE
@@ -64,6 +127,7 @@ class BaseAppGenerator:
allowed_file_extensions=entity_dictionary[k].allowed_file_extensions or [],
allowed_file_upload_methods=entity_dictionary[k].allowed_file_upload_methods or [],
),
access_controller=self._file_access_controller,
)
for k, v in user_inputs.items()
if isinstance(v, list)
@@ -224,35 +288,32 @@ class BaseAppGenerator:
def _get_draft_var_saver_factory(invoke_from: InvokeFrom, account: Account | EndUser) -> DraftVariableSaverFactory:
if invoke_from == InvokeFrom.DEBUGGER:
assert isinstance(account, Account)
debug_account = account
def draft_var_saver_factory(
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> DraftVariableSaver:
return DraftVariableSaverImpl(
session=session,
return _DebuggerDraftVariableSaver(
account=account,
app_id=app_id,
node_id=node_id,
node_type=node_type,
node_execution_id=node_execution_id,
enclosing_node_id=enclosing_node_id,
user=debug_account,
)
else:
def draft_var_saver_factory(
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> DraftVariableSaver:
_ = app_id, node_id, node_type, node_execution_id, enclosing_node_id
return NoopDraftVariableSaver()
return draft_var_saver_factory

View File

@@ -20,8 +20,8 @@ from core.app.entities.queue_entities import (
QueueStopEvent,
WorkflowQueueMessage,
)
from dify_graph.runtime import GraphRuntimeState
from extensions.ext_redis import redis_client
from graphon.runtime import GraphRuntimeState
logger = logging.getLogger(__name__)
@@ -61,27 +61,30 @@ class AppQueueManager(ABC):
listen_timeout = dify_config.APP_MAX_EXECUTION_TIME
start_time = time.time()
last_ping_time: int | float = 0
while True:
try:
message = self._q.get(timeout=1)
if message is None:
break
try:
while True:
try:
message = self._q.get(timeout=1)
if message is None:
break
yield message
except queue.Empty:
continue
finally:
elapsed_time = time.time() - start_time
if elapsed_time >= listen_timeout or self._is_stopped():
# publish two messages to make sure the client can receive the stop signal
# and stop listening after the stop signal processed
self.publish(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL), PublishFrom.TASK_PIPELINE
)
yield message
except queue.Empty:
continue
finally:
elapsed_time = time.time() - start_time
if elapsed_time >= listen_timeout or self._is_stopped():
# publish two messages to make sure the client can receive the stop signal
# and stop listening after the stop signal processed
self.publish(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL), PublishFrom.TASK_PIPELINE
)
if elapsed_time // 10 > last_ping_time:
self.publish(QueuePingEvent(), PublishFrom.TASK_PIPELINE)
last_ping_time = elapsed_time // 10
if elapsed_time // 10 > last_ping_time:
self.publish(QueuePingEvent(), PublishFrom.TASK_PIPELINE)
last_ping_time = elapsed_time // 10
finally:
self._graph_runtime_state = None # Release reference once consumers finish or close the generator.
def stop_listen(self):
"""
@@ -90,7 +93,6 @@ class AppQueueManager(ABC):
"""
self._clear_task_belong_cache()
self._q.put(None)
self._graph_runtime_state = None # Release reference to allow GC to reclaim memory
def _clear_task_belong_cache(self) -> None:
"""

View File

@@ -29,22 +29,22 @@ from core.prompt.advanced_prompt_transform import AdvancedPromptTransform
from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate, MemoryConfig
from core.prompt.simple_prompt_transform import ModelMode, SimplePromptTransform
from core.tools.tool_file_manager import ToolFileManager
from dify_graph.file.enums import FileTransferMethod, FileType
from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from dify_graph.model_runtime.entities.message_entities import (
from extensions.ext_database import db
from graphon.file.enums import FileTransferMethod, FileType
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from graphon.model_runtime.entities.message_entities import (
AssistantPromptMessage,
ImagePromptMessageContent,
PromptMessage,
TextPromptMessageContent,
)
from dify_graph.model_runtime.entities.model_entities import ModelPropertyKey
from dify_graph.model_runtime.errors.invoke import InvokeBadRequestError
from extensions.ext_database import db
from graphon.model_runtime.entities.model_entities import ModelPropertyKey
from graphon.model_runtime.errors.invoke import InvokeBadRequestError
from models.enums import CreatorUserRole, MessageFileBelongsTo
from models.model import App, AppMode, Message, MessageAnnotation, MessageFile
if TYPE_CHECKING:
from dify_graph.file.models import File
from graphon.file.models import File
_logger = logging.getLogger(__name__)

View File

@@ -1,3 +1,4 @@
import contextvars
import logging
import threading
import uuid
@@ -20,9 +21,9 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import ChatAppGenerateEntity, InvokeFrom
from core.ops.ops_trace_manager import TraceQueueManager
from dify_graph.model_runtime.errors.invoke import InvokeAuthorizationError
from extensions.ext_database import db
from factories import file_factory
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
from models import Account
from models.model import App, EndUser
from services.conversation_service import ConversationService
@@ -120,93 +121,96 @@ class ChatAppGenerator(MessageBasedAppGenerator):
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict or app_model_config.to_dict())
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(
override_model_config_dict or app_model_config.to_dict()
)
else:
file_objs = []
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
access_controller=self._file_access_controller,
)
else:
file_objs = []
# convert to app config
app_config = ChatAppConfigManager.get_app_config(
app_model=app_model,
app_model_config=app_model_config,
conversation=conversation,
override_config_dict=override_model_config_dict,
)
# convert to app config
app_config = ChatAppConfigManager.get_app_config(
app_model=app_model,
app_model_config=app_model_config,
conversation=conversation,
override_config_dict=override_model_config_dict,
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# init application generate entity
application_generate_entity = ChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
stream=streaming,
)
# init application generate entity
application_generate_entity = ChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
stream=streaming,
)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
if conversation is None or message is None:
raise RuntimeError("_init_generate_records() returned None for conversation or message")
generated_conversation_id = str(conversation.id)
generated_message_id = str(message.id)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=generated_conversation_id,
app_mode=conversation.mode,
message_id=generated_message_id,
)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# new thread with request context
@copy_current_request_context
def worker_with_context():
return self._generate_worker(
flask_app=current_app._get_current_object(), # type: ignore
context = contextvars.copy_context()
# new thread with request context
@copy_current_request_context
def worker_with_context():
return context.run(
self._generate_worker,
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation_id=conversation.id,
message_id=message.id,
)
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start()
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation_id=generated_conversation_id,
message_id=generated_message_id,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start()
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
return ChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
return ChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def _generate_worker(
self,

View File

@@ -15,9 +15,9 @@ from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.moderation.base import ModerationError
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
from dify_graph.file import File
from dify_graph.model_runtime.entities.message_entities import ImagePromptMessageContent
from extensions.ext_database import db
from graphon.file import File
from graphon.model_runtime.entities.message_entities import ImagePromptMessageContent
from models.model import App, Conversation, Message
logger = logging.getLogger(__name__)
@@ -223,7 +223,6 @@ class ChatAppRunner(AppRunner):
model_parameters=application_generate_entity.model_conf.parameters,
stop=stop,
stream=application_generate_entity.stream,
user=application_generate_entity.user_id,
)
# handle invoke result

View File

@@ -1,4 +1,4 @@
from collections.abc import Generator, Iterator
from collections.abc import Generator
from typing import cast
from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
@@ -55,7 +55,7 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod
def convert_stream_full_response(
cls, stream_response: Iterator[AppStreamResponse]
cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[dict | str, None, None]:
"""
Convert stream full response.
@@ -86,7 +86,7 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter):
@classmethod
def convert_stream_simple_response(
cls, stream_response: Iterator[AppStreamResponse]
cls, stream_response: Generator[AppStreamResponse, None, None]
) -> Generator[dict | str, None, None]:
"""
Convert stream simple response.

Some files were not shown because too many files have changed in this diff Show More