Compare commits

..

5 Commits

Author SHA1 Message Date
Novice Lee
81375088e9 fix: remove unused fields 2024-12-26 14:51:33 +08:00
Novice Lee
cfade297e8 fix: mypy static type checking issues 2024-12-26 09:04:29 +08:00
Novice Lee
8933dd85bf Merge branch 'main' into feat/retry-single-step-debug 2024-12-26 08:56:24 +08:00
Novice Lee
ae5e8d3160 fix: remove unused import 2024-12-25 10:40:02 +08:00
Novice Lee
fc6c0317a5 feat: add single step retry 2024-12-25 10:38:54 +08:00
1133 changed files with 16788 additions and 39288 deletions

View File

@@ -8,7 +8,7 @@ inputs:
poetry-version:
description: Poetry version to set up
required: true
default: '2.0.1'
default: '1.8.4'
poetry-lockfile:
description: Path to the Poetry lockfile to restore cache from
required: true

View File

@@ -42,23 +42,25 @@ jobs:
run: poetry install -C api --with dev
- name: Check dependencies in pyproject.toml
run: poetry run -P api bash dev/pytest/pytest_artifacts.sh
run: poetry run -C api bash dev/pytest/pytest_artifacts.sh
- name: Run Unit tests
run: poetry run -P api bash dev/pytest/pytest_unit_tests.sh
run: poetry run -C api bash dev/pytest/pytest_unit_tests.sh
- name: Run ModelRuntime
run: poetry run -P api bash dev/pytest/pytest_model_runtime.sh
run: poetry run -C api bash dev/pytest/pytest_model_runtime.sh
- name: Run dify config tests
run: poetry run -P api python dev/pytest/pytest_config_tests.py
run: poetry run -C api python dev/pytest/pytest_config_tests.py
- name: Run Tool
run: poetry run -P api bash dev/pytest/pytest_tools.sh
run: poetry run -C api bash dev/pytest/pytest_tools.sh
- name: Run mypy
run: |
poetry run -C api python -m mypy --install-types --non-interactive .
pushd api
poetry run python -m mypy --install-types --non-interactive .
popd
- name: Set up dotenvs
run: |
@@ -78,4 +80,4 @@ jobs:
ssrf_proxy
- name: Run Workflow
run: poetry run -P api bash dev/pytest/pytest_workflow.sh
run: poetry run -C api bash dev/pytest/pytest_workflow.sh

View File

@@ -5,8 +5,6 @@ on:
branches:
- "main"
- "deploy/dev"
- "deploy/enterprise"
- "e-260"
release:
types: [published]

View File

@@ -1,29 +0,0 @@
name: Deploy Enterprise
permissions:
contents: read
on:
workflow_run:
workflows: ["Build and Push API & Web"]
branches:
- "deploy/enterprise"
types:
- completed
jobs:
deploy:
runs-on: ubuntu-latest
if: |
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.head_branch == 'deploy/enterprise'
steps:
- name: Deploy to server
uses: appleboy/ssh-action@v0.1.8
with:
host: ${{ secrets.ENTERPRISE_SSH_HOST }}
username: ${{ secrets.ENTERPRISE_SSH_USER }}
password: ${{ secrets.ENTERPRISE_SSH_PASSWORD }}
script: |
${{ vars.ENTERPRISE_SSH_SCRIPT || secrets.ENTERPRISE_SSH_SCRIPT }}

View File

@@ -1,47 +0,0 @@
name: Build docker image
on:
pull_request:
branches:
- "main"
paths:
- api/Dockerfile
- web/Dockerfile
concurrency:
group: docker-build-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
jobs:
build-docker:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- service_name: "api-amd64"
platform: linux/amd64
context: "api"
- service_name: "api-arm64"
platform: linux/arm64
context: "api"
- service_name: "web-amd64"
platform: linux/amd64
context: "web"
- service_name: "web-arm64"
platform: linux/arm64
context: "web"
steps:
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build Docker Image
uses: docker/build-push-action@v6
with:
push: false
context: "{{defaultContext}}:${{ matrix.context }}"
platforms: ${{ matrix.platform }}
cache-from: type=gha
cache-to: type=gha,mode=max

View File

@@ -38,12 +38,12 @@ jobs:
if: steps.changed-files.outputs.any_changed == 'true'
run: |
poetry run -C api ruff --version
poetry run -C api ruff check ./
poetry run -C api ruff format --check ./
poetry run -C api ruff check ./api
poetry run -C api ruff format --check ./api
- name: Dotenv check
if: steps.changed-files.outputs.any_changed == 'true'
run: poetry run -P api dotenv-linter ./api/.env.example ./web/.env.example
run: poetry run -C api dotenv-linter ./api/.env.example ./web/.env.example
- name: Lint hints
if: failure()
@@ -82,33 +82,6 @@ jobs:
if: steps.changed-files.outputs.any_changed == 'true'
run: yarn run lint
docker-compose-template:
name: Docker Compose Template
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Check changed files
id: changed-files
uses: tj-actions/changed-files@v45
with:
files: |
docker/generate_docker_compose
docker/.env.example
docker/docker-compose-template.yaml
docker/docker-compose.yaml
- name: Generate Docker Compose
if: steps.changed-files.outputs.any_changed == 'true'
run: |
cd docker
./generate_docker_compose
- name: Check for changes
if: steps.changed-files.outputs.any_changed == 'true'
run: git diff --exit-code
superlinter:
name: SuperLinter

View File

@@ -70,4 +70,4 @@ jobs:
tidb
- name: Test Vector Stores
run: poetry run -P api bash dev/pytest/pytest_vdb.sh
run: poetry run -C api bash dev/pytest/pytest_vdb.sh

View File

@@ -1,4 +0,0 @@
{
"MD024": false,
"MD013": false
}

View File

@@ -1,45 +0,0 @@
# Changelog
All notable changes to Dify will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.15.8] - 2025-05-30
### Added
- Added gunicorn keepalive setting (#19537)
### Fixed
- Fixed database configuration to allow DB_EXTRAS to set search_path via options (#16a4f77)
- Fixed frontend third-party package security issues (#19655)
- Updated dependencies: huggingface-hub (~0.16.4 to ~0.31.0), transformers (~4.35.0 to ~4.39.0), and resend (~0.7.0 to ~2.9.0) (#19563)
- Downgrade boto3 from 1.36 to 1.35 (#19736)
## [0.15.7] - 2025-04-27
### Added
- Added support for GPT-4.1 in model providers (#18912)
- Added support for Amazon Bedrock DeepSeek-R1 model (#18908)
- Added support for Amazon Bedrock Claude Sonnet 3.7 model (#18788)
- Refined version compatibility logic in app DSL service
### Fixed
- Fixed issue with creating apps from template categories (#18807, #18868)
- Fixed DSL version check when creating apps from explore templates (#18872, #18878)
## [0.15.6] - 2025-04-22
### Security
- Fixed clickjacking vulnerability (#18552)
- Fixed reset password security issue (#18366)
- Updated reset password token when email code verification succeeds (#18362)
### Fixed
- Fixed Vertex AI Gemini 2.0 Flash 001 schema (#18405)

View File

@@ -25,9 +25,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="follow on X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="follow on LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Pulls" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -21,9 +21,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="follow on X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="follow on LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Pulls" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -21,9 +21,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="follow on X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="follow on LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Pulls" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -21,9 +21,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="seguir en X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="seguir en LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Descargas de Docker" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -21,9 +21,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="suivre sur X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="suivre sur LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Tirages Docker" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -21,9 +21,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="X(Twitter)でフォロー"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="LinkedInでフォロー"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Pulls" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -21,9 +21,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="follow on X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="follow on LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Pulls" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -21,9 +21,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="follow on X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="follow on LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Pulls" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -25,9 +25,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="follow on X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="follow on LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Pulls" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -22,9 +22,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="follow on X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="follow on LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Pulls" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -21,9 +21,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="X(Twitter)'da takip et"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="LinkedIn'da takip et"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Çekmeleri" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">
@@ -65,6 +62,8 @@ Görsel bir arayüz üzerinde güçlü AI iş akışları oluşturun ve test edi
![providers-v5](https://github.com/langgenius/dify/assets/13230914/5a17bdbe-097a-4100-8363-40255b70f6e3)
Özür dilerim, haklısınız. Daha anlamlı ve akıcı bir çeviri yapmaya çalışayım. İşte güncellenmiş çeviri:
**3. Prompt IDE**:
Komut istemlerini oluşturmak, model performansını karşılaştırmak ve sohbet tabanlı uygulamalara metin-konuşma gibi ek özellikler eklemek için kullanıcı dostu bir arayüz.
@@ -151,6 +150,8 @@ Görsel bir arayüz üzerinde güçlü AI iş akışları oluşturun ve test edi
## Dify'ı Kullanma
- **Cloud </br>**
İşte verdiğiniz metnin Türkçe çevirisi, kod bloğu içinde:
-
Herkesin sıfır kurulumla denemesi için bir [Dify Cloud](https://dify.ai) hizmeti sunuyoruz. Bu hizmet, kendi kendine dağıtılan versiyonun tüm yeteneklerini sağlar ve sandbox planında 200 ücretsiz GPT-4 çağrısı içerir.
- **Dify Topluluk Sürümünü Kendi Sunucunuzda Barındırma</br>**
@@ -176,6 +177,8 @@ GitHub'da Dify'a yıldız verin ve yeni sürümlerden anında haberdar olun.
>- RAM >= 4GB
</br>
İşte verdiğiniz metnin Türkçe çevirisi, kod bloğu içinde:
Dify sunucusunu başlatmanın en kolay yolu, [docker-compose.yml](docker/docker-compose.yaml) dosyamızı çalıştırmaktır. Kurulum komutunu çalıştırmadan önce, makinenizde [Docker](https://docs.docker.com/get-docker/) ve [Docker Compose](https://docs.docker.com/compose/install/)'un kurulu olduğundan emin olun:
```bash

View File

@@ -21,9 +21,6 @@
<a href="https://twitter.com/intent/follow?screen_name=dify_ai" target="_blank">
<img src="https://img.shields.io/twitter/follow/dify_ai?logo=X&color=%20%23f5f5f5"
alt="theo dõi trên X(Twitter)"></a>
<a href="https://www.linkedin.com/company/langgenius/" target="_blank">
<img src="https://custom-icon-badges.demolab.com/badge/LinkedIn-0A66C2?logo=linkedin-white&logoColor=fff"
alt="theo dõi trên LinkedIn"></a>
<a href="https://hub.docker.com/u/langgenius" target="_blank">
<img alt="Docker Pulls" src="https://img.shields.io/docker/pulls/langgenius/dify-web?labelColor=%20%23FDB062&color=%20%23f79009"></a>
<a href="https://github.com/langgenius/dify/graphs/commit-activity" target="_blank">

View File

@@ -23,9 +23,6 @@ FILES_ACCESS_TIMEOUT=300
# Access token expiration time in minutes
ACCESS_TOKEN_EXPIRE_MINUTES=60
# Refresh token expiration time in days
REFRESH_TOKEN_EXPIRE_DAYS=30
# celery configuration
CELERY_BROKER_URL=redis://:difyai123456@localhost:6379/1
@@ -430,7 +427,4 @@ CREATE_TIDB_SERVICE_JOB_ENABLED=false
# Maximum number of submitted thread count in a ThreadPool for parallel node execution
MAX_SUBMIT_COUNT=100
# Lockout duration in seconds
LOGIN_LOCKOUT_DURATION=86400
# Prevent Clickjacking
ALLOW_EMBED=false
LOGIN_LOCKOUT_DURATION=86400

View File

@@ -53,12 +53,10 @@ ignore = [
"FURB152", # math-constant
"UP007", # non-pep604-annotation
"UP032", # f-string
"UP045", # non-pep604-annotation-optional
"B005", # strip-with-multi-characters
"B006", # mutable-argument-default
"B007", # unused-loop-control-variable
"B026", # star-arg-unpacking-after-keyword-arg
"B903", # class-as-data-structure
"B904", # raise-without-from-inside-except
"B905", # zip-without-explicit-strict
"N806", # non-lowercase-variable-in-function
@@ -87,11 +85,11 @@ ignore = [
]
"tests/*" = [
"F811", # redefined-while-unused
"F401", # unused-import
]
[lint.pyflakes]
allowed-unused-imports = [
extend-generics = [
"_pytest.monkeypatch",
"tests.integration_tests",
"tests.unit_tests",
]

View File

@@ -4,7 +4,7 @@ FROM python:3.12-slim-bookworm AS base
WORKDIR /app/api
# Install Poetry
ENV POETRY_VERSION=2.0.1
ENV POETRY_VERSION=1.8.4
# if you located in China, you can use aliyun mirror to speed up
# RUN pip install --no-cache-dir poetry==${POETRY_VERSION} -i https://mirrors.aliyun.com/pypi/simple/
@@ -48,18 +48,16 @@ ENV TZ=UTC
WORKDIR /app/api
RUN \
apt-get update \
# Install dependencies
&& apt-get install -y --no-install-recommends \
# basic environment
curl nodejs libgmp-dev libmpfr-dev libmpc-dev \
# For Security
expat libldap-2.5-0 perl libsqlite3-0 zlib1g \
# install a chinese font to support the use of tools like matplotlib
fonts-noto-cjk \
# install libmagic to support the use of python-magic guess MIMETYPE
libmagic1 \
RUN apt-get update \
&& apt-get install -y --no-install-recommends curl nodejs libgmp-dev libmpfr-dev libmpc-dev \
# if you located in China, you can use aliyun mirror to speed up
# && echo "deb http://mirrors.aliyun.com/debian testing main" > /etc/apt/sources.list \
&& echo "deb http://deb.debian.org/debian testing main" > /etc/apt/sources.list \
&& apt-get update \
# For Security
&& apt-get install -y --no-install-recommends expat=2.6.4-1 libldap-2.5-0=2.5.18+dfsg-3+b1 perl=5.40.0-8 libsqlite3-0=3.46.1-1 zlib1g=1:1.3.dfsg+really1.3.1-1+b1 \
# install a chinese font to support the use of tools like matplotlib
&& apt-get install -y fonts-noto-cjk \
&& apt-get autoremove -y \
&& rm -rf /var/lib/apt/lists/*
@@ -78,6 +76,7 @@ COPY . /app/api/
COPY docker/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ARG COMMIT_SHA
ENV COMMIT_SHA=${COMMIT_SHA}

View File

@@ -79,5 +79,5 @@
2. Run the tests locally with mocked system environment variables in `tool.pytest_env` section in `pyproject.toml`
```bash
poetry run -P api bash dev/pytest/pytest_all_tests.sh
poetry run -C api bash dev/pytest/pytest_all_tests.sh
```

View File

@@ -1,8 +1,12 @@
import os
import sys
from libs import version_utils
# preparation before creating app
version_utils.check_supported_python_version()
def is_db_command():
import sys
if len(sys.argv) > 1 and sys.argv[0].endswith("flask") and sys.argv[1] == "db":
return True
return False
@@ -14,25 +18,10 @@ if is_db_command():
app = create_migrations_app()
else:
# It seems that JetBrains Python debugger does not work well with gevent,
# so we need to disable gevent in debug mode.
# If you are using debugpy and set GEVENT_SUPPORT=True, you can debug with gevent.
if (flask_debug := os.environ.get("FLASK_DEBUG", "0")) and flask_debug.lower() in {"false", "0", "no"}:
from gevent import monkey # type: ignore
# gevent
monkey.patch_all()
from grpc.experimental import gevent as grpc_gevent # type: ignore
# grpc gevent
grpc_gevent.init_gevent()
import psycogreen.gevent # type: ignore
psycogreen.gevent.patch_psycopg()
from app_factory import create_app
from libs import threadings_utils
threadings_utils.apply_gevent_threading_patch()
app = create_app()
celery = app.extensions["celery"]

View File

@@ -146,7 +146,7 @@ class EndpointConfig(BaseSettings):
)
CONSOLE_WEB_URL: str = Field(
description="Base URL for the console web interface,used for frontend references and CORS configuration",
description="Base URL for the console web interface," "used for frontend references and CORS configuration",
default="",
)
@@ -488,11 +488,6 @@ class AuthConfig(BaseSettings):
default=60,
)
REFRESH_TOKEN_EXPIRE_DAYS: PositiveFloat = Field(
description="Expiration time for refresh tokens in days",
default=30,
)
LOGIN_LOCKOUT_DURATION: PositiveInt = Field(
description="Time (in seconds) a user must wait before retrying login after exceeding the rate limit.",
default=86400,
@@ -606,7 +601,7 @@ class RagEtlConfig(BaseSettings):
UNSTRUCTURED_API_KEY: Optional[str] = Field(
description="API key for Unstructured.io service",
default="",
default=None,
)
SCARF_NO_ANALYTICS: Optional[str] = Field(
@@ -672,11 +667,6 @@ class IndexingConfig(BaseSettings):
default=4000,
)
CHILD_CHUNKS_PREVIEW_NUMBER: PositiveInt = Field(
description="Maximum number of child chunks to preview",
default=50,
)
class MultiModalTransferConfig(BaseSettings):
MULTIMODAL_SEND_FORMAT: Literal["base64", "url"] = Field(
@@ -775,13 +765,6 @@ class LoginConfig(BaseSettings):
)
class AccountConfig(BaseSettings):
ACCOUNT_DELETION_TOKEN_EXPIRY_MINUTES: PositiveInt = Field(
description="Duration in minutes for which a account deletion token remains valid",
default=5,
)
class FeatureConfig(
# place the configs in alphabet order
AppExecutionConfig,
@@ -809,7 +792,6 @@ class FeatureConfig(
WorkflowNodeExecutionConfig,
WorkspaceConfig,
LoginConfig,
AccountConfig,
# hosted services config
HostedServiceConfig,
CeleryBeatConfig,

View File

@@ -1,40 +1,9 @@
from typing import Optional
from pydantic import Field, NonNegativeInt, computed_field
from pydantic import Field, NonNegativeInt
from pydantic_settings import BaseSettings
class HostedCreditConfig(BaseSettings):
HOSTED_MODEL_CREDIT_CONFIG: str = Field(
description="Model credit configuration in format 'model:credits,model:credits', e.g., 'gpt-4:20,gpt-4o:10'",
default="",
)
def get_model_credits(self, model_name: str) -> int:
"""
Get credit value for a specific model name.
Returns 1 if model is not found in configuration (default credit).
:param model_name: The name of the model to search for
:return: The credit value for the model
"""
if not self.HOSTED_MODEL_CREDIT_CONFIG:
return 1
try:
credit_map = dict(
item.strip().split(":", 1) for item in self.HOSTED_MODEL_CREDIT_CONFIG.split(",") if ":" in item
)
# Search for matching model pattern
for pattern, credit in credit_map.items():
if pattern.strip() == model_name:
return int(credit)
return 1 # Default quota if no match found
except (ValueError, AttributeError):
return 1 # Return default quota if parsing fails
class HostedOpenAiConfig(BaseSettings):
"""
Configuration for hosted OpenAI service
@@ -212,7 +181,7 @@ class HostedFetchAppTemplateConfig(BaseSettings):
"""
HOSTED_FETCH_APP_TEMPLATES_MODE: str = Field(
description="Mode for fetching app templates: remote, db, or builtin default to remote,",
description="Mode for fetching app templates: remote, db, or builtin" " default to remote,",
default="remote",
)
@@ -233,7 +202,5 @@ class HostedServiceConfig(
HostedZhipuAIConfig,
# moderation
HostedModerationConfig,
# credit config
HostedCreditConfig,
):
pass

View File

@@ -1,5 +1,5 @@
from typing import Any, Literal, Optional
from urllib.parse import parse_qsl, quote_plus
from urllib.parse import quote_plus
from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt, computed_field
from pydantic_settings import BaseSettings
@@ -166,28 +166,14 @@ class DatabaseConfig(BaseSettings):
default=False,
)
@computed_field # type: ignore[misc]
@property
@computed_field
def SQLALCHEMY_ENGINE_OPTIONS(self) -> dict[str, Any]:
# Parse DB_EXTRAS for 'options'
db_extras_dict = dict(parse_qsl(self.DB_EXTRAS))
options = db_extras_dict.get("options", "")
# Always include timezone
timezone_opt = "-c timezone=UTC"
if options:
# Merge user options and timezone
merged_options = f"{options} {timezone_opt}"
else:
merged_options = timezone_opt
connect_args = {"options": merged_options}
return {
"pool_size": self.SQLALCHEMY_POOL_SIZE,
"max_overflow": self.SQLALCHEMY_MAX_OVERFLOW,
"pool_recycle": self.SQLALCHEMY_POOL_RECYCLE,
"pool_pre_ping": self.SQLALCHEMY_POOL_PRE_PING,
"connect_args": connect_args,
"connect_args": {"options": "-c timezone=UTC"},
}

View File

@@ -33,9 +33,3 @@ class MilvusConfig(BaseSettings):
description="Name of the Milvus database to connect to (default is 'default')",
default="default",
)
MILVUS_ENABLE_HYBRID_SEARCH: bool = Field(
description="Enable hybrid search features (requires Milvus >= 2.5.0). Set to false for compatibility with "
"older versions",
default=True,
)

View File

@@ -9,7 +9,7 @@ class PackagingInfo(BaseSettings):
CURRENT_VERSION: str = Field(
description="Dify version",
default="0.15.8",
default="0.14.2",
)
COMMIT_SHA: str = Field(

View File

@@ -1,32 +1,12 @@
import mimetypes
import os
import platform
import re
import urllib.parse
import warnings
from collections.abc import Mapping
from typing import Any
from uuid import uuid4
import httpx
try:
import magic
except ImportError:
if platform.system() == "Windows":
warnings.warn(
"To use python-magic guess MIMETYPE, you need to run `pip install python-magic-bin`", stacklevel=2
)
elif platform.system() == "Darwin":
warnings.warn("To use python-magic guess MIMETYPE, you need to run `brew install libmagic`", stacklevel=2)
elif platform.system() == "Linux":
warnings.warn(
"To use python-magic guess MIMETYPE, you need to run `sudo apt-get install libmagic1`", stacklevel=2
)
else:
warnings.warn("To use python-magic guess MIMETYPE, you need to install `libmagic`", stacklevel=2)
magic = None # type: ignore
from pydantic import BaseModel
from configs import dify_config
@@ -67,13 +47,6 @@ def guess_file_info_from_response(response: httpx.Response):
# If guessing fails, use Content-Type from response headers
mimetype = response.headers.get("Content-Type", "application/octet-stream")
# Use python-magic to guess MIME type if still unknown or generic
if mimetype == "application/octet-stream" and magic is not None:
try:
mimetype = magic.from_buffer(response.content[:1024], mime=True)
except magic.MagicException:
pass
extension = os.path.splitext(filename)[1]
# Ensure filename has an extension

View File

@@ -56,7 +56,7 @@ class InsertExploreAppListApi(Resource):
app = App.query.filter(App.id == args["app_id"]).first()
if not app:
raise NotFound(f"App '{args['app_id']}' is not found")
raise NotFound(f'App \'{args["app_id"]}\' is not found')
site = app.site
if not site:

View File

@@ -2,28 +2,30 @@ import uuid
from typing import cast
from flask_login import current_user # type: ignore
from flask_restful import (Resource, inputs, marshal, # type: ignore
marshal_with, reqparse)
from flask_restful import Resource, inputs, marshal, marshal_with, reqparse # type: ignore
from sqlalchemy import select
from sqlalchemy.orm import Session
from werkzeug.exceptions import BadRequest, Forbidden, abort
from controllers.console import api
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import (account_initialization_required,
cloud_edition_billing_resource_check,
enterprise_license_required,
setup_required)
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
enterprise_license_required,
setup_required,
)
from core.ops.ops_trace_manager import OpsTraceManager
from extensions.ext_database import db
from fields.app_fields import (app_detail_fields, app_detail_fields_with_site,
app_pagination_fields)
from fields.app_fields import (
app_detail_fields,
app_detail_fields_with_site,
app_pagination_fields,
)
from libs.login import login_required
from models import Account, App
from services.app_dsl_service import AppDslService, ImportMode
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
ALLOW_CREATE_APP_MODES = ["chat", "agent-chat", "advanced-chat", "workflow", "completion"]
@@ -55,27 +57,16 @@ class AppListApi(Resource):
)
parser.add_argument("name", type=str, location="args", required=False)
parser.add_argument("tag_ids", type=uuid_list, location="args", required=False)
parser.add_argument("is_created_by_me", type=inputs.boolean, location="args", required=False)
args = parser.parse_args()
# get app list
app_service = AppService()
app_pagination = app_service.get_paginate_apps(current_user.id, current_user.current_tenant_id, args)
app_pagination = app_service.get_paginate_apps(current_user.current_tenant_id, args)
if not app_pagination:
return {"data": [], "total": 0, "page": 1, "limit": 20, "has_more": False}
if FeatureService.get_system_features().webapp_auth.enabled:
app_ids = [str(app.id) for app in app_pagination.items]
res = EnterpriseService.WebAppAuth.batch_get_app_access_mode_by_id(app_ids=app_ids)
if len(res) != len(app_ids):
raise BadRequest("Invalid app id in webapp auth")
for app in app_pagination.items:
if str(app.id) in res:
app.access_mode = res[str(app.id)].access_mode
return marshal(app_pagination, app_pagination_fields), 200
return marshal(app_pagination, app_pagination_fields)
@setup_required
@login_required
@@ -119,10 +110,6 @@ class AppApi(Resource):
app_model = app_service.get_app(app_model)
if FeatureService.get_system_features().webapp_auth.enabled:
app_setting = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id=str(app_model.id))
app_model.access_mode = app_setting.access_mode
return app_model
@setup_required

View File

@@ -22,7 +22,7 @@ from controllers.console.wraps import account_initialization_required, setup_req
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.errors.invoke import InvokeError
from libs.login import login_required
from models import App, AppMode
from models.model import AppMode
from services.audio_service import AudioService
from services.errors.audio import (
AudioTooLargeServiceError,
@@ -79,7 +79,7 @@ class ChatMessageTextApi(Resource):
@login_required
@account_initialization_required
@get_app_model
def post(self, app_model: App):
def post(self, app_model):
from werkzeug.exceptions import InternalServerError
try:
@@ -98,13 +98,9 @@ class ChatMessageTextApi(Resource):
and app_model.workflow.features_dict
):
text_to_speech = app_model.workflow.features_dict.get("text_to_speech")
if text_to_speech is None:
raise ValueError("TTS is not enabled")
voice = args.get("voice") or text_to_speech.get("voice")
else:
try:
if app_model.app_model_config is None:
raise ValueError("AppModelConfig not found")
voice = args.get("voice") or app_model.app_model_config.text_to_speech_dict.get("voice")
except Exception:
voice = None

View File

@@ -20,6 +20,7 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
AppInvokeQuotaExceededError,
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
@@ -75,7 +76,7 @@ class CompletionMessageApi(Resource):
raise ProviderModelCurrentlyNotSupportError()
except InvokeError as e:
raise CompletionRequestError(e.description)
except ValueError as e:
except (ValueError, AppInvokeQuotaExceededError) as e:
raise e
except Exception as e:
logging.exception("internal server error.")
@@ -140,7 +141,7 @@ class ChatMessageApi(Resource):
raise InvokeRateLimitHttpError(ex.description)
except InvokeError as e:
raise CompletionRequestError(e.description)
except ValueError as e:
except (ValueError, AppInvokeQuotaExceededError) as e:
raise e
except Exception as e:
logging.exception("internal server error.")

View File

@@ -273,7 +273,8 @@ FROM
messages m
ON c.id = m.conversation_id
WHERE
c.app_id = :app_id"""
c.override_model_configs IS NULL
AND c.app_id = :app_id"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id}
timezone = pytz.timezone(account.timezone)

View File

@@ -2,7 +2,7 @@ import json
import logging
from flask import abort, request
from flask_restful import Resource, inputs, marshal_with, reqparse # type: ignore
from flask_restful import Resource, marshal_with, reqparse # type: ignore
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
import services
@@ -14,7 +14,7 @@ from controllers.console.wraps import account_initialization_required, setup_req
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from factories import variable_factory
from fields.workflow_fields import workflow_fields, workflow_pagination_fields
from fields.workflow_fields import workflow_fields
from fields.workflow_run_fields import workflow_run_node_execution_fields
from libs import helper
from libs.helper import TimestampField, uuid_value
@@ -440,29 +440,29 @@ class WorkflowConfigApi(Resource):
}
class PublishedAllWorkflowApi(Resource):
class DraftWorkflowNodeRetriableApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(workflow_pagination_fields)
def get(self, app_model: App):
@marshal_with(workflow_run_node_execution_fields)
def post(self, app_model: App, node_id: str):
"""
Get published workflows
Run draft workflow node
"""
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument("page", type=inputs.int_range(1, 99999), required=False, default=1, location="args")
parser.add_argument("limit", type=inputs.int_range(1, 100), required=False, default=20, location="args")
parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
args = parser.parse_args()
page = args.get("page")
limit = args.get("limit")
workflow_service = WorkflowService()
workflows, has_more = workflow_service.get_all_published_workflow(app_model=app_model, page=page, limit=limit)
workflow_node_execution = workflow_service.run_retriable_draft_workflow_node(
app_model=app_model, node_id=node_id, user_inputs=args.get("inputs", {}), account=current_user
)
return {"items": workflows, "page": page, "limit": limit, "has_more": has_more}
return workflow_node_execution
api.add_resource(DraftWorkflowApi, "/apps/<uuid:app_id>/workflows/draft")
@@ -479,9 +479,9 @@ api.add_resource(
WorkflowDraftRunIterationNodeApi, "/apps/<uuid:app_id>/workflows/draft/iteration/nodes/<string:node_id>/run"
)
api.add_resource(PublishedWorkflowApi, "/apps/<uuid:app_id>/workflows/publish")
api.add_resource(PublishedAllWorkflowApi, "/apps/<uuid:app_id>/workflows")
api.add_resource(DefaultBlockConfigsApi, "/apps/<uuid:app_id>/workflows/default-workflow-block-configs")
api.add_resource(
DefaultBlockConfigApi, "/apps/<uuid:app_id>/workflows/default-workflow-block-configs/<string:block_type>"
)
api.add_resource(ConvertToWorkflowApi, "/apps/<uuid:app_id>/convert-to-workflow")
api.add_resource(DraftWorkflowNodeRetriableApi, "/apps/<uuid:app_id>/workflows/draft/retry/nodes/<string:node_id>/run")

View File

@@ -53,15 +53,3 @@ class EmailCodeLoginRateLimitExceededError(BaseHTTPException):
error_code = "email_code_login_rate_limit_exceeded"
description = "Too many login emails have been sent. Please try again in 5 minutes."
code = 429
class EmailCodeAccountDeletionRateLimitExceededError(BaseHTTPException):
error_code = "email_code_account_deletion_rate_limit_exceeded"
description = "Too many account deletion emails have been sent. Please try again in 5 minutes."
code = 429
class EmailPasswordResetLimitError(BaseHTTPException):
error_code = "email_password_reset_limit"
description = "Too many failed password reset attempts. Please try again in 24 hours."
code = 429

View File

@@ -6,28 +6,26 @@ from flask_restful import Resource, reqparse # type: ignore
from constants.languages import languages
from controllers.console import api
from controllers.console.auth.error import (EmailCodeError, InvalidEmailError,
InvalidTokenError,
PasswordMismatchError)
from controllers.console.error import (AccountInFreezeError, AccountNotFound,
EmailSendIpLimitError)
from controllers.console.wraps import (email_password_login_enabled,
setup_required)
from controllers.console.auth.error import (
EmailCodeError,
InvalidEmailError,
InvalidTokenError,
PasswordMismatchError,
)
from controllers.console.error import AccountNotFound, EmailSendIpLimitError
from controllers.console.wraps import setup_required
from events.tenant_event import tenant_was_created
from extensions.ext_database import db
from libs.helper import email, extract_remote_ip
from libs.password import hash_password, valid_password
from models.account import Account
from services.account_service import AccountService, TenantService
from services.errors.account import AccountRegisterError
from services.errors.workspace import (WorkSpaceNotAllowedCreateError,
WorkspacesLimitExceededError)
from services.errors.workspace import WorkSpaceNotAllowedCreateError
from services.feature_service import FeatureService
class ForgotPasswordSendEmailApi(Resource):
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=email, required=True, location="json")
@@ -59,7 +57,6 @@ class ForgotPasswordSendEmailApi(Resource):
class ForgotPasswordCheckApi(Resource):
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=str, required=True, location="json")
@@ -79,20 +76,11 @@ class ForgotPasswordCheckApi(Resource):
if args["code"] != token_data.get("code"):
raise EmailCodeError()
# Verified, revoke the first token
AccountService.revoke_reset_password_token(args["token"])
# Refresh token data by generating a new token
_, new_token = AccountService.generate_reset_password_token(
user_email, code=args["code"], additional_data={"phase": "reset"}
)
return {"is_valid": True, "email": token_data.get("email"), "token": new_token}
return {"is_valid": True, "email": token_data.get("email")}
class ForgotPasswordResetApi(Resource):
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("token", type=str, required=True, nullable=False, location="json")
@@ -111,9 +99,6 @@ class ForgotPasswordResetApi(Resource):
if reset_data is None:
raise InvalidTokenError()
# Must use token in reset phase
if reset_data.get("phase", "") != "reset":
raise InvalidTokenError()
AccountService.revoke_reset_password_token(token)
@@ -144,10 +129,6 @@ class ForgotPasswordResetApi(Resource):
)
except WorkSpaceNotAllowedCreateError:
pass
except AccountRegisterError as are:
raise AccountInFreezeError()
except WorkspacesLimitExceededError:
pass
return {"result": "success"}

View File

@@ -5,7 +5,6 @@ from flask import request
from flask_restful import Resource, reqparse # type: ignore
import services
from configs import dify_config
from constants.languages import languages
from controllers.console import api
from controllers.console.auth.error import (
@@ -17,21 +16,17 @@ from controllers.console.auth.error import (
)
from controllers.console.error import (
AccountBannedError,
AccountInFreezeError,
AccountNotFound,
EmailSendIpLimitError,
NotAllowedCreateWorkspace,
WorkspacesLimitExceeded,
)
from controllers.console.wraps import email_password_login_enabled, setup_required
from controllers.console.wraps import setup_required
from events.tenant_event import tenant_was_created
from libs.helper import email, extract_remote_ip
from libs.password import valid_password
from models.account import Account
from services.account_service import AccountService, RegisterService, TenantService
from services.billing_service import BillingService
from services.errors.account import AccountRegisterError
from services.errors.workspace import WorkSpaceNotAllowedCreateError, WorkspacesLimitExceededError
from services.errors.workspace import WorkSpaceNotAllowedCreateError
from services.feature_service import FeatureService
@@ -39,7 +34,6 @@ class LoginApi(Resource):
"""Resource for user login."""
@setup_required
@email_password_login_enabled
def post(self):
"""Authenticate user and login."""
parser = reqparse.RequestParser()
@@ -50,9 +44,6 @@ class LoginApi(Resource):
parser.add_argument("language", type=str, required=False, default="en-US", location="json")
args = parser.parse_args()
if dify_config.BILLING_ENABLED and BillingService.is_email_in_freeze(args["email"]):
raise AccountInFreezeError()
is_login_error_rate_limit = AccountService.is_login_error_rate_limit(args["email"])
if is_login_error_rate_limit:
raise EmailPasswordLoginLimitError()
@@ -89,15 +80,10 @@ class LoginApi(Resource):
# SELF_HOSTED only have one workspace
tenants = TenantService.get_join_tenants(account)
if len(tenants) == 0:
system_features = FeatureService.get_system_features()
if system_features.is_allow_create_workspace and not system_features.license.workspaces.is_available():
raise WorkspacesLimitExceeded()
else:
return {
"result": "fail",
"data": "workspace not found, please contact system admin to invite you to join in a workspace",
}
return {
"result": "fail",
"data": "workspace not found, please contact system admin to invite you to join in a workspace",
}
token_pair = AccountService.login(account=account, ip_address=extract_remote_ip(request))
AccountService.reset_login_error_rate_limit(args["email"])
@@ -117,7 +103,6 @@ class LogoutApi(Resource):
class ResetPasswordSendEmailApi(Resource):
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=email, required=True, location="json")
@@ -128,10 +113,8 @@ class ResetPasswordSendEmailApi(Resource):
language = "zh-Hans"
else:
language = "en-US"
try:
account = AccountService.get_user_through_email(args["email"])
except AccountRegisterError as are:
raise AccountInFreezeError()
account = AccountService.get_user_through_email(args["email"])
if account is None:
if FeatureService.get_system_features().is_allow_register:
token = AccountService.send_reset_password_email(email=args["email"], language=language)
@@ -159,11 +142,8 @@ class EmailCodeLoginSendEmailApi(Resource):
language = "zh-Hans"
else:
language = "en-US"
try:
account = AccountService.get_user_through_email(args["email"])
except AccountRegisterError as are:
raise AccountInFreezeError()
account = AccountService.get_user_through_email(args["email"])
if account is None:
if FeatureService.get_system_features().is_allow_register:
token = AccountService.send_email_code_login_email(email=args["email"], language=language)
@@ -197,16 +177,10 @@ class EmailCodeLoginApi(Resource):
raise EmailCodeError()
AccountService.revoke_email_code_login_token(args["token"])
try:
account = AccountService.get_user_through_email(user_email)
except AccountRegisterError as are:
raise AccountInFreezeError()
account = AccountService.get_user_through_email(user_email)
if account:
tenant = TenantService.get_join_tenants(account)
if not tenant:
workspaces = FeatureService.get_system_features().license.workspaces
if not workspaces.is_available():
raise WorkspacesLimitExceeded()
if not FeatureService.get_system_features().is_allow_create_workspace:
raise NotAllowedCreateWorkspace()
else:
@@ -222,10 +196,6 @@ class EmailCodeLoginApi(Resource):
)
except WorkSpaceNotAllowedCreateError:
return NotAllowedCreateWorkspace()
except AccountRegisterError as are:
raise AccountInFreezeError()
except WorkspacesLimitExceededError:
raise WorkspacesLimitExceeded()
token_pair = AccountService.login(account, ip_address=extract_remote_ip(request))
AccountService.reset_login_error_rate_limit(args["email"])
return {"result": "success", "data": token_pair.model_dump()}

View File

@@ -16,7 +16,7 @@ from libs.oauth import GitHubOAuth, GoogleOAuth, OAuthUserInfo
from models import Account
from models.account import AccountStatus
from services.account_service import AccountService, RegisterService, TenantService
from services.errors.account import AccountNotFoundError, AccountRegisterError
from services.errors.account import AccountNotFoundError
from services.errors.workspace import WorkSpaceNotAllowedCreateError, WorkSpaceNotFoundError
from services.feature_service import FeatureService
@@ -99,8 +99,6 @@ class OAuthCallback(Resource):
f"{dify_config.CONSOLE_WEB_URL}/signin"
"?message=Workspace not found, please contact system admin to invite you to join in a workspace."
)
except AccountRegisterError as e:
return redirect(f"{dify_config.CONSOLE_WEB_URL}/signin?message={e.description}")
# Check account status
if account.status == AccountStatus.BANNED.value:

View File

@@ -52,12 +52,12 @@ class DatasetListApi(Resource):
# provider = request.args.get("provider", default="vendor")
search = request.args.get("keyword", default=None, type=str)
tag_ids = request.args.getlist("tag_ids")
include_all = request.args.get("include_all", default="false").lower() == "true"
if ids:
datasets, total = DatasetService.get_datasets_by_ids(ids, current_user.current_tenant_id)
else:
datasets, total = DatasetService.get_datasets(
page, limit, current_user.current_tenant_id, current_user, search, tag_ids, include_all
page, limit, current_user.current_tenant_id, current_user, search, tag_ids
)
# check embedding setting
@@ -457,7 +457,7 @@ class DatasetIndexingEstimateApi(Resource):
)
except LLMBadRequestError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider " "in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -619,7 +619,9 @@ class DatasetRetrievalSettingApi(Resource):
vector_type = dify_config.VECTOR_STORE
match vector_type:
case (
VectorType.RELYT
VectorType.MILVUS
| VectorType.RELYT
| VectorType.PGVECTOR
| VectorType.TIDB_VECTOR
| VectorType.CHROMA
| VectorType.TENCENT
@@ -638,12 +640,10 @@ class DatasetRetrievalSettingApi(Resource):
| VectorType.MYSCALE
| VectorType.ORACLE
| VectorType.ELASTICSEARCH
| VectorType.ELASTICSEARCH_JA
| VectorType.PGVECTOR
| VectorType.TIDB_ON_QDRANT
| VectorType.LINDORM
| VectorType.COUCHBASE
| VectorType.MILVUS
):
return {
"retrieval_method": [
@@ -683,7 +683,6 @@ class DatasetRetrievalSettingMockApi(Resource):
| VectorType.MYSCALE
| VectorType.ORACLE
| VectorType.ELASTICSEARCH
| VectorType.ELASTICSEARCH_JA
| VectorType.COUCHBASE
| VectorType.PGVECTOR
| VectorType.LINDORM

View File

@@ -257,8 +257,7 @@ class DatasetDocumentListApi(Resource):
parser.add_argument("original_document_id", type=str, required=False, location="json")
parser.add_argument("doc_form", type=str, default="text_model", required=False, nullable=False, location="json")
parser.add_argument("retrieval_model", type=dict, required=False, nullable=False, location="json")
parser.add_argument("embedding_model", type=str, required=False, nullable=True, location="json")
parser.add_argument("embedding_model_provider", type=str, required=False, nullable=True, location="json")
parser.add_argument(
"doc_language", type=str, default="English", required=False, nullable=False, location="json"
)
@@ -350,7 +349,8 @@ class DatasetInitApi(Resource):
)
except InvokeAuthorizationError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider "
"in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -525,7 +525,8 @@ class DocumentBatchIndexingEstimateApi(DocumentResource):
return response.model_dump(), 200
except LLMBadRequestError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider "
"in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)

View File

@@ -168,7 +168,8 @@ class DatasetDocumentSegmentApi(Resource):
)
except LLMBadRequestError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider "
"in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -216,7 +217,8 @@ class DatasetDocumentSegmentAddApi(Resource):
)
except LLMBadRequestError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider "
"in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -265,7 +267,8 @@ class DatasetDocumentSegmentUpdateApi(Resource):
)
except LLMBadRequestError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider "
"in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -365,9 +368,9 @@ class DatasetDocumentSegmentBatchImportApi(Resource):
result = []
for index, row in df.iterrows():
if document.doc_form == "qa_model":
data = {"content": row.iloc[0], "answer": row.iloc[1]}
data = {"content": row[0], "answer": row[1]}
else:
data = {"content": row.iloc[0]}
data = {"content": row[0]}
result.append(data)
if len(result) == 0:
raise ValueError("The CSV file is empty.")
@@ -434,7 +437,8 @@ class ChildChunkAddApi(Resource):
)
except LLMBadRequestError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider "
"in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)

View File

@@ -46,18 +46,6 @@ class NotAllowedCreateWorkspace(BaseHTTPException):
code = 400
class WorkspaceMembersLimitExceeded(BaseHTTPException):
error_code = "limit_exceeded"
description = "Unable to add member because the maximum workspace's member limit was exceeded"
code = 400
class WorkspacesLimitExceeded(BaseHTTPException):
error_code = "limit_exceeded"
description = "Unable to create workspace because the maximum workspace limit was exceeded"
code = 400
class AccountBannedError(BaseHTTPException):
error_code = "account_banned"
description = "Account is banned."
@@ -104,12 +92,3 @@ class UnauthorizedAndForceLogout(BaseHTTPException):
error_code = "unauthorized_and_force_logout"
description = "Unauthorized and force logout."
code = 401
class AccountInFreezeError(BaseHTTPException):
error_code = "account_in_freeze"
code = 400
description = (
"This email account has been deleted within the past 30 days"
"and is temporarily unavailable for new account registration."
)

View File

@@ -18,11 +18,7 @@ from controllers.console.explore.error import NotChatAppError, NotCompletionAppE
from controllers.console.explore.wraps import InstalledAppResource
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
)
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.errors.invoke import InvokeError
from extensions.ext_database import db
from libs import helper

View File

@@ -32,7 +32,7 @@ class ConversationListApi(InstalledAppResource):
pinned = None
if "pinned" in args and args["pinned"] is not None:
pinned = args["pinned"] == "true"
pinned = True if args["pinned"] == "true" else False
try:
with Session(db.engine) as session:

View File

@@ -23,9 +23,3 @@ class AppSuggestedQuestionsAfterAnswerDisabledError(BaseHTTPException):
error_code = "app_suggested_questions_after_answer_disabled"
description = "Function Suggested questions after answer disabled."
code = 403
class AppAccessDeniedError(BaseHTTPException):
error_code = "access_denied"
description = "App access denied."
code = 403

View File

@@ -1,26 +1,20 @@
import logging
from datetime import UTC, datetime
from typing import Any
from flask import request
from flask_login import current_user # type: ignore
from flask_restful import (Resource, inputs, marshal_with, # type: ignore
reqparse)
from flask_restful import Resource, inputs, marshal_with, reqparse # type: ignore
from sqlalchemy import and_
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
from controllers.console import api
from controllers.console.explore.wraps import InstalledAppResource
from controllers.console.wraps import (account_initialization_required,
cloud_edition_billing_resource_check)
from controllers.console.wraps import account_initialization_required, cloud_edition_billing_resource_check
from extensions.ext_database import db
from fields.installed_app_fields import installed_app_list_fields
from libs.login import login_required
from models import App, InstalledApp, RecommendedApp
from services.account_service import TenantService
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
class InstalledAppsListApi(Resource):
@@ -54,30 +48,6 @@ class InstalledAppsListApi(Resource):
for installed_app in installed_apps
if installed_app.app is not None
]
# filter out apps that user doesn't have access to
if FeatureService.get_system_features().webapp_auth.enabled:
user_id = current_user.id
res = []
app_ids = [installed_app["app"].id for installed_app in installed_app_list]
webapp_settings = EnterpriseService.WebAppAuth.batch_get_app_access_mode_by_id(app_ids)
for installed_app in installed_app_list:
webapp_setting = webapp_settings.get(installed_app["app"].id)
if not webapp_setting:
continue
if webapp_setting.access_mode == "sso_verified":
continue
app_code = AppService.get_app_code_by_id(str(installed_app["app"].id))
if EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(
user_id=user_id,
app_code=app_code,
):
res.append(installed_app)
installed_app_list = res
logging.info(
f"installed_app_list: {installed_app_list}, user_id: {user_id}"
)
installed_app_list.sort(
key=lambda app: (
-app["is_pinned"],

View File

@@ -50,7 +50,7 @@ class MessageListApi(InstalledAppResource):
try:
return MessageService.pagination_by_first_id(
app_model, current_user, args["conversation_id"], args["first_id"], args["limit"]
app_model, current_user, args["conversation_id"], args["first_id"], args["limit"], "desc"
)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
@@ -66,17 +66,10 @@ class MessageFeedbackApi(InstalledAppResource):
parser = reqparse.RequestParser()
parser.add_argument("rating", type=str, choices=["like", "dislike", None], location="json")
parser.add_argument("content", type=str, location="json")
args = parser.parse_args()
try:
MessageService.create_feedback(
app_model=app_model,
message_id=message_id,
user=current_user,
rating=args.get("rating"),
content=args.get("content"),
)
MessageService.create_feedback(app_model, message_id, current_user, args.get("rating"), args.get("content"))
except services.errors.message.MessageNotExistsError:
raise NotFound("Message Not Exists.")

View File

@@ -13,11 +13,7 @@ from controllers.console.explore.error import NotWorkflowAppError
from controllers.console.explore.wraps import InstalledAppResource
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
)
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.login import current_user

View File

@@ -4,14 +4,10 @@ from flask_login import current_user # type: ignore
from flask_restful import Resource # type: ignore
from werkzeug.exceptions import NotFound
from controllers.console.explore.error import AppAccessDeniedError
from controllers.console.wraps import account_initialization_required
from extensions.ext_database import db
from libs.login import login_required
from models import InstalledApp
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
def installed_app_required(view=None):
@@ -52,30 +48,6 @@ def installed_app_required(view=None):
return decorator
def user_allowed_to_access_app(view=None):
def decorator(view):
@wraps(view)
def decorated(installed_app: InstalledApp, *args, **kwargs):
feature = FeatureService.get_system_features()
if feature.webapp_auth.enabled:
app_id = installed_app.app_id
app_code = AppService.get_app_code_by_id(app_id)
res = EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(
user_id=str(current_user.id),
app_code=app_code,
)
if not res:
raise AppAccessDeniedError()
return view(installed_app, *args, **kwargs)
return decorated
if view:
return decorator(view)
return decorator
class InstalledAppResource(Resource):
# must be reversed if there are multiple decorators
method_decorators = [user_allowed_to_access_app, installed_app_required, account_initialization_required, login_required]
method_decorators = [installed_app_required, account_initialization_required, login_required]

View File

@@ -11,7 +11,6 @@ from controllers.console import api
from controllers.console.workspace.error import (
AccountAlreadyInitedError,
CurrentPasswordIncorrectError,
InvalidAccountDeletionCodeError,
InvalidInvitationCodeError,
RepeatPasswordNotMatchError,
)
@@ -22,7 +21,6 @@ from libs.helper import TimestampField, timezone
from libs.login import login_required
from models import AccountIntegrate, InvitationCode
from services.account_service import AccountService
from services.billing_service import BillingService
from services.errors.account import CurrentPasswordIncorrectError as ServiceCurrentPasswordIncorrectError
@@ -244,54 +242,6 @@ class AccountIntegrateApi(Resource):
return {"data": integrate_data}
class AccountDeleteVerifyApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self):
account = current_user
token, code = AccountService.generate_account_deletion_verification_code(account)
AccountService.send_account_deletion_verification_email(account, code)
return {"result": "success", "data": token}
class AccountDeleteApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self):
account = current_user
parser = reqparse.RequestParser()
parser.add_argument("token", type=str, required=True, location="json")
parser.add_argument("code", type=str, required=True, location="json")
args = parser.parse_args()
if not AccountService.verify_account_deletion_code(args["token"], args["code"]):
raise InvalidAccountDeletionCodeError()
AccountService.delete_account(account)
return {"result": "success"}
class AccountDeleteUpdateFeedbackApi(Resource):
@setup_required
def post(self):
account = current_user
parser = reqparse.RequestParser()
parser.add_argument("email", type=str, required=True, location="json")
parser.add_argument("feedback", type=str, required=True, location="json")
args = parser.parse_args()
BillingService.update_account_deletion_feedback(args["email"], args["feedback"])
return {"result": "success"}
# Register API resources
api.add_resource(AccountInitApi, "/account/init")
api.add_resource(AccountProfileApi, "/account/profile")
@@ -302,8 +252,5 @@ api.add_resource(AccountInterfaceThemeApi, "/account/interface-theme")
api.add_resource(AccountTimezoneApi, "/account/timezone")
api.add_resource(AccountPasswordApi, "/account/password")
api.add_resource(AccountIntegrateApi, "/account/integrates")
api.add_resource(AccountDeleteVerifyApi, "/account/delete/verify")
api.add_resource(AccountDeleteApi, "/account/delete")
api.add_resource(AccountDeleteUpdateFeedbackApi, "/account/delete/feedback")
# api.add_resource(AccountEmailApi, '/account/email')
# api.add_resource(AccountEmailVerifyApi, '/account/email-verify')

View File

@@ -35,9 +35,3 @@ class AccountNotInitializedError(BaseHTTPException):
error_code = "account_not_initialized"
description = "The account has not been initialized yet. Please proceed with the initialization process first."
code = 400
class InvalidAccountDeletionCodeError(BaseHTTPException):
error_code = "invalid_account_deletion_code"
description = "Invalid account deletion code."
code = 400

View File

@@ -6,7 +6,6 @@ from flask_restful import Resource, abort, marshal_with, reqparse # type: ignor
import services
from configs import dify_config
from controllers.console import api
from controllers.console.error import WorkspaceMembersLimitExceeded
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
@@ -18,7 +17,6 @@ from libs.login import login_required
from models.account import Account, TenantAccountRole
from services.account_service import RegisterService, TenantService
from services.errors.account import AccountAlreadyInTenantError
from services.feature_service import FeatureService
class MemberListApi(Resource):
@@ -56,12 +54,6 @@ class MemberInviteEmailApi(Resource):
inviter = current_user
invitation_results = []
console_web_url = dify_config.CONSOLE_WEB_URL
workspace_members = FeatureService.get_features(tenant_id=inviter.current_tenant.id).workspace_members
if not workspace_members.is_available(len(invitee_emails)):
raise WorkspaceMembersLimitExceeded()
for invitee_email in invitee_emails:
try:
token = RegisterService.invite_new_member(
@@ -79,6 +71,7 @@ class MemberInviteEmailApi(Resource):
invitation_results.append(
{"status": "success", "email": invitee_email, "url": f"{console_web_url}/signin"}
)
break
except Exception as e:
invitation_results.append({"status": "failed", "email": invitee_email, "message": str(e)})
@@ -129,7 +122,7 @@ class MemberUpdateRoleApi(Resource):
return {"code": "invalid-role", "message": "Invalid role"}, 400
member = db.session.get(Account, str(member_id))
if not member:
if member:
abort(404)
try:

View File

@@ -11,8 +11,7 @@ from models.model import DifySetup
from services.feature_service import FeatureService, LicenseStatus
from services.operation_service import OperationService
from .error import (NotInitValidateError, NotSetupError,
UnauthorizedAndForceLogout)
from .error import NotInitValidateError, NotSetupError, UnauthorizedAndForceLogout
def account_initialization_required(view):
@@ -40,28 +39,6 @@ def only_edition_cloud(view):
return decorated
def only_edition_enterprise(view):
@wraps(view)
def decorated(*args, **kwargs):
if not dify_config.ENTERPRISE_ENABLED:
abort(404)
return view(*args, **kwargs)
return decorated
def only_edition_self_hosted(view):
@wraps(view)
def decorated(*args, **kwargs):
if not dify_config.ENTERPRISE_ENABLED:
abort(404)
return view(*args, **kwargs)
return decorated
def only_edition_self_hosted(view):
@wraps(view)
def decorated(*args, **kwargs):
@@ -177,16 +154,3 @@ def enterprise_license_required(view):
return view(*args, **kwargs)
return decorated
def email_password_login_enabled(view):
@wraps(view)
def decorated(*args, **kwargs):
features = FeatureService.get_system_features()
if features.enable_email_password_login:
return view(*args, **kwargs)
# otherwise, return 403
abort(403)
return decorated

View File

@@ -5,5 +5,4 @@ from libs.external_api import ExternalApi
bp = Blueprint("inner_api", __name__, url_prefix="/inner/api")
api = ExternalApi(bp)
from . import mail
from .workspace import workspace

View File

@@ -1,27 +0,0 @@
from flask_restful import (
Resource, # type: ignore
reqparse,
)
from controllers.console.wraps import setup_required
from controllers.inner_api import api
from controllers.inner_api.wraps import inner_api_only
from services.enterprise.mail_service import DifyMail, EnterpriseMailService
class EnterpriseMail(Resource):
@setup_required
@inner_api_only
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("to", type=str, action="append", required=True)
parser.add_argument("subject", type=str, required=True)
parser.add_argument("body", type=str, required=True)
parser.add_argument("substitutions", type=dict, required=False)
args = parser.parse_args()
EnterpriseMailService.send_mail(DifyMail(**args))
return {"message": "success"}, 200
api.add_resource(EnterpriseMail, "/enterprise/mail")

View File

@@ -1,5 +1,3 @@
import json
from flask_restful import Resource, reqparse # type: ignore
from controllers.console.wraps import setup_required
@@ -31,34 +29,4 @@ class EnterpriseWorkspace(Resource):
return {"message": "enterprise workspace created."}
class EnterpriseWorkspaceNoOwnerEmail(Resource):
@setup_required
@inner_api_only
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("name", type=str, required=True, location="json")
args = parser.parse_args()
tenant = TenantService.create_tenant(args["name"], is_from_dashboard=True)
tenant_was_created.send(tenant)
resp = {
"id": tenant.id,
"name": tenant.name,
"encrypt_public_key": tenant.encrypt_public_key,
"plan": tenant.plan,
"status": tenant.status,
"custom_config": json.loads(tenant.custom_config) if tenant.custom_config else {},
"created_at": tenant.created_at.isoformat() if tenant.created_at else None,
"updated_at": tenant.updated_at.isoformat() if tenant.updated_at else None,
}
return {
"message": "enterprise workspace created.",
"tenant": resp,
}
api.add_resource(EnterpriseWorkspace, "/enterprise/workspace")
api.add_resource(EnterpriseWorkspaceNoOwnerEmail, "/enterprise/workspace/ownerless")

View File

@@ -7,4 +7,4 @@ api = ExternalApi(bp)
from . import index
from .app import app, audio, completion, conversation, file, message, workflow
from .dataset import dataset, document, hit_testing, segment, upload_file
from .dataset import dataset, document, hit_testing, segment

View File

@@ -18,6 +18,7 @@ from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
AppInvokeQuotaExceededError,
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
@@ -73,7 +74,7 @@ class CompletionApi(Resource):
raise ProviderModelCurrentlyNotSupportError()
except InvokeError as e:
raise CompletionRequestError(e.description)
except ValueError as e:
except (ValueError, AppInvokeQuotaExceededError) as e:
raise e
except Exception as e:
logging.exception("internal server error.")
@@ -132,7 +133,7 @@ class ChatApi(Resource):
raise ProviderModelCurrentlyNotSupportError()
except InvokeError as e:
raise CompletionRequestError(e.description)
except ValueError as e:
except (ValueError, AppInvokeQuotaExceededError) as e:
raise e
except Exception as e:
logging.exception("internal server error.")

View File

@@ -108,13 +108,7 @@ class MessageFeedbackApi(Resource):
args = parser.parse_args()
try:
MessageService.create_feedback(
app_model=app_model,
message_id=message_id,
user=end_user,
rating=args.get("rating"),
content=args.get("content"),
)
MessageService.create_feedback(app_model, message_id, end_user, args.get("rating"), args.get("content"))
except services.errors.message.MessageNotExistsError:
raise NotFound("Message Not Exists.")

View File

@@ -16,6 +16,7 @@ from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
AppInvokeQuotaExceededError,
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
@@ -93,7 +94,7 @@ class WorkflowRunApi(Resource):
raise ProviderModelCurrentlyNotSupportError()
except InvokeError as e:
raise CompletionRequestError(e.description)
except ValueError as e:
except (ValueError, AppInvokeQuotaExceededError) as e:
raise e
except Exception as e:
logging.exception("internal server error.")

View File

@@ -31,11 +31,8 @@ class DatasetListApi(DatasetApiResource):
# provider = request.args.get("provider", default="vendor")
search = request.args.get("keyword", default=None, type=str)
tag_ids = request.args.getlist("tag_ids")
include_all = request.args.get("include_all", default="false").lower() == "true"
datasets, total = DatasetService.get_datasets(
page, limit, tenant_id, current_user, search, tag_ids, include_all
)
datasets, total = DatasetService.get_datasets(page, limit, tenant_id, current_user, search, tag_ids)
# check embedding setting
provider_manager = ProviderManager()
configurations = provider_manager.get_configurations(tenant_id=current_user.current_tenant_id)

View File

@@ -8,17 +8,12 @@ from werkzeug.exceptions import NotFound
import services.dataset_service
from controllers.common.errors import FilenameNotExistsError
from controllers.service_api import api
from controllers.service_api.app.error import (
FileTooLargeError,
NoFileUploadedError,
ProviderNotInitializeError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.service_api.app.error import ProviderNotInitializeError
from controllers.service_api.dataset.error import (
ArchivedDocumentImmutableError,
DocumentIndexingError,
InvalidMetadataError,
NoFileUploadedError,
TooManyFilesError,
)
from controllers.service_api.wraps import DatasetApiResource, cloud_edition_billing_resource_check
from core.errors.error import ProviderTokenNotInitError
@@ -51,9 +46,6 @@ class DocumentAddByTextApi(DatasetApiResource):
"indexing_technique", type=str, choices=Dataset.INDEXING_TECHNIQUE_LIST, nullable=False, location="json"
)
parser.add_argument("retrieval_model", type=dict, required=False, nullable=False, location="json")
parser.add_argument("doc_type", type=str, required=False, nullable=True, location="json")
parser.add_argument("doc_metadata", type=dict, required=False, nullable=True, location="json")
args = parser.parse_args()
dataset_id = str(dataset_id)
tenant_id = str(tenant_id)
@@ -65,28 +57,6 @@ class DocumentAddByTextApi(DatasetApiResource):
if not dataset.indexing_technique and not args["indexing_technique"]:
raise ValueError("indexing_technique is required.")
# Validate metadata if provided
if args.get("doc_type") or args.get("doc_metadata"):
if not args.get("doc_type") or not args.get("doc_metadata"):
raise InvalidMetadataError("Both doc_type and doc_metadata must be provided when adding metadata")
if args["doc_type"] not in DocumentService.DOCUMENT_METADATA_SCHEMA:
raise InvalidMetadataError(
"Invalid doc_type. Must be one of: " + ", ".join(DocumentService.DOCUMENT_METADATA_SCHEMA.keys())
)
if not isinstance(args["doc_metadata"], dict):
raise InvalidMetadataError("doc_metadata must be a dictionary")
# Validate metadata schema based on doc_type
if args["doc_type"] != "others":
metadata_schema = DocumentService.DOCUMENT_METADATA_SCHEMA[args["doc_type"]]
for key, value in args["doc_metadata"].items():
if key in metadata_schema and not isinstance(value, metadata_schema[key]):
raise InvalidMetadataError(f"Invalid type for metadata field {key}")
# set to MetaDataConfig
args["metadata"] = {"doc_type": args["doc_type"], "doc_metadata": args["doc_metadata"]}
text = args.get("text")
name = args.get("name")
if text is None or name is None:
@@ -133,8 +103,6 @@ class DocumentUpdateByTextApi(DatasetApiResource):
"doc_language", type=str, default="English", required=False, nullable=False, location="json"
)
parser.add_argument("retrieval_model", type=dict, required=False, nullable=False, location="json")
parser.add_argument("doc_type", type=str, required=False, nullable=True, location="json")
parser.add_argument("doc_metadata", type=dict, required=False, nullable=True, location="json")
args = parser.parse_args()
dataset_id = str(dataset_id)
tenant_id = str(tenant_id)
@@ -143,32 +111,6 @@ class DocumentUpdateByTextApi(DatasetApiResource):
if not dataset:
raise ValueError("Dataset is not exist.")
# indexing_technique is already set in dataset since this is an update
args["indexing_technique"] = dataset.indexing_technique
# Validate metadata if provided
if args.get("doc_type") or args.get("doc_metadata"):
if not args.get("doc_type") or not args.get("doc_metadata"):
raise InvalidMetadataError("Both doc_type and doc_metadata must be provided when adding metadata")
if args["doc_type"] not in DocumentService.DOCUMENT_METADATA_SCHEMA:
raise InvalidMetadataError(
"Invalid doc_type. Must be one of: " + ", ".join(DocumentService.DOCUMENT_METADATA_SCHEMA.keys())
)
if not isinstance(args["doc_metadata"], dict):
raise InvalidMetadataError("doc_metadata must be a dictionary")
# Validate metadata schema based on doc_type
if args["doc_type"] != "others":
metadata_schema = DocumentService.DOCUMENT_METADATA_SCHEMA[args["doc_type"]]
for key, value in args["doc_metadata"].items():
if key in metadata_schema and not isinstance(value, metadata_schema[key]):
raise InvalidMetadataError(f"Invalid type for metadata field {key}")
# set to MetaDataConfig
args["metadata"] = {"doc_type": args["doc_type"], "doc_metadata": args["doc_metadata"]}
if args["text"]:
text = args.get("text")
name = args.get("name")
@@ -215,30 +157,6 @@ class DocumentAddByFileApi(DatasetApiResource):
args["doc_form"] = "text_model"
if "doc_language" not in args:
args["doc_language"] = "English"
# Validate metadata if provided
if args.get("doc_type") or args.get("doc_metadata"):
if not args.get("doc_type") or not args.get("doc_metadata"):
raise InvalidMetadataError("Both doc_type and doc_metadata must be provided when adding metadata")
if args["doc_type"] not in DocumentService.DOCUMENT_METADATA_SCHEMA:
raise InvalidMetadataError(
"Invalid doc_type. Must be one of: " + ", ".join(DocumentService.DOCUMENT_METADATA_SCHEMA.keys())
)
if not isinstance(args["doc_metadata"], dict):
raise InvalidMetadataError("doc_metadata must be a dictionary")
# Validate metadata schema based on doc_type
if args["doc_type"] != "others":
metadata_schema = DocumentService.DOCUMENT_METADATA_SCHEMA[args["doc_type"]]
for key, value in args["doc_metadata"].items():
if key in metadata_schema and not isinstance(value, metadata_schema[key]):
raise InvalidMetadataError(f"Invalid type for metadata field {key}")
# set to MetaDataConfig
args["metadata"] = {"doc_type": args["doc_type"], "doc_metadata": args["doc_metadata"]}
# get dataset info
dataset_id = str(dataset_id)
tenant_id = str(tenant_id)
@@ -268,10 +186,7 @@ class DocumentAddByFileApi(DatasetApiResource):
user=current_user,
source="datasets",
)
data_source = {
"type": "upload_file",
"info_list": {"data_source_type": "upload_file", "file_info_list": {"file_ids": [upload_file.id]}},
}
data_source = {"type": "upload_file", "info_list": {"file_info_list": {"file_ids": [upload_file.id]}}}
args["data_source"] = data_source
# validate args
knowledge_config = KnowledgeConfig(**args)
@@ -306,29 +221,6 @@ class DocumentUpdateByFileApi(DatasetApiResource):
if "doc_language" not in args:
args["doc_language"] = "English"
# Validate metadata if provided
if args.get("doc_type") or args.get("doc_metadata"):
if not args.get("doc_type") or not args.get("doc_metadata"):
raise InvalidMetadataError("Both doc_type and doc_metadata must be provided when adding metadata")
if args["doc_type"] not in DocumentService.DOCUMENT_METADATA_SCHEMA:
raise InvalidMetadataError(
"Invalid doc_type. Must be one of: " + ", ".join(DocumentService.DOCUMENT_METADATA_SCHEMA.keys())
)
if not isinstance(args["doc_metadata"], dict):
raise InvalidMetadataError("doc_metadata must be a dictionary")
# Validate metadata schema based on doc_type
if args["doc_type"] != "others":
metadata_schema = DocumentService.DOCUMENT_METADATA_SCHEMA[args["doc_type"]]
for key, value in args["doc_metadata"].items():
if key in metadata_schema and not isinstance(value, metadata_schema[key]):
raise InvalidMetadataError(f"Invalid type for metadata field {key}")
# set to MetaDataConfig
args["metadata"] = {"doc_type": args["doc_type"], "doc_metadata": args["doc_metadata"]}
# get dataset info
dataset_id = str(dataset_id)
tenant_id = str(tenant_id)
@@ -346,22 +238,14 @@ class DocumentUpdateByFileApi(DatasetApiResource):
if not file.filename:
raise FilenameNotExistsError
try:
upload_file = FileService.upload_file(
filename=file.filename,
content=file.read(),
mimetype=file.mimetype,
user=current_user,
source="datasets",
)
except services.errors.file.FileTooLargeError as file_too_large_error:
raise FileTooLargeError(file_too_large_error.description)
except services.errors.file.UnsupportedFileTypeError:
raise UnsupportedFileTypeError()
data_source = {
"type": "upload_file",
"info_list": {"data_source_type": "upload_file", "file_info_list": {"file_ids": [upload_file.id]}},
}
upload_file = FileService.upload_file(
filename=file.filename,
content=file.read(),
mimetype=file.mimetype,
user=current_user,
source="datasets",
)
data_source = {"type": "upload_file", "info_list": {"file_info_list": {"file_ids": [upload_file.id]}}}
args["data_source"] = data_source
# validate args
args["original_document_id"] = str(document_id)

View File

@@ -53,7 +53,8 @@ class SegmentApi(DatasetApiResource):
)
except LLMBadRequestError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider "
"in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -94,7 +95,8 @@ class SegmentApi(DatasetApiResource):
)
except LLMBadRequestError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider "
"in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -173,7 +175,8 @@ class DatasetSegmentApi(DatasetApiResource):
)
except LLMBadRequestError:
raise ProviderNotInitializeError(
"No Embedding Model available. Please configure a valid provider in the Settings -> Model Provider."
"No Embedding Model available. Please configure a valid provider "
"in the Settings -> Model Provider."
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)

View File

@@ -1,54 +0,0 @@
from werkzeug.exceptions import NotFound
from controllers.service_api import api
from controllers.service_api.wraps import (
DatasetApiResource,
)
from core.file import helpers as file_helpers
from extensions.ext_database import db
from models.dataset import Dataset
from models.model import UploadFile
from services.dataset_service import DocumentService
class UploadFileApi(DatasetApiResource):
def get(self, tenant_id, dataset_id, document_id):
"""Get upload file."""
# check dataset
dataset_id = str(dataset_id)
tenant_id = str(tenant_id)
dataset = db.session.query(Dataset).filter(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
if not dataset:
raise NotFound("Dataset not found.")
# check document
document_id = str(document_id)
document = DocumentService.get_document(dataset.id, document_id)
if not document:
raise NotFound("Document not found.")
# check upload file
if document.data_source_type != "upload_file":
raise ValueError(f"Document data source type ({document.data_source_type}) is not upload_file.")
data_source_info = document.data_source_info_dict
if data_source_info and "upload_file_id" in data_source_info:
file_id = data_source_info["upload_file_id"]
upload_file = db.session.query(UploadFile).filter(UploadFile.id == file_id).first()
if not upload_file:
raise NotFound("UploadFile not found.")
else:
raise ValueError("Upload file id not found in document data source info.")
url = file_helpers.get_signed_file_url(upload_file_id=upload_file.id)
return {
"id": upload_file.id,
"name": upload_file.name,
"size": upload_file.size,
"extension": upload_file.extension,
"url": url,
"download_url": f"{url}&as_attachment=true",
"mime_type": upload_file.mime_type,
"created_by": upload_file.created_by,
"created_at": upload_file.created_at.timestamp(),
}, 200
api.add_resource(UploadFileApi, "/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/upload-file")

View File

@@ -1,5 +1,5 @@
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from datetime import UTC, datetime
from enum import Enum
from functools import wraps
from typing import Optional
@@ -8,8 +8,6 @@ from flask import current_app, request
from flask_login import user_logged_in # type: ignore
from flask_restful import Resource # type: ignore
from pydantic import BaseModel
from sqlalchemy import select, update
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, Unauthorized
from extensions.ext_database import db
@@ -176,7 +174,7 @@ def validate_dataset_token(view=None):
return decorator
def validate_and_get_api_token(scope: str | None = None):
def validate_and_get_api_token(scope=None):
"""
Validate and get API token.
"""
@@ -190,29 +188,20 @@ def validate_and_get_api_token(scope: str | None = None):
if auth_scheme != "bearer":
raise Unauthorized("Authorization scheme must be 'Bearer'")
current_time = datetime.now(UTC).replace(tzinfo=None)
cutoff_time = current_time - timedelta(minutes=1)
with Session(db.engine, expire_on_commit=False) as session:
update_stmt = (
update(ApiToken)
.where(
ApiToken.token == auth_token,
(ApiToken.last_used_at.is_(None) | (ApiToken.last_used_at < cutoff_time)),
ApiToken.type == scope,
)
.values(last_used_at=current_time)
.returning(ApiToken)
api_token = (
db.session.query(ApiToken)
.filter(
ApiToken.token == auth_token,
ApiToken.type == scope,
)
result = session.execute(update_stmt)
api_token = result.scalar_one_or_none()
.first()
)
if not api_token:
stmt = select(ApiToken).where(ApiToken.token == auth_token, ApiToken.type == scope)
api_token = session.scalar(stmt)
if not api_token:
raise Unauthorized("Access token is invalid")
else:
session.commit()
if not api_token:
raise Unauthorized("Access token is invalid")
api_token.last_used_at = datetime.now(UTC).replace(tzinfo=None)
db.session.commit()
return api_token
@@ -240,7 +229,7 @@ def create_or_update_end_user_for_user_id(app_model: App, user_id: Optional[str]
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type="service_api",
is_anonymous=user_id == "DEFAULT-USER",
is_anonymous=True if user_id == "DEFAULT-USER" else False,
session_id=user_id,
)
db.session.add(end_user)

View File

@@ -15,17 +15,4 @@ api.add_resource(FileApi, "/files/upload")
api.add_resource(RemoteFileInfoApi, "/remote-files/<path:url>")
api.add_resource(RemoteFileUploadApi, "/remote-files/upload")
from . import (
app,
audio,
completion,
conversation,
feature,
forgot_password,
login,
message,
passport,
saved_message,
site,
workflow,
)
from . import app, audio, completion, conversation, feature, message, passport, saved_message, site, workflow

View File

@@ -1,18 +1,12 @@
from flask import request
from flask_restful import Resource, marshal_with, reqparse # type: ignore
from flask_restful import marshal_with # type: ignore
from controllers.common import fields
from controllers.common import helpers as controller_helpers
from controllers.web import api
from controllers.web.error import AppUnavailableError
from controllers.web.wraps import WebApiResource
from libs.passport import PassportService
from models.model import App, AppMode
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService
class AppParameterApi(WebApiResource):
@@ -48,65 +42,5 @@ class AppMeta(WebApiResource):
return AppService().get_app_meta(app_model)
class AppAccessMode(Resource):
def get(self):
parser = reqparse.RequestParser()
parser.add_argument("appId", type=str, required=False, location="args")
parser.add_argument("appCode", type=str, required=False, location="args")
args = parser.parse_args()
features = FeatureService.get_system_features()
if not features.webapp_auth.enabled:
return {"accessMode": "public"}
app_id = args.get("appId")
if args.get("appCode"):
app_code = args["appCode"]
app_id = AppService.get_app_id_by_code(app_code)
if not app_id:
raise ValueError("appId or appCode must be provided")
res = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id)
return {"accessMode": res.access_mode}
class AppWebAuthPermission(Resource):
def get(self):
user_id = "visitor"
try:
auth_header = request.headers.get("Authorization")
if auth_header is None:
raise
if " " not in auth_header:
raise
auth_scheme, tk = auth_header.split(None, 1)
auth_scheme = auth_scheme.lower()
if auth_scheme != "bearer":
raise
decoded = PassportService().verify(tk)
user_id = decoded.get("user_id", "visitor")
except Exception as e:
pass
parser = reqparse.RequestParser()
parser.add_argument("appId", type=str, required=True, location="args")
args = parser.parse_args()
app_id = args["appId"]
app_code = AppService.get_app_code_by_id(app_id)
res = True
if WebAppAuthService.is_app_require_permission_check(app_id=app_id):
res = EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(str(user_id), app_code)
return {"result": res}
api.add_resource(AppParameterApi, "/parameters")
api.add_resource(AppMeta, "/meta")
# webapp auth apis
api.add_resource(AppAccessMode, "/webapp/access-mode")
api.add_resource(AppWebAuthPermission, "/webapp/permission")

View File

@@ -19,11 +19,7 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr
from controllers.web.wraps import WebApiResource
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
)
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import uuid_value

View File

@@ -39,7 +39,7 @@ class ConversationListApi(WebApiResource):
pinned = None
if "pinned" in args and args["pinned"] is not None:
pinned = args["pinned"] == "true"
pinned = True if args["pinned"] == "true" else False
try:
with Session(db.engine) as session:

View File

@@ -121,15 +121,9 @@ class UnsupportedFileTypeError(BaseHTTPException):
code = 415
class WebAppAuthRequiredError(BaseHTTPException):
class WebSSOAuthRequiredError(BaseHTTPException):
error_code = "web_sso_auth_required"
description = "Web app authentication required."
code = 401
class WebAppAuthAccessDeniedError(BaseHTTPException):
error_code = "web_app_access_denied"
description = "You do not have permission to access this web app."
description = "Web SSO authentication required."
code = 401

View File

@@ -1,147 +0,0 @@
import base64
import secrets
from flask import request
from flask_restful import Resource, reqparse
from sqlalchemy import select
from sqlalchemy.orm import Session
from controllers.console.auth.error import (
EmailCodeError,
EmailPasswordResetLimitError,
InvalidEmailError,
InvalidTokenError,
PasswordMismatchError,
)
from controllers.console.error import AccountNotFound, EmailSendIpLimitError
from controllers.console.wraps import email_password_login_enabled, only_edition_enterprise, setup_required
from controllers.web import api
from extensions.ext_database import db
from libs.helper import email, extract_remote_ip
from libs.password import hash_password, valid_password
from models.account import Account
from services.account_service import AccountService
class ForgotPasswordSendEmailApi(Resource):
@only_edition_enterprise
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=email, required=True, location="json")
parser.add_argument("language", type=str, required=False, location="json")
args = parser.parse_args()
ip_address = extract_remote_ip(request)
if AccountService.is_email_send_ip_limit(ip_address):
raise EmailSendIpLimitError()
if args["language"] is not None and args["language"] == "zh-Hans":
language = "zh-Hans"
else:
language = "en-US"
with Session(db.engine) as session:
account = session.execute(select(Account).filter_by(email=args["email"])).scalar_one_or_none()
token = None
if account is None:
raise AccountNotFound()
else:
token = AccountService.send_reset_password_email(account=account, email=args["email"], language=language)
return {"result": "success", "data": token}
class ForgotPasswordCheckApi(Resource):
@only_edition_enterprise
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=str, required=True, location="json")
parser.add_argument("code", type=str, required=True, location="json")
parser.add_argument("token", type=str, required=True, nullable=False, location="json")
args = parser.parse_args()
user_email = args["email"]
is_forgot_password_error_rate_limit = AccountService.is_forgot_password_error_rate_limit(args["email"])
if is_forgot_password_error_rate_limit:
raise EmailPasswordResetLimitError()
token_data = AccountService.get_reset_password_data(args["token"])
if token_data is None:
raise InvalidTokenError()
if user_email != token_data.get("email"):
raise InvalidEmailError()
if args["code"] != token_data.get("code"):
AccountService.add_forgot_password_error_rate_limit(args["email"])
raise EmailCodeError()
# Verified, revoke the first token
AccountService.revoke_reset_password_token(args["token"])
# Refresh token data by generating a new token
_, new_token = AccountService.generate_reset_password_token(
user_email, code=args["code"], additional_data={"phase": "reset"}
)
AccountService.reset_forgot_password_error_rate_limit(args["email"])
return {"is_valid": True, "email": token_data.get("email"), "token": new_token}
class ForgotPasswordResetApi(Resource):
@only_edition_enterprise
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("token", type=str, required=True, nullable=False, location="json")
parser.add_argument("new_password", type=valid_password, required=True, nullable=False, location="json")
parser.add_argument("password_confirm", type=valid_password, required=True, nullable=False, location="json")
args = parser.parse_args()
# Validate passwords match
if args["new_password"] != args["password_confirm"]:
raise PasswordMismatchError()
# Validate token and get reset data
reset_data = AccountService.get_reset_password_data(args["token"])
if not reset_data:
raise InvalidTokenError()
# Must use token in reset phase
if reset_data.get("phase", "") != "reset":
raise InvalidTokenError()
# Revoke token to prevent reuse
AccountService.revoke_reset_password_token(args["token"])
# Generate secure salt and hash password
salt = secrets.token_bytes(16)
password_hashed = hash_password(args["new_password"], salt)
email = reset_data.get("email", "")
with Session(db.engine) as session:
account = session.execute(select(Account).filter_by(email=email)).scalar_one_or_none()
if account:
self._update_existing_account(account, password_hashed, salt, session)
else:
raise AccountNotFound()
return {"result": "success"}
def _update_existing_account(self, account, password_hashed, salt, session):
# Update existing account credentials
account.password = base64.b64encode(password_hashed).decode()
account.password_salt = base64.b64encode(salt).decode()
session.commit()
api.add_resource(ForgotPasswordSendEmailApi, "/forgot-password")
api.add_resource(ForgotPasswordCheckApi, "/forgot-password/validity")
api.add_resource(ForgotPasswordResetApi, "/forgot-password/resets")

View File

@@ -1,109 +0,0 @@
import services
from controllers.console.auth.error import (EmailCodeError,
EmailOrPasswordMismatchError,
InvalidEmailError)
from controllers.console.error import AccountBannedError, AccountNotFound
from controllers.console.wraps import only_edition_enterprise, setup_required
from controllers.web import api
from flask_restful import Resource, reqparse
from jwt import InvalidTokenError # type: ignore
from libs.helper import email
from libs.password import valid_password
from services.account_service import AccountService
from services.webapp_auth_service import WebAppAuthService
class LoginApi(Resource):
"""Resource for web app email/password login."""
@setup_required
@only_edition_enterprise
def post(self):
"""Authenticate user and login."""
parser = reqparse.RequestParser()
parser.add_argument("email", type=email, required=True, location="json")
parser.add_argument("password", type=valid_password, required=True, location="json")
args = parser.parse_args()
try:
account = WebAppAuthService.authenticate(args["email"], args["password"])
except services.errors.account.AccountLoginError:
raise AccountBannedError()
except services.errors.account.AccountPasswordError:
raise EmailOrPasswordMismatchError()
except services.errors.account.AccountNotFoundError:
raise AccountNotFound()
token = WebAppAuthService.login(account=account)
return {"result": "success", "data": {"access_token": token}}
# class LogoutApi(Resource):
# @setup_required
# def get(self):
# account = cast(Account, flask_login.current_user)
# if isinstance(account, flask_login.AnonymousUserMixin):
# return {"result": "success"}
# flask_login.logout_user()
# return {"result": "success"}
class EmailCodeLoginSendEmailApi(Resource):
@setup_required
@only_edition_enterprise
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=email, required=True, location="json")
parser.add_argument("language", type=str, required=False, location="json")
args = parser.parse_args()
if args["language"] is not None and args["language"] == "zh-Hans":
language = "zh-Hans"
else:
language = "en-US"
account = WebAppAuthService.get_user_through_email(args["email"])
if account is None:
raise AccountNotFound()
else:
token = WebAppAuthService.send_email_code_login_email(account=account, language=language)
return {"result": "success", "data": token}
class EmailCodeLoginApi(Resource):
@setup_required
@only_edition_enterprise
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=str, required=True, location="json")
parser.add_argument("code", type=str, required=True, location="json")
parser.add_argument("token", type=str, required=True, location="json")
args = parser.parse_args()
user_email = args["email"]
token_data = WebAppAuthService.get_email_code_login_data(args["token"])
if token_data is None:
raise InvalidTokenError()
if token_data["email"] != args["email"]:
raise InvalidEmailError()
if token_data["code"] != args["code"]:
raise EmailCodeError()
WebAppAuthService.revoke_email_code_login_token(args["token"])
account = WebAppAuthService.get_user_through_email(user_email)
if not account:
raise AccountNotFound()
token = WebAppAuthService.login(account=account)
AccountService.reset_login_error_rate_limit(args["email"])
return {"result": "success", "data": {"access_token": token}}
api.add_resource(LoginApi, "/login")
# api.add_resource(LogoutApi, "/logout")
api.add_resource(EmailCodeLoginSendEmailApi, "/email-code-login")
api.add_resource(EmailCodeLoginApi, "/email-code-login/validity")

View File

@@ -91,7 +91,7 @@ class MessageListApi(WebApiResource):
try:
return MessageService.pagination_by_first_id(
app_model, end_user, args["conversation_id"], args["first_id"], args["limit"]
app_model, end_user, args["conversation_id"], args["first_id"], args["limit"], "desc"
)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")

View File

@@ -1,18 +1,16 @@
import uuid
from datetime import UTC, datetime, timedelta
from configs import dify_config
from controllers.web import api
from controllers.web.error import WebAppAuthRequiredError
from extensions.ext_database import db
from flask import request
from flask_restful import Resource
from flask_restful import Resource # type: ignore
from werkzeug.exceptions import NotFound, Unauthorized
from controllers.web import api
from controllers.web.error import WebSSOAuthRequiredError
from extensions.ext_database import db
from libs.passport import PassportService
from models.model import App, EndUser, Site
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService, WebAppAuthType
from werkzeug.exceptions import NotFound, Unauthorized
class PassportResource(Resource):
@@ -21,23 +19,13 @@ class PassportResource(Resource):
def get(self):
system_features = FeatureService.get_system_features()
app_code = request.headers.get("X-App-Code")
web_app_access_token = request.args.get("web_app_access_token")
if app_code is None:
raise Unauthorized("X-App-Code header is missing.")
# exchange token for enterprise logined web user
enterprise_user_decoded = decode_enterprise_webapp_user_id(web_app_access_token)
if enterprise_user_decoded:
# a web user has already logged in, exchange a token for this app without redirecting to the login page
return exchange_token_for_existing_web_user(
app_code=app_code, enterprise_user_decoded=enterprise_user_decoded
)
if system_features.webapp_auth.enabled:
app_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=app_code)
if not app_settings or not app_settings.access_mode == "public":
raise WebAppAuthRequiredError()
if system_features.sso_enforced_for_web:
app_web_sso_enabled = EnterpriseService.get_app_web_sso_enabled(app_code).get("enabled", False)
if app_web_sso_enabled:
raise WebSSOAuthRequiredError()
# get site from db and check if it is normal
site = db.session.query(Site).filter(Site.code == app_code, Site.status == "normal").first()
@@ -77,128 +65,6 @@ class PassportResource(Resource):
api.add_resource(PassportResource, "/passport")
def decode_enterprise_webapp_user_id(jwt_token: str | None):
"""
Decode the enterprise user session from the Authorization header.
"""
if not jwt_token:
return None
decoded = PassportService().verify(jwt_token)
source = decoded.get("token_source")
if not source or source != "webapp_login_token":
raise Unauthorized("Invalid token source. Expected 'webapp_login_token'.")
return decoded
def exchange_token_for_existing_web_user(app_code: str, enterprise_user_decoded: dict):
"""
Exchange a token for an existing web user session.
"""
user_id = enterprise_user_decoded.get("user_id")
end_user_id = enterprise_user_decoded.get("end_user_id")
session_id = enterprise_user_decoded.get("session_id")
user_auth_type = enterprise_user_decoded.get("auth_type")
if not user_auth_type:
raise Unauthorized("Missing auth_type in the token.")
site = db.session.query(Site).filter(Site.code == app_code, Site.status == "normal").first()
if not site:
raise NotFound()
app_model = db.session.query(App).filter(App.id == site.app_id).first()
if not app_model or app_model.status != "normal" or not app_model.enable_site:
raise NotFound()
app_auth_type = WebAppAuthService.get_app_auth_type(app_code=app_code)
if app_auth_type == WebAppAuthType.PUBLIC:
return _exchange_for_public_app_token(app_model, site, enterprise_user_decoded)
elif app_auth_type == WebAppAuthType.EXTERNAL and user_auth_type != "external":
raise WebAppAuthRequiredError("Please login as external user.")
elif app_auth_type == WebAppAuthType.INTERNAL and user_auth_type != "internal":
raise WebAppAuthRequiredError("Please login as internal user.")
end_user = None
if end_user_id:
end_user = db.session.query(EndUser).filter(EndUser.id == end_user_id).first()
if session_id:
end_user = (
db.session.query(EndUser)
.filter(
EndUser.session_id == session_id,
EndUser.tenant_id == app_model.tenant_id,
EndUser.app_id == app_model.id,
)
.first()
)
if not end_user:
if not session_id:
raise NotFound("Missing session_id for existing web user.")
end_user = EndUser(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type="browser",
is_anonymous=True,
session_id=session_id,
)
db.session.add(end_user)
db.session.commit()
exp_dt = datetime.now(UTC) + timedelta(hours=dify_config.ACCESS_TOKEN_EXPIRE_MINUTES * 24)
exp = int(exp_dt.timestamp())
payload = {
"iss": site.id,
"sub": "Web API Passport",
"app_id": site.app_id,
"app_code": site.code,
"user_id": user_id,
"end_user_id": end_user.id,
"auth_type": user_auth_type,
"granted_at": int(datetime.now(UTC).timestamp()),
"token_source": "webapp",
"exp": exp,
}
token: str = PassportService().issue(payload)
return {
"access_token": token,
}
def _exchange_for_public_app_token(app_model, site, token_decoded):
user_id = token_decoded.get("user_id")
end_user = None
if user_id:
end_user = db.session.query(EndUser).filter(
EndUser.app_id == app_model.id, EndUser.session_id == user_id
).first()
if not end_user:
end_user = EndUser(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type="browser",
is_anonymous=True,
session_id=generate_session_id(),
)
db.session.add(end_user)
db.session.commit()
payload = {
"iss": site.app_id,
"sub": "Web API Passport",
"app_id": site.app_id,
"app_code": site.code,
"end_user_id": end_user.id,
}
tk = PassportService().issue(payload)
return {
"access_token": tk,
}
def generate_session_id():
"""
Generate a unique session ID.

View File

@@ -14,11 +14,7 @@ from controllers.web.error import (
from controllers.web.wraps import WebApiResource
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
)
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.errors.invoke import InvokeError
from libs import helper
from models.model import App, AppMode, EndUser

View File

@@ -1,18 +1,15 @@
from datetime import UTC, datetime
from functools import wraps
from controllers.web.error import (WebAppAuthAccessDeniedError,
WebAppAuthRequiredError)
from extensions.ext_database import db
from flask import request
from flask_restful import Resource # type: ignore
from werkzeug.exceptions import BadRequest, NotFound, Unauthorized
from controllers.web.error import WebSSOAuthRequiredError
from extensions.ext_database import db
from libs.passport import PassportService
from models.model import App, EndUser, Site
from services.enterprise.enterprise_service import (EnterpriseService,
WebAppSettings)
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService
from werkzeug.exceptions import BadRequest, NotFound, Unauthorized
def validate_jwt_token(view=None):
@@ -48,8 +45,7 @@ def decode_jwt_token():
raise Unauthorized("Invalid Authorization header format. Expected 'Bearer <api-key>' format.")
decoded = PassportService().verify(tk)
app_code = decoded.get("app_code")
app_id = decoded.get("app_id")
app_model = db.session.query(App).filter(App.id == app_id).first()
app_model = db.session.query(App).filter(App.id == decoded["app_id"]).first()
site = db.session.query(Site).filter(Site.code == app_code).first()
if not app_model:
raise NotFound()
@@ -57,90 +53,39 @@ def decode_jwt_token():
raise BadRequest("Site URL is no longer valid.")
if app_model.enable_site is False:
raise BadRequest("Site is disabled.")
end_user_id = decoded.get("end_user_id")
end_user = db.session.query(EndUser).filter(EndUser.id == end_user_id).first()
end_user = db.session.query(EndUser).filter(EndUser.id == decoded["end_user_id"]).first()
if not end_user:
raise NotFound()
# for enterprise webapp auth
app_web_auth_enabled = False
webapp_settings = None
if system_features.webapp_auth.enabled:
webapp_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=app_code)
if not webapp_settings:
raise NotFound("Web app settings not found.")
app_web_auth_enabled = webapp_settings.access_mode != "public"
_validate_webapp_token(decoded, app_web_auth_enabled, system_features.webapp_auth.enabled)
_validate_user_accessibility(
decoded, app_code, app_web_auth_enabled, system_features.webapp_auth.enabled, webapp_settings
)
_validate_web_sso_token(decoded, system_features, app_code)
return app_model, end_user
except Unauthorized as e:
if system_features.webapp_auth.enabled:
if not app_code:
raise Unauthorized("Please re-login to access the web app.")
app_web_auth_enabled = (
EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=app_code).access_mode != "public"
)
if app_web_auth_enabled:
raise WebAppAuthRequiredError()
if system_features.sso_enforced_for_web:
app_web_sso_enabled = EnterpriseService.get_app_web_sso_enabled(app_code).get("enabled", False)
if app_web_sso_enabled:
raise WebSSOAuthRequiredError()
raise Unauthorized(e.description)
def _validate_webapp_token(decoded, app_web_auth_enabled: bool, system_webapp_auth_enabled: bool):
# Check if authentication is enforced for web app, and if the token source is not webapp,
# raise an error and redirect to login
if system_webapp_auth_enabled and app_web_auth_enabled:
source = decoded.get("token_source")
if not source or source != "webapp":
raise WebAppAuthRequiredError()
def _validate_web_sso_token(decoded, system_features, app_code):
app_web_sso_enabled = False
# Check if authentication is not enforced for web, and if the token source is webapp,
# Check if SSO is enforced for web, and if the token source is not SSO, raise an error and redirect to SSO login
if system_features.sso_enforced_for_web:
app_web_sso_enabled = EnterpriseService.get_app_web_sso_enabled(app_code).get("enabled", False)
if app_web_sso_enabled:
source = decoded.get("token_source")
if not source or source != "sso":
raise WebSSOAuthRequiredError()
# Check if SSO is not enforced for web, and if the token source is SSO,
# raise an error and redirect to normal passport login
if not system_webapp_auth_enabled or not app_web_auth_enabled:
if not system_features.sso_enforced_for_web or not app_web_sso_enabled:
source = decoded.get("token_source")
if source and source == "webapp":
raise Unauthorized("webapp token expired.")
def _validate_user_accessibility(
decoded,
app_code,
app_web_auth_enabled: bool,
system_webapp_auth_enabled: bool,
webapp_settings: WebAppSettings | None,
):
if system_webapp_auth_enabled and app_web_auth_enabled:
# Check if the user is allowed to access the web app
user_id = decoded.get("user_id")
if not user_id:
raise WebAppAuthRequiredError()
if not webapp_settings:
raise WebAppAuthRequiredError("Web app settings not found.")
if WebAppAuthService.is_app_require_permission_check(access_mode=webapp_settings.access_mode):
if not EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(user_id, app_code=app_code):
raise WebAppAuthAccessDeniedError()
auth_type = decoded.get("auth_type")
granted_at = decoded.get("granted_at")
if not auth_type:
raise WebAppAuthAccessDeniedError("Missing auth_type in the token.")
if not granted_at:
raise WebAppAuthAccessDeniedError("Missing granted_at in the token.")
# check if sso has been updated
if auth_type == "external":
last_update_time = EnterpriseService.get_app_sso_settings_last_update_time()
if granted_at and datetime.fromtimestamp(granted_at, tz=UTC) < last_update_time:
raise WebAppAuthAccessDeniedError("SSO settings have been updated. Please re-login.")
elif auth_type == "internal":
last_update_time = EnterpriseService.get_workspace_sso_settings_last_update_time()
if granted_at and datetime.fromtimestamp(granted_at, tz=UTC) < last_update_time:
raise WebAppAuthAccessDeniedError("SSO settings have been updated. Please re-login.")
if source and source == "sso":
raise Unauthorized("sso token expired.")
class WebApiResource(Resource):

View File

@@ -339,13 +339,13 @@ class BaseAgentRunner(AppRunner):
raise ValueError(f"Agent thought {agent_thought.id} not found")
agent_thought = queried_thought
if thought:
if thought is not None:
agent_thought.thought = thought
if tool_name:
if tool_name is not None:
agent_thought.tool = tool_name
if tool_input:
if tool_input is not None:
if isinstance(tool_input, dict):
try:
tool_input = json.dumps(tool_input, ensure_ascii=False)
@@ -354,7 +354,7 @@ class BaseAgentRunner(AppRunner):
agent_thought.tool_input = tool_input
if observation:
if observation is not None:
if isinstance(observation, dict):
try:
observation = json.dumps(observation, ensure_ascii=False)
@@ -363,7 +363,7 @@ class BaseAgentRunner(AppRunner):
agent_thought.observation = observation
if answer:
if answer is not None:
agent_thought.answer = answer
if messages_ids is not None and len(messages_ids) > 0:

View File

@@ -104,6 +104,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
# recalc llm max tokens
prompt_messages = self._organize_prompt_messages()
self.recalc_llm_max_tokens(self.model_config, prompt_messages)
# invoke model
chunks = model_instance.invoke_llm(
prompt_messages=prompt_messages,
@@ -171,7 +172,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
self.save_agent_thought(
agent_thought=agent_thought,
tool_name=(scratchpad.action.action_name if scratchpad.action and not scratchpad.is_final() else ""),
tool_name=scratchpad.action.action_name if scratchpad.action else "",
tool_input={scratchpad.action.action_name: scratchpad.action.action_input} if scratchpad.action else {},
tool_invoke_meta={},
thought=scratchpad.thought or "",

View File

@@ -84,6 +84,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
# recalc llm max tokens
prompt_messages = self._organize_prompt_messages()
self.recalc_llm_max_tokens(self.model_config, prompt_messages)
# invoke model
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = model_instance.invoke_llm(
prompt_messages=prompt_messages,

View File

@@ -21,7 +21,7 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.app.entities.task_entities import ChatbotAppBlockingResponse, ChatbotAppStreamResponse
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from extensions.ext_database import db
@@ -336,7 +336,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
except ValidationError as e:
logger.exception("Validation Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
except ValueError as e:
except (ValueError, InvokeError) as e:
if dify_config.DEBUG:
logger.exception("Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)

View File

@@ -67,17 +67,24 @@ from models.account import Account
from models.enums import CreatedByRole
from models.workflow import (
Workflow,
WorkflowNodeExecution,
WorkflowRunStatus,
)
logger = logging.getLogger(__name__)
class AdvancedChatAppGenerateTaskPipeline:
class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleManage, MessageCycleManage):
"""
AdvancedChatAppGenerateTaskPipeline is a class that generate stream output and state management for Application.
"""
_task_state: WorkflowTaskState
_application_generate_entity: AdvancedChatAppGenerateEntity
_workflow_system_variables: dict[SystemVariableKey, Any]
_wip_workflow_node_executions: dict[str, WorkflowNodeExecution]
_conversation_name_generate_thread: Optional[Thread] = None
def __init__(
self,
application_generate_entity: AdvancedChatAppGenerateEntity,
@@ -89,7 +96,7 @@ class AdvancedChatAppGenerateTaskPipeline:
stream: bool,
dialogue_count: int,
) -> None:
self._base_task_pipeline = BasedGenerateTaskPipeline(
super().__init__(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
stream=stream,
@@ -106,35 +113,32 @@ class AdvancedChatAppGenerateTaskPipeline:
else:
raise NotImplementedError(f"User type not supported: {type(user)}")
self._workflow_cycle_manager = WorkflowCycleManage(
application_generate_entity=application_generate_entity,
workflow_system_variables={
SystemVariableKey.QUERY: message.query,
SystemVariableKey.FILES: application_generate_entity.files,
SystemVariableKey.CONVERSATION_ID: conversation.id,
SystemVariableKey.USER_ID: user_session_id,
SystemVariableKey.DIALOGUE_COUNT: dialogue_count,
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
SystemVariableKey.WORKFLOW_ID: workflow.id,
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
},
)
self._task_state = WorkflowTaskState()
self._message_cycle_manager = MessageCycleManage(
application_generate_entity=application_generate_entity, task_state=self._task_state
)
self._application_generate_entity = application_generate_entity
self._workflow_id = workflow.id
self._workflow_features_dict = workflow.features_dict
self._conversation_id = conversation.id
self._conversation_mode = conversation.mode
self._message_id = message.id
self._message_created_at = int(message.created_at.timestamp())
self._conversation_name_generate_thread: Thread | None = None
self._workflow_system_variables = {
SystemVariableKey.QUERY: message.query,
SystemVariableKey.FILES: application_generate_entity.files,
SystemVariableKey.CONVERSATION_ID: conversation.id,
SystemVariableKey.USER_ID: user_session_id,
SystemVariableKey.DIALOGUE_COUNT: dialogue_count,
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
SystemVariableKey.WORKFLOW_ID: workflow.id,
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
}
self._task_state = WorkflowTaskState()
self._wip_workflow_node_executions = {}
self._conversation_name_generate_thread = None
self._recorded_files: list[Mapping[str, Any]] = []
self._workflow_run_id: str = ""
self._workflow_run_id = ""
def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
"""
@@ -142,13 +146,13 @@ class AdvancedChatAppGenerateTaskPipeline:
:return:
"""
# start generate conversation name thread
self._conversation_name_generate_thread = self._message_cycle_manager._generate_conversation_name(
self._conversation_name_generate_thread = self._generate_conversation_name(
conversation_id=self._conversation_id, query=self._application_generate_entity.query
)
generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager)
if self._base_task_pipeline._stream:
if self._stream:
return self._to_stream_response(generator)
else:
return self._to_blocking_response(generator)
@@ -265,26 +269,24 @@ class AdvancedChatAppGenerateTaskPipeline:
# init fake graph runtime state
graph_runtime_state: Optional[GraphRuntimeState] = None
for queue_message in self._base_task_pipeline._queue_manager.listen():
for queue_message in self._queue_manager.listen():
event = queue_message.event
if isinstance(event, QueuePingEvent):
yield self._base_task_pipeline._ping_stream_response()
yield self._ping_stream_response()
elif isinstance(event, QueueErrorEvent):
with Session(db.engine, expire_on_commit=False) as session:
err = self._base_task_pipeline._handle_error(
event=event, session=session, message_id=self._message_id
)
with Session(db.engine) as session:
err = self._handle_error(event=event, session=session, message_id=self._message_id)
session.commit()
yield self._base_task_pipeline._error_to_stream_response(err)
yield self._error_to_stream_response(err)
break
elif isinstance(event, QueueWorkflowStartedEvent):
# override graph runtime state
graph_runtime_state = event.graph_runtime_state
with Session(db.engine, expire_on_commit=False) as session:
with Session(db.engine) as session:
# init workflow run
workflow_run = self._workflow_cycle_manager._handle_workflow_run_start(
workflow_run = self._handle_workflow_run_start(
session=session,
workflow_id=self._workflow_id,
user_id=self._user_id,
@@ -295,7 +297,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not message:
raise ValueError(f"Message not found: {self._message_id}")
message.workflow_run_id = workflow_run.id
workflow_start_resp = self._workflow_cycle_manager._workflow_start_to_stream_response(
workflow_start_resp = self._workflow_start_to_stream_response(
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
session.commit()
@@ -308,14 +310,12 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
workflow_node_execution = self._handle_workflow_node_execution_retried(
session=session, workflow_run=workflow_run, event=event
)
node_retry_resp = self._workflow_cycle_manager._workflow_node_retry_to_stream_response(
node_retry_resp = self._workflow_node_retry_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
@@ -329,15 +329,13 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
workflow_node_execution = self._handle_node_execution_start(
session=session, workflow_run=workflow_run, event=event
)
node_start_resp = self._workflow_cycle_manager._workflow_node_start_to_stream_response(
node_start_resp = self._workflow_node_start_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
@@ -350,16 +348,12 @@ class AdvancedChatAppGenerateTaskPipeline:
elif isinstance(event, QueueNodeSucceededEvent):
# Record files if it's an answer node or end node
if event.node_type in [NodeType.ANSWER, NodeType.END]:
self._recorded_files.extend(
self._workflow_cycle_manager._fetch_files_from_node_outputs(event.outputs or {})
)
self._recorded_files.extend(self._fetch_files_from_node_outputs(event.outputs or {}))
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success(
session=session, event=event
)
with Session(db.engine) as session:
workflow_node_execution = self._handle_workflow_node_execution_success(session=session, event=event)
node_finish_resp = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
node_finish_resp = self._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
@@ -370,12 +364,10 @@ class AdvancedChatAppGenerateTaskPipeline:
if node_finish_resp:
yield node_finish_resp
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
session=session, event=event
)
with Session(db.engine) as session:
workflow_node_execution = self._handle_workflow_node_execution_failed(session=session, event=event)
node_finish_resp = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
node_finish_resp = self._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
@@ -389,17 +381,13 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
parallel_start_resp = (
self._workflow_cycle_manager._workflow_parallel_branch_start_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
parallel_start_resp = self._workflow_parallel_branch_start_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
yield parallel_start_resp
@@ -407,17 +395,13 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
parallel_finish_resp = (
self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
parallel_finish_resp = self._workflow_parallel_branch_finished_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
yield parallel_finish_resp
@@ -425,11 +409,9 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
iter_start_resp = self._workflow_cycle_manager._workflow_iteration_start_to_stream_response(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
iter_start_resp = self._workflow_iteration_start_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
@@ -441,11 +423,9 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
iter_next_resp = self._workflow_cycle_manager._workflow_iteration_next_to_stream_response(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
iter_next_resp = self._workflow_iteration_next_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
@@ -457,11 +437,9 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
iter_finish_resp = self._workflow_cycle_manager._workflow_iteration_completed_to_stream_response(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
iter_finish_resp = self._workflow_iteration_completed_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
@@ -476,8 +454,8 @@ class AdvancedChatAppGenerateTaskPipeline:
if not graph_runtime_state:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._handle_workflow_run_success(
with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_success(
session=session,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
@@ -488,23 +466,21 @@ class AdvancedChatAppGenerateTaskPipeline:
trace_manager=trace_manager,
)
workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response(
workflow_finish_resp = self._workflow_finish_to_stream_response(
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
session.commit()
yield workflow_finish_resp
self._base_task_pipeline._queue_manager.publish(
QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE
)
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._handle_workflow_run_partial_success(
with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_partial_success(
session=session,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
@@ -515,23 +491,21 @@ class AdvancedChatAppGenerateTaskPipeline:
conversation_id=None,
trace_manager=trace_manager,
)
workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response(
workflow_finish_resp = self._workflow_finish_to_stream_response(
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
session.commit()
yield workflow_finish_resp
self._base_task_pipeline._queue_manager.publish(
QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE
)
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowFailedEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._handle_workflow_run_failed(
with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_failed(
session=session,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
@@ -543,22 +517,20 @@ class AdvancedChatAppGenerateTaskPipeline:
trace_manager=trace_manager,
exceptions_count=event.exceptions_count,
)
workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response(
workflow_finish_resp = self._workflow_finish_to_stream_response(
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_run.error}"))
err = self._base_task_pipeline._handle_error(
event=err_event, session=session, message_id=self._message_id
)
err = self._handle_error(event=err_event, session=session, message_id=self._message_id)
session.commit()
yield workflow_finish_resp
yield self._base_task_pipeline._error_to_stream_response(err)
yield self._error_to_stream_response(err)
break
elif isinstance(event, QueueStopEvent):
if self._workflow_run_id and graph_runtime_state:
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._handle_workflow_run_failed(
with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_failed(
session=session,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
@@ -569,7 +541,7 @@ class AdvancedChatAppGenerateTaskPipeline:
conversation_id=self._conversation_id,
trace_manager=trace_manager,
)
workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response(
workflow_finish_resp = self._workflow_finish_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
@@ -583,18 +555,18 @@ class AdvancedChatAppGenerateTaskPipeline:
yield self._message_end_to_stream_response()
break
elif isinstance(event, QueueRetrieverResourcesEvent):
self._message_cycle_manager._handle_retriever_resources(event)
self._handle_retriever_resources(event)
with Session(db.engine, expire_on_commit=False) as session:
with Session(db.engine) as session:
message = self._get_message(session=session)
message.message_metadata = (
json.dumps(jsonable_encoder(self._task_state.metadata)) if self._task_state.metadata else None
)
session.commit()
elif isinstance(event, QueueAnnotationReplyEvent):
self._message_cycle_manager._handle_annotation_reply(event)
self._handle_annotation_reply(event)
with Session(db.engine, expire_on_commit=False) as session:
with Session(db.engine) as session:
message = self._get_message(session=session)
message.message_metadata = (
json.dumps(jsonable_encoder(self._task_state.metadata)) if self._task_state.metadata else None
@@ -615,27 +587,23 @@ class AdvancedChatAppGenerateTaskPipeline:
tts_publisher.publish(queue_message)
self._task_state.answer += delta_text
yield self._message_cycle_manager._message_to_stream_response(
yield self._message_to_stream_response(
answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector
)
elif isinstance(event, QueueMessageReplaceEvent):
# published by moderation
yield self._message_cycle_manager._message_replace_to_stream_response(answer=event.text)
yield self._message_replace_to_stream_response(answer=event.text)
elif isinstance(event, QueueAdvancedChatMessageEndEvent):
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
output_moderation_answer = self._base_task_pipeline._handle_output_moderation_when_task_finished(
self._task_state.answer
)
output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer)
if output_moderation_answer:
self._task_state.answer = output_moderation_answer
yield self._message_cycle_manager._message_replace_to_stream_response(
answer=output_moderation_answer
)
yield self._message_replace_to_stream_response(answer=output_moderation_answer)
# Save message
with Session(db.engine, expire_on_commit=False) as session:
with Session(db.engine) as session:
self._save_message(session=session, graph_runtime_state=graph_runtime_state)
session.commit()
@@ -653,7 +621,7 @@ class AdvancedChatAppGenerateTaskPipeline:
def _save_message(self, *, session: Session, graph_runtime_state: Optional[GraphRuntimeState] = None) -> None:
message = self._get_message(session=session)
message.answer = self._task_state.answer
message.provider_response_latency = time.perf_counter() - self._base_task_pipeline._start_at
message.provider_response_latency = time.perf_counter() - self._start_at
message.message_metadata = (
json.dumps(jsonable_encoder(self._task_state.metadata)) if self._task_state.metadata else None
)
@@ -717,20 +685,20 @@ class AdvancedChatAppGenerateTaskPipeline:
:param text: text
:return: True if output moderation should direct output, otherwise False
"""
if self._base_task_pipeline._output_moderation_handler:
if self._base_task_pipeline._output_moderation_handler.should_direct_output():
if self._output_moderation_handler:
if self._output_moderation_handler.should_direct_output():
# stop subscribe new token when output moderation should direct output
self._task_state.answer = self._base_task_pipeline._output_moderation_handler.get_final_output()
self._base_task_pipeline._queue_manager.publish(
self._task_state.answer = self._output_moderation_handler.get_final_output()
self._queue_manager.publish(
QueueTextChunkEvent(text=self._task_state.answer), PublishFrom.TASK_PIPELINE
)
self._base_task_pipeline._queue_manager.publish(
self._queue_manager.publish(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE
)
return True
else:
self._base_task_pipeline._output_moderation_handler.append_new_token(text)
self._output_moderation_handler.append_new_token(text)
return False

View File

@@ -18,7 +18,7 @@ from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskSt
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, InvokeFrom
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.ops.ops_trace_manager import TraceQueueManager
from extensions.ext_database import db
from factories import file_factory
@@ -245,7 +245,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
except ValidationError as e:
logger.exception("Validation Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
except ValueError as e:
except (ValueError, InvokeError) as e:
if dify_config.DEBUG:
logger.exception("Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)

View File

@@ -55,6 +55,20 @@ class AgentChatAppRunner(AppRunner):
query = application_generate_entity.query
files = application_generate_entity.files
# Pre-calculate the number of tokens of the prompt messages,
# and return the rest number of tokens by model context token size limit and max token size limit.
# If the rest number of tokens is not enough, raise exception.
# Include: prompt template, inputs, query(optional), files(optional)
# Not Include: memory, external data, dataset context
self.get_pre_calculate_rest_tokens(
app_record=app_record,
model_config=application_generate_entity.model_conf,
prompt_template_entity=app_config.prompt_template,
inputs=inputs,
files=files,
query=query,
)
memory = None
if application_generate_entity.conversation_id:
# get memory of conversation (read-only)
@@ -188,7 +202,7 @@ class AgentChatAppRunner(AppRunner):
# change function call strategy based on LLM model
llm_model = cast(LargeLanguageModel, model_instance.model_type_instance)
model_schema = llm_model.get_model_schema(model_instance.model, model_instance.credentials)
if not model_schema:
if not model_schema or not model_schema.features:
raise ValueError("Model schema not found")
if {ModelFeature.MULTI_TOOL_CALL, ModelFeature.TOOL_CALL}.intersection(model_schema.features or []):

View File

@@ -167,7 +167,8 @@ class AppQueueManager:
else:
if isinstance(data, DeclarativeMeta) or hasattr(data, "_sa_instance_state"):
raise TypeError(
"Critical Error: Passing SQLAlchemy Model instances that cause thread safety issues is not allowed."
"Critical Error: Passing SQLAlchemy Model instances "
"that cause thread safety issues is not allowed."
)

View File

@@ -15,8 +15,10 @@ from core.app.features.annotation_reply.annotation_reply import AnnotationReplyF
from core.app.features.hosting_moderation.hosting_moderation import HostingModerationFeature
from core.external_data_tool.external_data_fetch import ExternalDataFetch
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from core.model_runtime.entities.message_entities import AssistantPromptMessage, PromptMessage
from core.model_runtime.entities.model_entities import ModelPropertyKey
from core.model_runtime.errors.invoke import InvokeBadRequestError
from core.moderation.input_moderation import InputModeration
from core.prompt.advanced_prompt_transform import AdvancedPromptTransform
@@ -29,6 +31,106 @@ if TYPE_CHECKING:
class AppRunner:
def get_pre_calculate_rest_tokens(
self,
app_record: App,
model_config: ModelConfigWithCredentialsEntity,
prompt_template_entity: PromptTemplateEntity,
inputs: Mapping[str, str],
files: Sequence["File"],
query: Optional[str] = None,
) -> int:
"""
Get pre calculate rest tokens
:param app_record: app record
:param model_config: model config entity
:param prompt_template_entity: prompt template entity
:param inputs: inputs
:param files: files
:param query: query
:return:
"""
# Invoke model
model_instance = ModelInstance(
provider_model_bundle=model_config.provider_model_bundle, model=model_config.model
)
model_context_tokens = model_config.model_schema.model_properties.get(ModelPropertyKey.CONTEXT_SIZE)
max_tokens = 0
for parameter_rule in model_config.model_schema.parameter_rules:
if parameter_rule.name == "max_tokens" or (
parameter_rule.use_template and parameter_rule.use_template == "max_tokens"
):
max_tokens = (
model_config.parameters.get(parameter_rule.name)
or model_config.parameters.get(parameter_rule.use_template or "")
) or 0
if model_context_tokens is None:
return -1
if max_tokens is None:
max_tokens = 0
# get prompt messages without memory and context
prompt_messages, stop = self.organize_prompt_messages(
app_record=app_record,
model_config=model_config,
prompt_template_entity=prompt_template_entity,
inputs=inputs,
files=files,
query=query,
)
prompt_tokens = model_instance.get_llm_num_tokens(prompt_messages)
rest_tokens: int = model_context_tokens - max_tokens - prompt_tokens
if rest_tokens < 0:
raise InvokeBadRequestError(
"Query or prefix prompt is too long, you can reduce the prefix prompt, "
"or shrink the max token, or switch to a llm with a larger token limit size."
)
return rest_tokens
def recalc_llm_max_tokens(
self, model_config: ModelConfigWithCredentialsEntity, prompt_messages: list[PromptMessage]
):
# recalc max_tokens if sum(prompt_token + max_tokens) over model token limit
model_instance = ModelInstance(
provider_model_bundle=model_config.provider_model_bundle, model=model_config.model
)
model_context_tokens = model_config.model_schema.model_properties.get(ModelPropertyKey.CONTEXT_SIZE)
max_tokens = 0
for parameter_rule in model_config.model_schema.parameter_rules:
if parameter_rule.name == "max_tokens" or (
parameter_rule.use_template and parameter_rule.use_template == "max_tokens"
):
max_tokens = (
model_config.parameters.get(parameter_rule.name)
or model_config.parameters.get(parameter_rule.use_template or "")
) or 0
if model_context_tokens is None:
return -1
if max_tokens is None:
max_tokens = 0
prompt_tokens = model_instance.get_llm_num_tokens(prompt_messages)
if prompt_tokens + max_tokens > model_context_tokens:
max_tokens = max(model_context_tokens - prompt_tokens, 16)
for parameter_rule in model_config.model_schema.parameter_rules:
if parameter_rule.name == "max_tokens" or (
parameter_rule.use_template and parameter_rule.use_template == "max_tokens"
):
model_config.parameters[parameter_rule.name] = max_tokens
def organize_prompt_messages(
self,
app_record: App,

View File

@@ -18,7 +18,7 @@ from core.app.apps.chat.generate_response_converter import ChatAppGenerateRespon
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import ChatAppGenerateEntity, InvokeFrom
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.ops.ops_trace_manager import TraceQueueManager
from extensions.ext_database import db
from factories import file_factory
@@ -237,7 +237,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
except ValidationError as e:
logger.exception("Validation Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
except ValueError as e:
except (ValueError, InvokeError) as e:
if dify_config.DEBUG:
logger.exception("Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)

View File

@@ -50,6 +50,20 @@ class ChatAppRunner(AppRunner):
query = application_generate_entity.query
files = application_generate_entity.files
# Pre-calculate the number of tokens of the prompt messages,
# and return the rest number of tokens by model context token size limit and max token size limit.
# If the rest number of tokens is not enough, raise exception.
# Include: prompt template, inputs, query(optional), files(optional)
# Not Include: memory, external data, dataset context
self.get_pre_calculate_rest_tokens(
app_record=app_record,
model_config=application_generate_entity.model_conf,
prompt_template_entity=app_config.prompt_template,
inputs=inputs,
files=files,
query=query,
)
memory = None
if application_generate_entity.conversation_id:
# get memory of conversation (read-only)
@@ -180,6 +194,9 @@ class ChatAppRunner(AppRunner):
if hosting_moderation_result:
return
# Re-calculate the max tokens if sum(prompt_token + max_tokens) over model token limit
self.recalc_llm_max_tokens(model_config=application_generate_entity.model_conf, prompt_messages=prompt_messages)
# Invoke model
model_instance = ModelInstance(
provider_model_bundle=application_generate_entity.model_conf.provider_model_bundle,

View File

@@ -17,7 +17,7 @@ from core.app.apps.completion.generate_response_converter import CompletionAppGe
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import CompletionAppGenerateEntity, InvokeFrom
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.ops.ops_trace_manager import TraceQueueManager
from extensions.ext_database import db
from factories import file_factory
@@ -214,7 +214,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
except ValidationError as e:
logger.exception("Validation Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
except ValueError as e:
except (ValueError, InvokeError) as e:
if dify_config.DEBUG:
logger.exception("Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)

View File

@@ -43,6 +43,20 @@ class CompletionAppRunner(AppRunner):
query = application_generate_entity.query
files = application_generate_entity.files
# Pre-calculate the number of tokens of the prompt messages,
# and return the rest number of tokens by model context token size limit and max token size limit.
# If the rest number of tokens is not enough, raise exception.
# Include: prompt template, inputs, query(optional), files(optional)
# Not Include: memory, external data, dataset context
self.get_pre_calculate_rest_tokens(
app_record=app_record,
model_config=application_generate_entity.model_conf,
prompt_template_entity=app_config.prompt_template,
inputs=inputs,
files=files,
query=query,
)
# organize all inputs and template to prompt messages
# Include: prompt template, inputs, query(optional), files(optional)
prompt_messages, stop = self.organize_prompt_messages(
@@ -138,6 +152,9 @@ class CompletionAppRunner(AppRunner):
if hosting_moderation_result:
return
# Re-calculate the max tokens if sum(prompt_token + max_tokens) over model token limit
self.recalc_llm_max_tokens(model_config=application_generate_entity.model_conf, prompt_messages=prompt_messages)
# Invoke model
model_instance = ModelInstance(
provider_model_bundle=application_generate_entity.model_conf.provider_model_bundle,

View File

@@ -89,7 +89,6 @@ class MessageBasedAppGenerator(BaseAppGenerator):
Conversation.id == conversation_id,
Conversation.app_id == app_model.id,
Conversation.status == "normal",
Conversation.is_deleted.is_(False),
]
if isinstance(user, Account):

View File

@@ -20,7 +20,7 @@ from core.app.apps.workflow.generate_response_converter import WorkflowAppGenera
from core.app.apps.workflow.generate_task_pipeline import WorkflowAppGenerateTaskPipeline
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.ops.ops_trace_manager import TraceQueueManager
from extensions.ext_database import db
from factories import file_factory
@@ -221,7 +221,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
single_iteration_run=WorkflowAppGenerateEntity.SingleIterationRunEntity(
node_id=node_id, inputs=args["inputs"]
),
workflow_run_id=str(uuid.uuid4()),
)
contexts.tenant_id.set(application_generate_entity.app_config.tenant_id)
@@ -271,7 +270,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
except ValidationError as e:
logger.exception("Validation Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
except ValueError as e:
except (ValueError, InvokeError) as e:
if dify_config.DEBUG:
logger.exception("Error when generating")
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)

View File

@@ -1,7 +1,7 @@
import logging
import time
from collections.abc import Generator
from typing import Optional, Union
from typing import Any, Optional, Union
from sqlalchemy.orm import Session
@@ -58,6 +58,7 @@ from models.workflow import (
Workflow,
WorkflowAppLog,
WorkflowAppLogCreatedFrom,
WorkflowNodeExecution,
WorkflowRun,
WorkflowRunStatus,
)
@@ -65,11 +66,16 @@ from models.workflow import (
logger = logging.getLogger(__name__)
class WorkflowAppGenerateTaskPipeline:
class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleManage):
"""
WorkflowAppGenerateTaskPipeline is a class that generate stream output and state management for Application.
"""
_task_state: WorkflowTaskState
_application_generate_entity: WorkflowAppGenerateEntity
_workflow_system_variables: dict[SystemVariableKey, Any]
_wip_workflow_node_executions: dict[str, WorkflowNodeExecution]
def __init__(
self,
application_generate_entity: WorkflowAppGenerateEntity,
@@ -78,7 +84,7 @@ class WorkflowAppGenerateTaskPipeline:
user: Union[Account, EndUser],
stream: bool,
) -> None:
self._base_task_pipeline = BasedGenerateTaskPipeline(
super().__init__(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
stream=stream,
@@ -95,21 +101,19 @@ class WorkflowAppGenerateTaskPipeline:
else:
raise ValueError(f"Invalid user type: {type(user)}")
self._workflow_cycle_manager = WorkflowCycleManage(
application_generate_entity=application_generate_entity,
workflow_system_variables={
SystemVariableKey.FILES: application_generate_entity.files,
SystemVariableKey.USER_ID: user_session_id,
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
SystemVariableKey.WORKFLOW_ID: workflow.id,
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
},
)
self._application_generate_entity = application_generate_entity
self._workflow_id = workflow.id
self._workflow_features_dict = workflow.features_dict
self._workflow_system_variables = {
SystemVariableKey.FILES: application_generate_entity.files,
SystemVariableKey.USER_ID: user_session_id,
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
SystemVariableKey.WORKFLOW_ID: workflow.id,
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
}
self._task_state = WorkflowTaskState()
self._wip_workflow_node_executions = {}
self._workflow_run_id = ""
def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
@@ -118,7 +122,7 @@ class WorkflowAppGenerateTaskPipeline:
:return:
"""
generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager)
if self._base_task_pipeline._stream:
if self._stream:
return self._to_stream_response(generator)
else:
return self._to_blocking_response(generator)
@@ -233,29 +237,29 @@ class WorkflowAppGenerateTaskPipeline:
"""
graph_runtime_state = None
for queue_message in self._base_task_pipeline._queue_manager.listen():
for queue_message in self._queue_manager.listen():
event = queue_message.event
if isinstance(event, QueuePingEvent):
yield self._base_task_pipeline._ping_stream_response()
yield self._ping_stream_response()
elif isinstance(event, QueueErrorEvent):
err = self._base_task_pipeline._handle_error(event=event)
yield self._base_task_pipeline._error_to_stream_response(err)
err = self._handle_error(event=event)
yield self._error_to_stream_response(err)
break
elif isinstance(event, QueueWorkflowStartedEvent):
# override graph runtime state
graph_runtime_state = event.graph_runtime_state
with Session(db.engine, expire_on_commit=False) as session:
with Session(db.engine) as session:
# init workflow run
workflow_run = self._workflow_cycle_manager._handle_workflow_run_start(
workflow_run = self._handle_workflow_run_start(
session=session,
workflow_id=self._workflow_id,
user_id=self._user_id,
created_by_role=self._created_by_role,
)
self._workflow_run_id = workflow_run.id
start_resp = self._workflow_cycle_manager._workflow_start_to_stream_response(
start_resp = self._workflow_start_to_stream_response(
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
session.commit()
@@ -267,14 +271,12 @@ class WorkflowAppGenerateTaskPipeline:
):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
workflow_node_execution = self._handle_workflow_node_execution_retried(
session=session, workflow_run=workflow_run, event=event
)
response = self._workflow_cycle_manager._workflow_node_retry_to_stream_response(
response = self._workflow_node_retry_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
@@ -288,14 +290,12 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
workflow_node_execution = self._handle_node_execution_start(
session=session, workflow_run=workflow_run, event=event
)
node_start_response = self._workflow_cycle_manager._workflow_node_start_to_stream_response(
node_start_response = self._workflow_node_start_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
@@ -306,11 +306,9 @@ class WorkflowAppGenerateTaskPipeline:
if node_start_response:
yield node_start_response
elif isinstance(event, QueueNodeSucceededEvent):
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success(
session=session, event=event
)
node_success_response = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
with Session(db.engine) as session:
workflow_node_execution = self._handle_workflow_node_execution_success(session=session, event=event)
node_success_response = self._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
@@ -321,12 +319,12 @@ class WorkflowAppGenerateTaskPipeline:
if node_success_response:
yield node_success_response
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
with Session(db.engine) as session:
workflow_node_execution = self._handle_workflow_node_execution_failed(
session=session,
event=event,
)
node_failed_response = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
node_failed_response = self._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
@@ -341,17 +339,13 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
parallel_start_resp = (
self._workflow_cycle_manager._workflow_parallel_branch_start_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
parallel_start_resp = self._workflow_parallel_branch_start_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
yield parallel_start_resp
@@ -360,17 +354,13 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
parallel_finish_resp = (
self._workflow_cycle_manager._workflow_parallel_branch_finished_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
parallel_finish_resp = self._workflow_parallel_branch_finished_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)
yield parallel_finish_resp
@@ -379,11 +369,9 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
iter_start_resp = self._workflow_cycle_manager._workflow_iteration_start_to_stream_response(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
iter_start_resp = self._workflow_iteration_start_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
@@ -396,11 +384,9 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
iter_next_resp = self._workflow_cycle_manager._workflow_iteration_next_to_stream_response(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
iter_next_resp = self._workflow_iteration_next_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
@@ -413,11 +399,9 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
iter_finish_resp = self._workflow_cycle_manager._workflow_iteration_completed_to_stream_response(
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
iter_finish_resp = self._workflow_iteration_completed_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
@@ -432,8 +416,8 @@ class WorkflowAppGenerateTaskPipeline:
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._handle_workflow_run_success(
with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_success(
session=session,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
@@ -447,7 +431,7 @@ class WorkflowAppGenerateTaskPipeline:
# save workflow app log
self._save_workflow_app_log(session=session, workflow_run=workflow_run)
workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response(
workflow_finish_resp = self._workflow_finish_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
@@ -461,8 +445,8 @@ class WorkflowAppGenerateTaskPipeline:
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._handle_workflow_run_partial_success(
with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_partial_success(
session=session,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
@@ -477,7 +461,7 @@ class WorkflowAppGenerateTaskPipeline:
# save workflow app log
self._save_workflow_app_log(session=session, workflow_run=workflow_run)
workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response(
workflow_finish_resp = self._workflow_finish_to_stream_response(
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
session.commit()
@@ -489,8 +473,8 @@ class WorkflowAppGenerateTaskPipeline:
if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._handle_workflow_run_failed(
with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_failed(
session=session,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
@@ -508,7 +492,7 @@ class WorkflowAppGenerateTaskPipeline:
# save workflow app log
self._save_workflow_app_log(session=session, workflow_run=workflow_run)
workflow_finish_resp = self._workflow_cycle_manager._workflow_finish_to_stream_response(
workflow_finish_resp = self._workflow_finish_to_stream_response(
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
session.commit()

View File

@@ -195,7 +195,7 @@ class WorkflowAppGenerateEntity(AppGenerateEntity):
# app config
app_config: WorkflowUIBasedAppConfig
workflow_run_id: str
workflow_run_id: Optional[str] = None
class SingleIterationRunEntity(BaseModel):
"""

View File

@@ -15,6 +15,7 @@ from core.app.entities.queue_entities import (
from core.app.entities.task_entities import (
ErrorStreamResponse,
PingStreamResponse,
TaskState,
)
from core.errors.error import QuotaExceededError
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
@@ -29,12 +30,22 @@ class BasedGenerateTaskPipeline:
BasedGenerateTaskPipeline is a class that generate stream output and state management for Application.
"""
_task_state: TaskState
_application_generate_entity: AppGenerateEntity
def __init__(
self,
application_generate_entity: AppGenerateEntity,
queue_manager: AppQueueManager,
stream: bool,
) -> None:
"""
Initialize GenerateTaskPipeline.
:param application_generate_entity: application generate entity
:param queue_manager: queue manager
:param user: user
:param stream: stream
"""
self._application_generate_entity = application_generate_entity
self._queue_manager = queue_manager
self._start_at = time.perf_counter()

View File

@@ -31,19 +31,10 @@ from services.annotation_service import AppAnnotationService
class MessageCycleManage:
def __init__(
self,
*,
application_generate_entity: Union[
ChatAppGenerateEntity,
CompletionAppGenerateEntity,
AgentChatAppGenerateEntity,
AdvancedChatAppGenerateEntity,
],
task_state: Union[EasyUITaskState, WorkflowTaskState],
) -> None:
self._application_generate_entity = application_generate_entity
self._task_state = task_state
_application_generate_entity: Union[
ChatAppGenerateEntity, CompletionAppGenerateEntity, AgentChatAppGenerateEntity, AdvancedChatAppGenerateEntity
]
_task_state: Union[EasyUITaskState, WorkflowTaskState]
def _generate_conversation_name(self, *, conversation_id: str, query: str) -> Optional[Thread]:
"""
@@ -145,7 +136,7 @@ class MessageCycleManage:
# get extension
if "." in message_file.url:
extension = f".{message_file.url.split('.')[-1]}"
extension = f'.{message_file.url.split(".")[-1]}'
if len(extension) > 10:
extension = ".bin"
else:

View File

@@ -34,6 +34,7 @@ from core.app.entities.task_entities import (
ParallelBranchStartStreamResponse,
WorkflowFinishStreamResponse,
WorkflowStartStreamResponse,
WorkflowTaskState,
)
from core.file import FILE_MODEL_IDENTITY, File
from core.model_runtime.utils.encoders import jsonable_encoder
@@ -57,20 +58,13 @@ from models.workflow import (
WorkflowRunStatus,
)
from .exc import WorkflowRunNotFoundError
from .exc import WorkflowNodeExecutionNotFoundError, WorkflowRunNotFoundError
class WorkflowCycleManage:
def __init__(
self,
*,
application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity],
workflow_system_variables: dict[SystemVariableKey, Any],
) -> None:
self._workflow_run: WorkflowRun | None = None
self._workflow_node_executions: dict[str, WorkflowNodeExecution] = {}
self._application_generate_entity = application_generate_entity
self._workflow_system_variables = workflow_system_variables
_application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity]
_task_state: WorkflowTaskState
_workflow_system_variables: dict[SystemVariableKey, Any]
def _handle_workflow_run_start(
self,
@@ -108,8 +102,7 @@ class WorkflowCycleManage:
inputs = dict(WorkflowEntry.handle_special_values(inputs) or {})
# init workflow run
# TODO: This workflow_run_id should always not be None, maybe we can use a more elegant way to handle this
workflow_run_id = str(self._workflow_system_variables.get(SystemVariableKey.WORKFLOW_RUN_ID) or uuid4())
workflow_run_id = str(self._workflow_system_variables.get(SystemVariableKey.WORKFLOW_RUN_ID, uuid4()))
workflow_run = WorkflowRun()
workflow_run.id = workflow_run_id
@@ -246,7 +239,7 @@ class WorkflowCycleManage:
workflow_run.finished_at = datetime.now(UTC).replace(tzinfo=None)
workflow_run.exceptions_count = exceptions_count
stmt = select(WorkflowNodeExecution.node_execution_id).where(
stmt = select(WorkflowNodeExecution).where(
WorkflowNodeExecution.tenant_id == workflow_run.tenant_id,
WorkflowNodeExecution.app_id == workflow_run.app_id,
WorkflowNodeExecution.workflow_id == workflow_run.workflow_id,
@@ -254,18 +247,16 @@ class WorkflowCycleManage:
WorkflowNodeExecution.workflow_run_id == workflow_run.id,
WorkflowNodeExecution.status == WorkflowNodeExecutionStatus.RUNNING.value,
)
ids = session.scalars(stmt).all()
# Use self._get_workflow_node_execution here to make sure the cache is updated
running_workflow_node_executions = [
self._get_workflow_node_execution(session=session, node_execution_id=id) for id in ids if id
]
running_workflow_node_executions = session.scalars(stmt).all()
for workflow_node_execution in running_workflow_node_executions:
now = datetime.now(UTC).replace(tzinfo=None)
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
workflow_node_execution.error = error
workflow_node_execution.finished_at = now
workflow_node_execution.elapsed_time = (now - workflow_node_execution.created_at).total_seconds()
workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
workflow_node_execution.elapsed_time = (
workflow_node_execution.finished_at - workflow_node_execution.created_at
).total_seconds()
if trace_manager:
trace_manager.add_trace_task(
@@ -283,7 +274,7 @@ class WorkflowCycleManage:
self, *, session: Session, workflow_run: WorkflowRun, event: QueueNodeStartedEvent
) -> WorkflowNodeExecution:
workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.id = str(uuid4())
workflow_node_execution.id = event.node_execution_id
workflow_node_execution.tenant_id = workflow_run.tenant_id
workflow_node_execution.app_id = workflow_run.app_id
workflow_node_execution.workflow_id = workflow_run.workflow_id
@@ -307,8 +298,6 @@ class WorkflowCycleManage:
workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
session.add(workflow_node_execution)
self._workflow_node_executions[event.node_execution_id] = workflow_node_execution
return workflow_node_execution
def _handle_workflow_node_execution_success(
@@ -336,7 +325,6 @@ class WorkflowCycleManage:
workflow_node_execution.finished_at = finished_at
workflow_node_execution.elapsed_time = elapsed_time
workflow_node_execution = session.merge(workflow_node_execution)
return workflow_node_execution
def _handle_workflow_node_execution_failed(
@@ -376,7 +364,6 @@ class WorkflowCycleManage:
workflow_node_execution.elapsed_time = elapsed_time
workflow_node_execution.execution_metadata = execution_metadata
workflow_node_execution = session.merge(workflow_node_execution)
return workflow_node_execution
def _handle_workflow_node_execution_retried(
@@ -404,7 +391,7 @@ class WorkflowCycleManage:
execution_metadata = json.dumps(merged_metadata)
workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.id = str(uuid4())
workflow_node_execution.id = event.node_execution_id
workflow_node_execution.tenant_id = workflow_run.tenant_id
workflow_node_execution.app_id = workflow_run.app_id
workflow_node_execution.workflow_id = workflow_run.workflow_id
@@ -428,8 +415,6 @@ class WorkflowCycleManage:
workflow_node_execution.index = event.node_run_index
session.add(workflow_node_execution)
self._workflow_node_executions[event.node_execution_id] = workflow_node_execution
return workflow_node_execution
#################################################
@@ -826,20 +811,22 @@ class WorkflowCycleManage:
return None
def _get_workflow_run(self, *, session: Session, workflow_run_id: str) -> WorkflowRun:
if self._workflow_run and self._workflow_run.id == workflow_run_id:
cached_workflow_run = self._workflow_run
cached_workflow_run = session.merge(cached_workflow_run)
return cached_workflow_run
"""
Refetch workflow run
:param workflow_run_id: workflow run id
:return:
"""
stmt = select(WorkflowRun).where(WorkflowRun.id == workflow_run_id)
workflow_run = session.scalar(stmt)
if not workflow_run:
raise WorkflowRunNotFoundError(workflow_run_id)
self._workflow_run = workflow_run
return workflow_run
def _get_workflow_node_execution(self, session: Session, node_execution_id: str) -> WorkflowNodeExecution:
if node_execution_id not in self._workflow_node_executions:
raise ValueError(f"Workflow node execution not found: {node_execution_id}")
cached_workflow_node_execution = self._workflow_node_executions[node_execution_id]
return cached_workflow_node_execution
stmt = select(WorkflowNodeExecution).where(WorkflowNodeExecution.id == node_execution_id)
workflow_node_execution = session.scalar(stmt)
if not workflow_node_execution:
raise WorkflowNodeExecutionNotFoundError(node_execution_id)
return workflow_node_execution

Some files were not shown because too many files have changed in this diff Show More