Compare commits

...

41 Commits

Author SHA1 Message Date
-LAN-
76fd286a89 Initialize workflow runtime state explicitly 2026-04-03 16:54:32 +08:00
yyh
c94951b2f8 refactor(web): migrate notion page selectors to tanstack virtual (#34508)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-03 07:03:12 +00:00
Matt Van Horn
a9cf8f6c5d refactor(web): replace react-syntax-highlighter with shiki (#33473)
Co-authored-by: Matt Van Horn <455140+mvanhorn@users.noreply.github.com>
Co-authored-by: Stephen Zhou <38493346+hyoban@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-03 06:40:26 +00:00
YBoy
64ddec0d67 refactor(api): type annotation service dicts with TypedDict (#34482)
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>
2026-04-03 06:25:52 +00:00
Renzo
da3b0caf5e refactor: select in account_service (RegisterService class) (#34500)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-03 06:21:26 +00:00
Stephen Zhou
4fedd43af5 chore: update code-inspector-plugin to 1.5.1 (#34506) 2026-04-03 05:34:03 +00:00
yyh
a263f28e19 fix(web): restore ui select public exports (#34501) 2026-04-03 04:42:02 +00:00
Stephen Zhou
d53862f135 chore: override lodash (#34502) 2026-04-03 04:40:46 +00:00
Renzo
608958de1c refactor: select in external_knowledge_service (#34493)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-03 03:42:16 +00:00
Renzo
7eb632eb34 refactor: select in rag_pipeline (#34495)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-03 03:42:01 +00:00
Renzo
33d4fd357c refactor: select in account_service (AccountService class) (#34496) 2026-04-03 03:41:46 +00:00
agenthaulk
e55bd61c17 refactor: replace useContext with use in selected batch (#34450) 2026-04-03 03:37:35 +00:00
Stephen Zhou
f2fc213d52 chore: update deps (#34487) 2026-04-03 03:26:49 +00:00
YBoy
f814579ed2 test: migrate service_api dataset controller tests to testcontainers (#34423)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-03 02:28:47 +00:00
YBoy
71d299d0d3 refactor(api): type hit testing retrieve responses with TypedDict (#34484) 2026-04-03 02:25:30 +00:00
YBoy
e178451d04 refactor(api): type log identity dict with IdentityDict TypedDict (#34485) 2026-04-03 02:25:02 +00:00
YBoy
9a6222f245 refactor(api): type webhook data extraction with RawWebhookDataDict TypedDict (#34486) 2026-04-03 02:24:17 +00:00
YBoy
affe5ed30b refactor(api): type get_knowledge_rate_limit with KnowledgeRateLimitD… (#34483) 2026-04-03 02:23:32 +00:00
wangxiaolei
4cc5401d7e fix: fix import dsl failed (#34492)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-03 02:08:21 +00:00
Stephen Zhou
36e840cd87 chore: knip fix (#34481)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-02 15:03:42 +00:00
Tim Ren
985b41c40b fix(security): add tenant_id validation to prevent IDOR in data source binding (#34456)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 13:17:02 +00:00
lif
2e29ac2829 fix: remove redundant cast in MCP base session (#34461)
Signed-off-by: majiayu000 <1835304752@qq.com>
2026-04-02 12:36:21 +00:00
Renzo
dbfb474eab refactor: select in workflow_tools_manage_service (#34477) 2026-04-02 12:35:04 +00:00
Renzo
d243de26ec refactor: select in metadata_service (#34479) 2026-04-02 12:34:38 +00:00
Stephen Zhou
894826771a chore: clean up useless tailwind reference (#34478) 2026-04-02 11:45:19 +00:00
Asuka Minato
a3386da5d6 ci: Update pyrefly version to 0.59.1 (#34452)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-02 09:48:46 +00:00
99
318a3d0308 refactor(api): tighten login and wrapper typing (#34447) 2026-04-02 09:36:58 +00:00
Poojan
5bafb163cc test: add unit tests for services and tasks part-4 (#33223)
Co-authored-by: akashseth-ifp <akash.seth@infocusp.com>
Co-authored-by: rajatagarwal-oss <rajat.agarwal@infocusp.com>
Co-authored-by: Dev Sharma <50591491+cryptus-neoxys@users.noreply.github.com>
Co-authored-by: sahil-infocusp <73810410+sahil-infocusp@users.noreply.github.com>
2026-04-02 08:35:46 +00:00
Stephen Zhou
52b1bc5b09 refactor: split icon collections (#34453) 2026-04-02 07:58:15 +00:00
Stephen Zhou
1873b22e96 refactor: update to tailwind v4 (#34415)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: yyh <yuanyouhuilyz@gmail.com>
2026-04-02 07:06:11 +00:00
Akash Kumar
9a8c853a2e test: added unit test for remaining files in core helper folder (#33288)
Co-authored-by: rajatagarwal-oss <rajat.agarwal@infocusp.com>
Co-authored-by: sahil-infocusp <73810410+sahil-infocusp@users.noreply.github.com>
2026-04-02 06:50:58 +00:00
Akash Kumar
e54383d0fe test: added test for api/services/rag_pipeline folder (#33222)
Co-authored-by: sahil-infocusp <73810410+sahil-infocusp@users.noreply.github.com>
2026-04-02 06:40:52 +00:00
Sedo
43c48ba4d7 fix: add tenant/dataset ownership checks to prevent IDOR vulnerabilities (#34436)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-02 05:45:20 +00:00
99
8f9dbf269e chore(api): align Python support with 3.12 (#34419)
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>
2026-04-02 05:07:32 +00:00
Renzo
cb9ee5903a refactor: select in tag_service (#34441)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-02 05:04:36 +00:00
99
cd406d2794 refactor(api): replace test fixture side-effect imports (#34421) 2026-04-02 04:55:15 +00:00
wangxiaolei
993a301468 fix: fix online_drive is not a valid datasource_type (#34440) 2026-04-02 04:45:02 +00:00
Renzo
399d3f8da5 refactor: model_load_balancing_service and api_tools_manage_service (#34434)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-02 04:38:35 +00:00
yyh
f9d9ad7a38 refactor(web): migrate remaining toast usage (#34433) 2026-04-02 04:16:50 +00:00
YBoy
2d29345f26 refactor(api): type OpsTraceProviderConfigMap with TracingProviderCon… (#34424) 2026-04-02 01:47:08 +00:00
dependabot[bot]
725f9e3dc4 chore(deps): bump aiohttp from 3.13.3 to 3.13.4 in /api (#34425)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-02 00:33:09 +00:00
2079 changed files with 25986 additions and 19698 deletions

View File

@@ -64,7 +64,7 @@ export const useUpdateAccessMode = () => {
// Component only adds UI behavior.
updateAccessMode({ appId, mode }, {
onSuccess: () => Toast.notify({ type: 'success', message: '...' }),
onSuccess: () => toast.success('...'),
})
// Avoid putting invalidation knowledge in the component.
@@ -114,10 +114,7 @@ try {
router.push(`/orders/${order.id}`)
}
catch (error) {
Toast.notify({
type: 'error',
message: error instanceof Error ? error.message : 'Unknown error',
})
toast.error(error instanceof Error ? error.message : 'Unknown error')
}
```

2
.gitignore vendored
View File

@@ -212,7 +212,7 @@ api/.vscode
# pnpm
/.pnpm-store
/node_modules
node_modules
.vite-hooks/_
# plugin migrate

1
.npmrc Normal file
View File

@@ -0,0 +1 @@
save-exact=true

View File

@@ -89,6 +89,12 @@ if $web_modified; then
echo "No staged TypeScript changes detected, skipping type-check:tsgo"
fi
echo "Running knip"
if ! pnpm run knip; then
echo "Knip check failed. Please run 'pnpm run knip' to fix the errors."
exit 1
fi
echo "Running unit tests check"
modified_files=$(git diff --cached --name-only -- utils | grep -v '\.spec\.ts$' || true)

View File

@@ -115,12 +115,6 @@ ignore = [
"controllers/console/human_input_form.py" = ["TID251"]
"controllers/web/human_input_form.py" = ["TID251"]
[lint.pyflakes]
allowed-unused-imports = [
"tests.integration_tests",
"tests.unit_tests",
]
[lint.flake8-tidy-imports]
[lint.flake8-tidy-imports.banned-api."flask_restx.reqparse"]

View File

@@ -10,7 +10,7 @@ import threading
from abc import ABC, abstractmethod
from collections.abc import Callable, Generator
from contextlib import AbstractContextManager, contextmanager
from typing import Any, Protocol, TypeVar, final, runtime_checkable
from typing import Any, Protocol, final, runtime_checkable
from pydantic import BaseModel
@@ -188,8 +188,6 @@ class ExecutionContextBuilder:
_capturer: Callable[[], IExecutionContext] | None = None
_tenant_context_providers: dict[tuple[str, str], Callable[[], BaseModel]] = {}
T = TypeVar("T", bound=BaseModel)
class ContextProviderNotFoundError(KeyError):
"""Raised when a tenant-scoped context provider is missing."""

View File

@@ -1,7 +1,4 @@
from contextvars import ContextVar
from typing import Generic, TypeVar
T = TypeVar("T")
class HiddenValue:
@@ -11,7 +8,7 @@ class HiddenValue:
_default = HiddenValue()
class RecyclableContextVar(Generic[T]):
class RecyclableContextVar[T]:
"""
RecyclableContextVar is a wrapper around ContextVar
It's safe to use in gunicorn with thread recycling, but features like `reset` are not available for now

View File

@@ -1,14 +1,14 @@
from __future__ import annotations
from typing import Any, TypeAlias
from typing import Any
from graphon.file import helpers as file_helpers
from pydantic import BaseModel, ConfigDict, computed_field
from models.model import IconType
JSONValue: TypeAlias = str | int | float | bool | None | dict[str, Any] | list[Any]
JSONObject: TypeAlias = dict[str, Any]
type JSONValue = str | int | float | bool | None | dict[str, Any] | list[Any]
type JSONObject = dict[str, Any]
class SystemParameters(BaseModel):

View File

@@ -2,7 +2,6 @@ import csv
import io
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import request
from flask_restx import Resource
@@ -20,9 +19,6 @@ from libs.token import extract_access_token
from models.model import App, ExporleBanner, InstalledApp, RecommendedApp, TrialApp
from services.billing_service import BillingService
P = ParamSpec("P")
R = TypeVar("R")
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
@@ -72,9 +68,9 @@ console_ns.schema_model(
)
def admin_required(view: Callable[P, R]):
def admin_required[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
if not dify_config.ADMIN_API_KEY:
raise Unauthorized("API key is invalid.")

View File

@@ -1,7 +1,7 @@
import logging
import uuid
from datetime import datetime
from typing import Any, Literal, TypeAlias
from typing import Any, Literal
from flask import request
from flask_restx import Resource
@@ -152,7 +152,7 @@ class AppTracePayload(BaseModel):
return value
JSONValue: TypeAlias = Any
type JSONValue = Any
class ResponseModel(BaseModel):

View File

@@ -1,6 +1,6 @@
from flask_restx import Resource, fields, marshal_with
from pydantic import BaseModel, Field
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session, sessionmaker
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import (
@@ -71,7 +71,7 @@ class AppImportApi(Resource):
args = AppImportPayload.model_validate(console_ns.payload)
# Create service with session
with sessionmaker(db.engine).begin() as session:
with Session(db.engine) as session:
import_service = AppDslService(session)
# Import app
account = current_user

View File

@@ -1,7 +1,7 @@
import logging
from collections.abc import Callable
from functools import wraps
from typing import Any, NoReturn, ParamSpec, TypeVar
from typing import Any
from flask import Response, request
from flask_restx import Resource, fields, marshal, marshal_with
@@ -192,11 +192,8 @@ workflow_draft_variable_list_model = console_ns.model(
"WorkflowDraftVariableList", workflow_draft_variable_list_fields_copy
)
P = ParamSpec("P")
R = TypeVar("R")
def _api_prerequisite(f: Callable[P, R]):
def _api_prerequisite[**P, R](f: Callable[P, R]) -> Callable[P, R | Response]:
"""Common prerequisites for all draft workflow variable APIs.
It ensures the following conditions are satisfied:
@@ -213,7 +210,7 @@ def _api_prerequisite(f: Callable[P, R]):
@edit_permission_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@wraps(f)
def wrapper(*args: P.args, **kwargs: P.kwargs):
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R | Response:
return f(*args, **kwargs)
return wrapper
@@ -270,7 +267,7 @@ class WorkflowVariableCollectionApi(Resource):
return Response("", 204)
def validate_node_id(node_id: str) -> NoReturn | None:
def validate_node_id(node_id: str) -> None:
if node_id in [
CONVERSATION_VARIABLE_NODE_ID,
SYSTEM_VARIABLE_NODE_ID,
@@ -285,7 +282,6 @@ def validate_node_id(node_id: str) -> NoReturn | None:
raise InvalidArgumentError(
f"invalid node_id, please use correspond api for conversation and system variables, node_id={node_id}",
)
return None
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/variables")

View File

@@ -1,6 +1,6 @@
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar, Union
from typing import overload
from sqlalchemy import select
@@ -9,11 +9,6 @@ from extensions.ext_database import db
from libs.login import current_account_with_tenant
from models import App, AppMode
P = ParamSpec("P")
R = TypeVar("R")
P1 = ParamSpec("P1")
R1 = TypeVar("R1")
def _load_app_model(app_id: str) -> App | None:
_, current_tenant_id = current_account_with_tenant()
@@ -28,10 +23,30 @@ def _load_app_model_with_trial(app_id: str) -> App | None:
return app_model
def get_app_model(view: Callable[P, R] | None = None, *, mode: Union[AppMode, list[AppMode], None] = None):
def decorator(view_func: Callable[P1, R1]):
@overload
def get_app_model[**P, R](
view: Callable[P, R],
*,
mode: AppMode | list[AppMode] | None = None,
) -> Callable[P, R]: ...
@overload
def get_app_model[**P, R](
view: None = None,
*,
mode: AppMode | list[AppMode] | None = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]: ...
def get_app_model[**P, R](
view: Callable[P, R] | None = None,
*,
mode: AppMode | list[AppMode] | None = None,
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
def decorator(view_func: Callable[P, R]) -> Callable[P, R]:
@wraps(view_func)
def decorated_view(*args: P1.args, **kwargs: P1.kwargs):
def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R:
if not kwargs.get("app_id"):
raise ValueError("missing app_id in path parameters")
@@ -69,10 +84,30 @@ def get_app_model(view: Callable[P, R] | None = None, *, mode: Union[AppMode, li
return decorator(view)
def get_app_model_with_trial(view: Callable[P, R] | None = None, *, mode: Union[AppMode, list[AppMode], None] = None):
def decorator(view_func: Callable[P, R]):
@overload
def get_app_model_with_trial[**P, R](
view: Callable[P, R],
*,
mode: AppMode | list[AppMode] | None = None,
) -> Callable[P, R]: ...
@overload
def get_app_model_with_trial[**P, R](
view: None = None,
*,
mode: AppMode | list[AppMode] | None = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]: ...
def get_app_model_with_trial[**P, R](
view: Callable[P, R] | None = None,
*,
mode: AppMode | list[AppMode] | None = None,
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
def decorator(view_func: Callable[P, R]) -> Callable[P, R]:
@wraps(view_func)
def decorated_view(*args: P.args, **kwargs: P.kwargs):
def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R:
if not kwargs.get("app_id"):
raise ValueError("missing app_id in path parameters")

View File

@@ -1,8 +1,9 @@
from collections.abc import Callable
from functools import wraps
from typing import Concatenate, ParamSpec, TypeVar
from typing import Concatenate
from flask import jsonify, request
from flask.typing import ResponseReturnValue
from flask_restx import Resource
from graphon.model_runtime.utils.encoders import jsonable_encoder
from pydantic import BaseModel
@@ -16,10 +17,6 @@ from services.oauth_server import OAUTH_ACCESS_TOKEN_EXPIRES_IN, OAuthGrantType,
from .. import console_ns
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")
class OAuthClientPayload(BaseModel):
client_id: str
@@ -39,9 +36,11 @@ class OAuthTokenRequest(BaseModel):
refresh_token: str | None = None
def oauth_server_client_id_required(view: Callable[Concatenate[T, OAuthProviderApp, P], R]):
def oauth_server_client_id_required[T, **P, R](
view: Callable[Concatenate[T, OAuthProviderApp, P], R],
) -> Callable[Concatenate[T, P], R]:
@wraps(view)
def decorated(self: T, *args: P.args, **kwargs: P.kwargs):
def decorated(self: T, *args: P.args, **kwargs: P.kwargs) -> R:
json_data = request.get_json()
if json_data is None:
raise BadRequest("client_id is required")
@@ -58,9 +57,13 @@ def oauth_server_client_id_required(view: Callable[Concatenate[T, OAuthProviderA
return decorated
def oauth_server_access_token_required(view: Callable[Concatenate[T, OAuthProviderApp, Account, P], R]):
def oauth_server_access_token_required[T, **P, R](
view: Callable[Concatenate[T, OAuthProviderApp, Account, P], R],
) -> Callable[Concatenate[T, OAuthProviderApp, P], R | ResponseReturnValue]:
@wraps(view)
def decorated(self: T, oauth_provider_app: OAuthProviderApp, *args: P.args, **kwargs: P.kwargs):
def decorated(
self: T, oauth_provider_app: OAuthProviderApp, *args: P.args, **kwargs: P.kwargs
) -> R | ResponseReturnValue:
if not isinstance(oauth_provider_app, OAuthProviderApp):
raise BadRequest("Invalid oauth_provider_app")

View File

@@ -158,10 +158,11 @@ class DataSourceApi(Resource):
@login_required
@account_initialization_required
def patch(self, binding_id, action: Literal["enable", "disable"]):
_, current_tenant_id = current_account_with_tenant()
binding_id = str(binding_id)
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
data_source_binding = session.execute(
select(DataSourceOauthBinding).filter_by(id=binding_id)
select(DataSourceOauthBinding).filter_by(id=binding_id, tenant_id=current_tenant_id)
).scalar_one_or_none()
if data_source_binding is None:
raise NotFound("Data source binding not found.")

View File

@@ -173,8 +173,11 @@ class ExternalApiTemplateApi(Resource):
@login_required
@account_initialization_required
def get(self, external_knowledge_api_id):
_, current_tenant_id = current_account_with_tenant()
external_knowledge_api_id = str(external_knowledge_api_id)
external_knowledge_api = ExternalDatasetService.get_external_knowledge_api(external_knowledge_api_id)
external_knowledge_api = ExternalDatasetService.get_external_knowledge_api(
external_knowledge_api_id, current_tenant_id
)
if external_knowledge_api is None:
raise NotFound("API template not found.")

View File

@@ -1,4 +1,5 @@
import logging
from collections.abc import Callable
from typing import Any, NoReturn
from flask import Response, request
@@ -55,7 +56,7 @@ class WorkflowDraftVariablePatchPayload(BaseModel):
register_schema_models(console_ns, WorkflowDraftVariablePatchPayload)
def _api_prerequisite(f):
def _api_prerequisite[**P, R](f: Callable[P, R]) -> Callable[P, R | Response]:
"""Common prerequisites for all draft workflow variable APIs.
It ensures the following conditions are satisfied:
@@ -70,7 +71,7 @@ def _api_prerequisite(f):
@login_required
@account_initialization_required
@get_rag_pipeline
def wrapper(*args, **kwargs):
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R | Response:
if not isinstance(current_user, Account) or not current_user.has_edit_permission:
raise Forbidden()
return f(*args, **kwargs)

View File

@@ -1,6 +1,5 @@
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from sqlalchemy import select
@@ -9,13 +8,10 @@ from extensions.ext_database import db
from libs.login import current_account_with_tenant
from models.dataset import Pipeline
P = ParamSpec("P")
R = TypeVar("R")
def get_rag_pipeline(view_func: Callable[P, R]):
def get_rag_pipeline[**P, R](view_func: Callable[P, R]) -> Callable[P, R]:
@wraps(view_func)
def decorated_view(*args: P.args, **kwargs: P.kwargs):
def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R:
if not kwargs.get("pipeline_id"):
raise ValueError("missing pipeline_id in path parameters")

View File

@@ -1,6 +1,6 @@
from collections.abc import Callable
from functools import wraps
from typing import Concatenate, ParamSpec, TypeVar
from typing import Concatenate
from flask import abort
from flask_restx import Resource
@@ -15,12 +15,8 @@ from models import AccountTrialAppRecord, App, InstalledApp, TrialApp
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")
def installed_app_required(view: Callable[Concatenate[InstalledApp, P], R] | None = None):
def installed_app_required[**P, R](view: Callable[Concatenate[InstalledApp, P], R] | None = None):
def decorator(view: Callable[Concatenate[InstalledApp, P], R]):
@wraps(view)
def decorated(installed_app_id: str, *args: P.args, **kwargs: P.kwargs):
@@ -49,7 +45,7 @@ def installed_app_required(view: Callable[Concatenate[InstalledApp, P], R] | Non
return decorator
def user_allowed_to_access_app(view: Callable[Concatenate[InstalledApp, P], R] | None = None):
def user_allowed_to_access_app[**P, R](view: Callable[Concatenate[InstalledApp, P], R] | None = None):
def decorator(view: Callable[Concatenate[InstalledApp, P], R]):
@wraps(view)
def decorated(installed_app: InstalledApp, *args: P.args, **kwargs: P.kwargs):
@@ -73,7 +69,7 @@ def user_allowed_to_access_app(view: Callable[Concatenate[InstalledApp, P], R] |
return decorator
def trial_app_required(view: Callable[Concatenate[App, P], R] | None = None):
def trial_app_required[**P, R](view: Callable[Concatenate[App, P], R] | None = None):
def decorator(view: Callable[Concatenate[App, P], R]):
@wraps(view)
def decorated(app_id: str, *args: P.args, **kwargs: P.kwargs):
@@ -106,7 +102,7 @@ def trial_app_required(view: Callable[Concatenate[App, P], R] | None = None):
return decorator
def trial_feature_enable(view: Callable[P, R]):
def trial_feature_enable[**P, R](view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
features = FeatureService.get_system_features()
@@ -117,7 +113,7 @@ def trial_feature_enable(view: Callable[P, R]):
return decorated
def explore_banner_enabled(view: Callable[P, R]):
def explore_banner_enabled[**P, R](view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
features = FeatureService.get_system_features()

View File

@@ -1,6 +1,5 @@
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from sqlalchemy.orm import sessionmaker
from werkzeug.exceptions import Forbidden
@@ -9,17 +8,14 @@ from extensions.ext_database import db
from libs.login import current_account_with_tenant
from models.account import TenantPluginPermission
P = ParamSpec("P")
R = TypeVar("R")
def plugin_permission_required(
install_required: bool = False,
debug_required: bool = False,
):
def interceptor(view: Callable[P, R]):
def interceptor[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
current_user, current_tenant_id = current_account_with_tenant()
user = current_user
tenant_id = current_tenant_id

View File

@@ -4,7 +4,6 @@ import os
import time
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import abort, request
from sqlalchemy import select
@@ -25,9 +24,6 @@ from services.operation_service import OperationService
from .error import NotInitValidateError, NotSetupError, UnauthorizedAndForceLogout
P = ParamSpec("P")
R = TypeVar("R")
# Field names for decryption
FIELD_NAME_PASSWORD = "password"
FIELD_NAME_CODE = "code"
@@ -37,7 +33,7 @@ ERROR_MSG_INVALID_ENCRYPTED_DATA = "Invalid encrypted data"
ERROR_MSG_INVALID_ENCRYPTED_CODE = "Invalid encrypted code"
def account_initialization_required(view: Callable[P, R]) -> Callable[P, R]:
def account_initialization_required[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
# check account initialization
@@ -50,7 +46,7 @@ def account_initialization_required(view: Callable[P, R]) -> Callable[P, R]:
return decorated
def only_edition_cloud(view: Callable[P, R]):
def only_edition_cloud[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
if dify_config.EDITION != "CLOUD":
@@ -61,7 +57,7 @@ def only_edition_cloud(view: Callable[P, R]):
return decorated
def only_edition_enterprise(view: Callable[P, R]):
def only_edition_enterprise[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
if not dify_config.ENTERPRISE_ENABLED:
@@ -72,7 +68,7 @@ def only_edition_enterprise(view: Callable[P, R]):
return decorated
def only_edition_self_hosted(view: Callable[P, R]):
def only_edition_self_hosted[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
if dify_config.EDITION != "SELF_HOSTED":
@@ -83,7 +79,7 @@ def only_edition_self_hosted(view: Callable[P, R]):
return decorated
def cloud_edition_billing_enabled(view: Callable[P, R]):
def cloud_edition_billing_enabled[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
_, current_tenant_id = current_account_with_tenant()
@@ -95,7 +91,7 @@ def cloud_edition_billing_enabled(view: Callable[P, R]):
return decorated
def cloud_edition_billing_resource_check(resource: str):
def cloud_edition_billing_resource_check[**P, R](resource: str) -> Callable[[Callable[P, R]], Callable[P, R]]:
def interceptor(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
@@ -137,7 +133,9 @@ def cloud_edition_billing_resource_check(resource: str):
return interceptor
def cloud_edition_billing_knowledge_limit_check(resource: str):
def cloud_edition_billing_knowledge_limit_check[**P, R](
resource: str,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
def interceptor(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
@@ -160,7 +158,7 @@ def cloud_edition_billing_knowledge_limit_check(resource: str):
return interceptor
def cloud_edition_billing_rate_limit_check(resource: str):
def cloud_edition_billing_rate_limit_check[**P, R](resource: str) -> Callable[[Callable[P, R]], Callable[P, R]]:
def interceptor(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
@@ -196,7 +194,7 @@ def cloud_edition_billing_rate_limit_check(resource: str):
return interceptor
def cloud_utm_record(view: Callable[P, R]):
def cloud_utm_record[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
with contextlib.suppress(Exception):
@@ -215,7 +213,7 @@ def cloud_utm_record(view: Callable[P, R]):
return decorated
def setup_required(view: Callable[P, R]) -> Callable[P, R]:
def setup_required[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
# check setup
@@ -229,7 +227,7 @@ def setup_required(view: Callable[P, R]) -> Callable[P, R]:
return decorated
def enterprise_license_required(view: Callable[P, R]):
def enterprise_license_required[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
settings = FeatureService.get_system_features()
@@ -241,7 +239,7 @@ def enterprise_license_required(view: Callable[P, R]):
return decorated
def email_password_login_enabled(view: Callable[P, R]):
def email_password_login_enabled[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
features = FeatureService.get_system_features()
@@ -254,7 +252,7 @@ def email_password_login_enabled(view: Callable[P, R]):
return decorated
def email_register_enabled(view: Callable[P, R]):
def email_register_enabled[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
features = FeatureService.get_system_features()
@@ -267,7 +265,7 @@ def email_register_enabled(view: Callable[P, R]):
return decorated
def enable_change_email(view: Callable[P, R]):
def enable_change_email[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
features = FeatureService.get_system_features()
@@ -280,7 +278,7 @@ def enable_change_email(view: Callable[P, R]):
return decorated
def is_allow_transfer_owner(view: Callable[P, R]):
def is_allow_transfer_owner[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
from libs.workspace_permission import check_workspace_owner_transfer_permission
@@ -293,7 +291,7 @@ def is_allow_transfer_owner(view: Callable[P, R]):
return decorated
def knowledge_pipeline_publish_enabled(view: Callable[P, R]):
def knowledge_pipeline_publish_enabled[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
_, current_tenant_id = current_account_with_tenant()
@@ -305,7 +303,7 @@ def knowledge_pipeline_publish_enabled(view: Callable[P, R]):
return decorated
def edit_permission_required(f: Callable[P, R]):
def edit_permission_required[**P, R](f: Callable[P, R]) -> Callable[P, R]:
@wraps(f)
def decorated_function(*args: P.args, **kwargs: P.kwargs):
from werkzeug.exceptions import Forbidden
@@ -323,7 +321,7 @@ def edit_permission_required(f: Callable[P, R]):
return decorated_function
def is_admin_or_owner_required(f: Callable[P, R]):
def is_admin_or_owner_required[**P, R](f: Callable[P, R]) -> Callable[P, R]:
@wraps(f)
def decorated_function(*args: P.args, **kwargs: P.kwargs):
from werkzeug.exceptions import Forbidden
@@ -339,7 +337,7 @@ def is_admin_or_owner_required(f: Callable[P, R]):
return decorated_function
def annotation_import_rate_limit(view: Callable[P, R]):
def annotation_import_rate_limit[**P, R](view: Callable[P, R]) -> Callable[P, R]:
"""
Rate limiting decorator for annotation import operations.
@@ -388,7 +386,7 @@ def annotation_import_rate_limit(view: Callable[P, R]):
return decorated
def annotation_import_concurrency_limit(view: Callable[P, R]):
def annotation_import_concurrency_limit[**P, R](view: Callable[P, R]) -> Callable[P, R]:
"""
Concurrency control decorator for annotation import operations.
@@ -455,7 +453,7 @@ def _decrypt_field(field_name: str, error_class: type[Exception], error_message:
payload[field_name] = decoded_value
def decrypt_password_field(view: Callable[P, R]):
def decrypt_password_field[**P, R](view: Callable[P, R]) -> Callable[P, R]:
"""
Decorator to decrypt password field in request payload.
@@ -477,7 +475,7 @@ def decrypt_password_field(view: Callable[P, R]):
return decorated
def decrypt_code_field(view: Callable[P, R]):
def decrypt_code_field[**P, R](view: Callable[P, R]) -> Callable[P, R]:
"""
Decorator to decrypt verification code field in request payload.

View File

@@ -1,6 +1,5 @@
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import current_app, request
from flask_login import user_logged_in
@@ -13,9 +12,6 @@ from libs.login import current_user
from models.account import Tenant
from models.model import DefaultEndUserSessionID, EndUser
P = ParamSpec("P")
R = TypeVar("R")
class TenantUserPayload(BaseModel):
tenant_id: str
@@ -65,9 +61,9 @@ def get_user(tenant_id: str, user_id: str | None) -> EndUser:
return user_model
def get_user_tenant(view_func: Callable[P, R]):
def get_user_tenant[**P, R](view_func: Callable[P, R]) -> Callable[P, R]:
@wraps(view_func)
def decorated_view(*args: P.args, **kwargs: P.kwargs):
def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R:
payload = TenantUserPayload.model_validate(request.get_json(silent=True) or {})
user_id = payload.user_id
@@ -97,10 +93,14 @@ def get_user_tenant(view_func: Callable[P, R]):
return decorated_view
def plugin_data(view: Callable[P, R] | None = None, *, payload_type: type[BaseModel]):
def decorator(view_func: Callable[P, R]):
def plugin_data[**P, R](
view: Callable[P, R] | None = None,
*,
payload_type: type[BaseModel],
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
def decorator(view_func: Callable[P, R]) -> Callable[P, R]:
@wraps(view_func)
def decorated_view(*args: P.args, **kwargs: P.kwargs):
def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R:
try:
data = request.get_json()
except Exception:

View File

@@ -3,10 +3,7 @@ from collections.abc import Callable
from functools import wraps
from hashlib import sha1
from hmac import new as hmac_new
from typing import ParamSpec, TypeVar
P = ParamSpec("P")
R = TypeVar("R")
from flask import abort, request
from configs import dify_config
@@ -14,9 +11,9 @@ from extensions.ext_database import db
from models.model import EndUser
def billing_inner_api_only(view: Callable[P, R]):
def billing_inner_api_only[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
if not dify_config.INNER_API:
abort(404)
@@ -30,9 +27,9 @@ def billing_inner_api_only(view: Callable[P, R]):
return decorated
def enterprise_inner_api_only(view: Callable[P, R]):
def enterprise_inner_api_only[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
if not dify_config.INNER_API:
abort(404)
@@ -46,9 +43,9 @@ def enterprise_inner_api_only(view: Callable[P, R]):
return decorated
def enterprise_inner_api_user_auth(view: Callable[P, R]):
def enterprise_inner_api_user_auth[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
if not dify_config.INNER_API:
return view(*args, **kwargs)
@@ -82,9 +79,9 @@ def enterprise_inner_api_user_auth(view: Callable[P, R]):
return decorated
def plugin_inner_api_only(view: Callable[P, R]):
def plugin_inner_api_only[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
if not dify_config.PLUGIN_DAEMON_KEY:
abort(404)

View File

@@ -1,9 +1,10 @@
import inspect
import logging
import time
from collections.abc import Callable
from enum import StrEnum, auto
from functools import wraps
from typing import Concatenate, ParamSpec, TypeVar, cast, overload
from typing import cast, overload
from flask import current_app, request
from flask_login import user_logged_in
@@ -23,10 +24,6 @@ from services.api_token_service import ApiTokenCache, fetch_token_with_single_fl
from services.end_user_service import EndUserService
from services.feature_service import FeatureService
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")
logger = logging.getLogger(__name__)
@@ -46,16 +43,16 @@ class FetchUserArg(BaseModel):
@overload
def validate_app_token(view: Callable[P, R]) -> Callable[P, R]: ...
def validate_app_token[**P, R](view: Callable[P, R]) -> Callable[P, R]: ...
@overload
def validate_app_token(
def validate_app_token[**P, R](
view: None = None, *, fetch_user_arg: FetchUserArg | None = None
) -> Callable[[Callable[P, R]], Callable[P, R]]: ...
def validate_app_token(
def validate_app_token[**P, R](
view: Callable[P, R] | None = None, *, fetch_user_arg: FetchUserArg | None = None
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
def decorator(view_func: Callable[P, R]) -> Callable[P, R]:
@@ -136,7 +133,10 @@ def validate_app_token(
return decorator(view)
def cloud_edition_billing_resource_check(resource: str, api_token_type: str):
def cloud_edition_billing_resource_check[**P, R](
resource: str,
api_token_type: str,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
def interceptor(view: Callable[P, R]):
def decorated(*args: P.args, **kwargs: P.kwargs):
api_token = validate_and_get_api_token(api_token_type)
@@ -166,7 +166,10 @@ def cloud_edition_billing_resource_check(resource: str, api_token_type: str):
return interceptor
def cloud_edition_billing_knowledge_limit_check(resource: str, api_token_type: str):
def cloud_edition_billing_knowledge_limit_check[**P, R](
resource: str,
api_token_type: str,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
def interceptor(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
@@ -188,7 +191,10 @@ def cloud_edition_billing_knowledge_limit_check(resource: str, api_token_type: s
return interceptor
def cloud_edition_billing_rate_limit_check(resource: str, api_token_type: str):
def cloud_edition_billing_rate_limit_check[**P, R](
resource: str,
api_token_type: str,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
def interceptor(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
@@ -225,99 +231,73 @@ def cloud_edition_billing_rate_limit_check(resource: str, api_token_type: str):
return interceptor
@overload
def validate_dataset_token(view: Callable[Concatenate[T, P], R]) -> Callable[P, R]: ...
def validate_dataset_token[R](view: Callable[..., R]) -> Callable[..., R]:
positional_parameters = [
parameter
for parameter in inspect.signature(view).parameters.values()
if parameter.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
]
expects_bound_instance = bool(positional_parameters and positional_parameters[0].name in {"self", "cls"})
@wraps(view)
def decorated(*args: object, **kwargs: object) -> R:
api_token = validate_and_get_api_token("dataset")
@overload
def validate_dataset_token(view: None = None) -> Callable[[Callable[Concatenate[T, P], R]], Callable[P, R]]: ...
# Flask may pass URL path parameters positionally, so inspect both kwargs and args.
dataset_id = kwargs.get("dataset_id")
if not dataset_id and args:
potential_id = args[0]
try:
str_id = str(potential_id)
if len(str_id) == 36 and str_id.count("-") == 4:
dataset_id = str_id
except Exception:
logger.exception("Failed to parse dataset_id from positional args")
def validate_dataset_token(
view: Callable[Concatenate[T, P], R] | None = None,
) -> Callable[P, R] | Callable[[Callable[Concatenate[T, P], R]], Callable[P, R]]:
def decorator(view_func: Callable[Concatenate[T, P], R]) -> Callable[P, R]:
@wraps(view_func)
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
api_token = validate_and_get_api_token("dataset")
# get url path dataset_id from positional args or kwargs
# Flask passes URL path parameters as positional arguments
dataset_id = None
# First try to get from kwargs (explicit parameter)
dataset_id = kwargs.get("dataset_id")
# If not in kwargs, try to extract from positional args
if not dataset_id and args:
# For class methods: args[0] is self, args[1] is dataset_id (if exists)
# Check if first arg is likely a class instance (has __dict__ or __class__)
if len(args) > 1 and hasattr(args[0], "__dict__"):
# This is a class method, dataset_id should be in args[1]
potential_id = args[1]
# Validate it's a string-like UUID, not another object
try:
# Try to convert to string and check if it's a valid UUID format
str_id = str(potential_id)
# Basic check: UUIDs are 36 chars with hyphens
if len(str_id) == 36 and str_id.count("-") == 4:
dataset_id = str_id
except Exception:
logger.exception("Failed to parse dataset_id from class method args")
elif len(args) > 0:
# Not a class method, check if args[0] looks like a UUID
potential_id = args[0]
try:
str_id = str(potential_id)
if len(str_id) == 36 and str_id.count("-") == 4:
dataset_id = str_id
except Exception:
logger.exception("Failed to parse dataset_id from positional args")
# Validate dataset if dataset_id is provided
if dataset_id:
dataset_id = str(dataset_id)
dataset = db.session.scalar(
select(Dataset)
.where(
Dataset.id == dataset_id,
Dataset.tenant_id == api_token.tenant_id,
)
.limit(1)
if dataset_id:
dataset_id = str(dataset_id)
dataset = db.session.scalar(
select(Dataset)
.where(
Dataset.id == dataset_id,
Dataset.tenant_id == api_token.tenant_id,
)
if not dataset:
raise NotFound("Dataset not found.")
if not dataset.enable_api:
raise Forbidden("Dataset api access is not enabled.")
tenant_account_join = db.session.execute(
select(Tenant, TenantAccountJoin)
.where(Tenant.id == api_token.tenant_id)
.where(TenantAccountJoin.tenant_id == Tenant.id)
.where(TenantAccountJoin.role.in_(["owner"]))
.where(Tenant.status == TenantStatus.NORMAL)
).one_or_none() # TODO: only owner information is required, so only one is returned.
if tenant_account_join:
tenant, ta = tenant_account_join
account = db.session.get(Account, ta.account_id)
# Login admin
if account:
account.current_tenant = tenant
current_app.login_manager._update_request_context_with_user(account) # type: ignore
user_logged_in.send(current_app._get_current_object(), user=current_user) # type: ignore
else:
raise Unauthorized("Tenant owner account does not exist.")
.limit(1)
)
if not dataset:
raise NotFound("Dataset not found.")
if not dataset.enable_api:
raise Forbidden("Dataset api access is not enabled.")
tenant_account_join = db.session.execute(
select(Tenant, TenantAccountJoin)
.where(Tenant.id == api_token.tenant_id)
.where(TenantAccountJoin.tenant_id == Tenant.id)
.where(TenantAccountJoin.role.in_(["owner"]))
.where(Tenant.status == TenantStatus.NORMAL)
).one_or_none() # TODO: only owner information is required, so only one is returned.
if tenant_account_join:
tenant, ta = tenant_account_join
account = db.session.get(Account, ta.account_id)
# Login admin
if account:
account.current_tenant = tenant
current_app.login_manager._update_request_context_with_user(account) # type: ignore
user_logged_in.send(current_app._get_current_object(), user=current_user) # type: ignore
else:
raise Unauthorized("Tenant does not exist.")
return view_func(api_token.tenant_id, *args, **kwargs) # type: ignore[arg-type]
raise Unauthorized("Tenant owner account does not exist.")
else:
raise Unauthorized("Tenant does not exist.")
return decorated
if expects_bound_instance:
if not args:
raise TypeError("validate_dataset_token expected a bound resource instance.")
return view(args[0], api_token.tenant_id, *args[1:], **kwargs)
if view:
return decorator(view)
return view(api_token.tenant_id, *args, **kwargs)
# if view is None, it means that the decorator is used without parentheses
# use the decorator as a function for method_decorators
return decorator
return decorated
def validate_and_get_api_token(scope: str | None = None):

View File

@@ -7,7 +7,7 @@ from werkzeug.exceptions import NotFound, RequestEntityTooLarge
from controllers.trigger import bp
from core.trigger.debug.event_bus import TriggerDebugEventBus
from core.trigger.debug.events import WebhookDebugEvent, build_webhook_pool_key
from services.trigger.webhook_service import WebhookService
from services.trigger.webhook_service import RawWebhookDataDict, WebhookService
logger = logging.getLogger(__name__)
@@ -23,6 +23,7 @@ def _prepare_webhook_execution(webhook_id: str, is_debug: bool = False):
webhook_id, is_debug=is_debug
)
webhook_data: RawWebhookDataDict
try:
# Use new unified extraction and validation
webhook_data = WebhookService.extract_and_validate_webhook_data(webhook_trigger, node_config)

View File

@@ -1,7 +1,7 @@
from collections.abc import Callable
from datetime import UTC, datetime
from functools import wraps
from typing import Concatenate, ParamSpec, TypeVar
from typing import Concatenate
from flask import request
from flask_restx import Resource
@@ -20,14 +20,13 @@ from services.enterprise.enterprise_service import EnterpriseService, WebAppSett
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService
P = ParamSpec("P")
R = TypeVar("R")
def validate_jwt_token(view: Callable[Concatenate[App, EndUser, P], R] | None = None):
def decorator(view: Callable[Concatenate[App, EndUser, P], R]):
def validate_jwt_token[**P, R](
view: Callable[Concatenate[App, EndUser, P], R] | None = None,
) -> Callable[P, R] | Callable[[Callable[Concatenate[App, EndUser, P], R]], Callable[P, R]]:
def decorator(view: Callable[Concatenate[App, EndUser, P], R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
app_model, end_user = decode_jwt_token()
return view(app_model, end_user, *args, **kwargs)
@@ -38,7 +37,7 @@ def validate_jwt_token(view: Callable[Concatenate[App, EndUser, P], R] | None =
return decorator
def decode_jwt_token(app_code: str | None = None, user_id: str | None = None):
def decode_jwt_token(app_code: str | None = None, user_id: str | None = None) -> tuple[App, EndUser]:
system_features = FeatureService.get_system_features()
if not app_code:
app_code = str(request.headers.get(HEADER_NAME_APP_CODE))

View File

@@ -5,7 +5,7 @@ import logging
import threading
import uuid
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal, Union, overload
from typing import TYPE_CHECKING, Any, Literal, overload
from flask import Flask, current_app
from pydantic import ValidationError
@@ -68,7 +68,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
self,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
workflow_run_id: str,
@@ -81,7 +81,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
self,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
workflow_run_id: str,
@@ -94,7 +94,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
self,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
workflow_run_id: str,
@@ -106,7 +106,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
self,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
workflow_run_id: str,
@@ -239,7 +239,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
conversation: Conversation,
message: Message,
application_generate_entity: AdvancedChatAppGenerateEntity,
@@ -271,9 +271,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow: Workflow,
node_id: str,
user: Account | EndUser,
args: Mapping,
args: Mapping[str, Any],
streaming: bool = True,
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], Any, None]:
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Generate App response.
@@ -359,7 +359,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
user: Account | EndUser,
args: LoopNodeRunPayload,
streaming: bool = True,
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], Any, None]:
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Generate App response.
@@ -439,7 +439,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
self,
*,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
invoke_from: InvokeFrom,
application_generate_entity: AdvancedChatAppGenerateEntity,
workflow_execution_repository: WorkflowExecutionRepository,
@@ -451,7 +451,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
pause_state_config: PauseStateLayerConfig | None = None,
graph_runtime_state: GraphRuntimeState | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], Any, None]:
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Generate App response.
@@ -653,10 +653,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
queue_manager: AppQueueManager,
conversation: ConversationSnapshot,
message: MessageSnapshot,
user: Union[Account, EndUser],
user: Account | EndUser,
draft_var_saver_factory: DraftVariableSaverFactory,
stream: bool = False,
) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
) -> ChatbotAppBlockingResponse | Generator[ChatbotAppStreamResponse, None, None]:
"""
Handle response.
:param application_generate_entity: application generate entity

View File

@@ -33,6 +33,7 @@ from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import (
build_bootstrap_variables,
build_system_variables,
@@ -188,7 +189,11 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=new_inputs)
# init graph
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time())
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.time(),
workflow_id=self._workflow.id,
)
graph = self._init_graph(
graph_config=self._workflow.graph_dict,
graph_runtime_state=graph_runtime_state,

View File

@@ -3,7 +3,7 @@ import logging
import threading
import uuid
from collections.abc import Generator, Mapping
from typing import Any, Literal, Union, overload
from typing import Any, Literal, overload
from flask import Flask, current_app
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
@@ -37,7 +37,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
self,
*,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[False],
@@ -48,7 +48,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
self,
*,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[True],
@@ -59,21 +59,21 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
self,
*,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool,
) -> Union[Mapping, Generator[Mapping | str, None, None]]: ...
) -> Mapping | Generator[Mapping | str, None, None]: ...
def generate(
self,
*,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
) -> Union[Mapping, Generator[Mapping | str, None, None]]:
) -> Mapping | Generator[Mapping | str, None, None]:
"""
Generate App response.

View File

@@ -3,7 +3,7 @@ import logging
import threading
import uuid
from collections.abc import Generator, Mapping
from typing import Any, Literal, Union, overload
from typing import Any, Literal, overload
from flask import Flask, copy_current_request_context, current_app
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
@@ -36,7 +36,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[True],
@@ -46,7 +46,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[False],
@@ -56,20 +56,20 @@ class ChatAppGenerator(MessageBasedAppGenerator):
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]: ...
) -> Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None]: ...
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
) -> Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None]:
"""
Generate App response.

View File

@@ -3,7 +3,7 @@ import logging
import threading
import uuid
from collections.abc import Generator, Mapping
from typing import Any, Literal, Union, overload
from typing import Any, Literal, overload
from flask import Flask, copy_current_request_context, current_app
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
@@ -36,7 +36,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[True],
@@ -46,7 +46,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[False],
@@ -56,20 +56,20 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = False,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]: ...
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]: ...
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Generate App response.
@@ -244,10 +244,10 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
self,
app_model: App,
message_id: str,
user: Union[Account, EndUser],
user: Account | EndUser,
invoke_from: InvokeFrom,
stream: bool = True,
) -> Union[Mapping, Generator[Mapping | str, None, None]]:
) -> Mapping | Generator[Mapping | str, None, None]:
"""
Generate App response.

View File

@@ -7,7 +7,7 @@ import threading
import time
import uuid
from collections.abc import Generator, Mapping
from typing import Any, Literal, Union, cast, overload
from typing import Any, Literal, cast, overload
from flask import Flask, current_app
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
@@ -62,7 +62,7 @@ class PipelineGenerator(BaseAppGenerator):
*,
pipeline: Pipeline,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[True],
@@ -77,7 +77,7 @@ class PipelineGenerator(BaseAppGenerator):
*,
pipeline: Pipeline,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[False],
@@ -92,28 +92,28 @@ class PipelineGenerator(BaseAppGenerator):
*,
pipeline: Pipeline,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool,
call_depth: int,
workflow_thread_pool_id: str | None,
is_retry: bool = False,
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ...
) -> Mapping[str, Any] | Generator[Mapping | str, None, None]: ...
def generate(
self,
*,
pipeline: Pipeline,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
call_depth: int = 0,
workflow_thread_pool_id: str | None = None,
is_retry: bool = False,
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None], None]:
) -> Mapping[str, Any] | Generator[Mapping | str, None, None] | None:
# Add null check for dataset
with Session(db.engine, expire_on_commit=False) as session:
@@ -278,7 +278,7 @@ class PipelineGenerator(BaseAppGenerator):
context: contextvars.Context,
pipeline: Pipeline,
workflow_id: str,
user: Union[Account, EndUser],
user: Account | EndUser,
application_generate_entity: RagPipelineGenerateEntity,
invoke_from: InvokeFrom,
workflow_execution_repository: WorkflowExecutionRepository,
@@ -286,7 +286,7 @@ class PipelineGenerator(BaseAppGenerator):
streaming: bool = True,
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
workflow_thread_pool_id: str | None = None,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Generate App response.
@@ -624,10 +624,10 @@ class PipelineGenerator(BaseAppGenerator):
application_generate_entity: RagPipelineGenerateEntity,
workflow: Workflow,
queue_manager: AppQueueManager,
user: Union[Account, EndUser],
user: Account | EndUser,
draft_var_saver_factory: DraftVariableSaverFactory,
stream: bool = False,
) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
) -> WorkflowAppBlockingResponse | Generator[WorkflowAppStreamResponse, None, None]:
"""
Handle response.
:param application_generate_entity: application generate entity
@@ -668,7 +668,7 @@ class PipelineGenerator(BaseAppGenerator):
datasource_info: Mapping[str, Any],
created_from: str,
position: int,
account: Union[Account, EndUser],
account: Account | EndUser,
batch: str,
document_form: str,
):
@@ -715,7 +715,7 @@ class PipelineGenerator(BaseAppGenerator):
pipeline: Pipeline,
workflow: Workflow,
start_node_id: str,
user: Union[Account, EndUser],
user: Account | EndUser,
) -> list[Mapping[str, Any]]:
"""
Format datasource info list.

View File

@@ -23,6 +23,7 @@ from core.app.entities.app_invoke_entities import (
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
@@ -158,7 +159,11 @@ class PipelineRunner(WorkflowBasedAppRunner):
workflow.graph_dict
)
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow.id,
)
# init graph
graph = self._init_rag_pipeline_graph(

View File

@@ -5,7 +5,7 @@ import logging
import threading
import uuid
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal, Union, overload
from typing import TYPE_CHECKING, Any, Literal, overload
from flask import Flask, current_app
from graphon.graph_engine.layers import GraphEngineLayer
@@ -64,7 +64,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[True],
@@ -82,7 +82,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: Literal[False],
@@ -100,7 +100,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool,
@@ -110,14 +110,14 @@ class WorkflowAppGenerator(BaseAppGenerator):
root_node_id: str | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
pause_state_config: PauseStateLayerConfig | None = None,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]: ...
) -> Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None]: ...
def generate(
self,
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
streaming: bool = True,
@@ -127,7 +127,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
root_node_id: str | None = None,
graph_engine_layers: Sequence[GraphEngineLayer] = (),
pause_state_config: PauseStateLayerConfig | None = None,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
) -> Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None]:
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
files: Sequence[Mapping[str, Any]] = args.get("files") or []
@@ -237,7 +237,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
application_generate_entity: WorkflowAppGenerateEntity,
graph_runtime_state: GraphRuntimeState,
workflow_execution_repository: WorkflowExecutionRepository,
@@ -245,7 +245,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
graph_engine_layers: Sequence[GraphEngineLayer] = (),
pause_state_config: PauseStateLayerConfig | None = None,
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Resume a paused workflow execution using the persisted runtime state.
"""
@@ -269,7 +269,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
user: Account | EndUser,
application_generate_entity: WorkflowAppGenerateEntity,
invoke_from: InvokeFrom,
workflow_execution_repository: WorkflowExecutionRepository,
@@ -280,7 +280,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
graph_engine_layers: Sequence[GraphEngineLayer] = (),
graph_runtime_state: GraphRuntimeState | None = None,
pause_state_config: PauseStateLayerConfig | None = None,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Generate App response.
@@ -609,10 +609,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
application_generate_entity: WorkflowAppGenerateEntity,
workflow: Workflow,
queue_manager: AppQueueManager,
user: Union[Account, EndUser],
user: Account | EndUser,
draft_var_saver_factory: DraftVariableSaverFactory,
stream: bool = False,
) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
) -> WorkflowAppBlockingResponse | Generator[WorkflowAppStreamResponse, None, None]:
"""
Handle response.
:param application_generate_entity: application generate entity

View File

@@ -16,6 +16,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
@@ -118,7 +119,11 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
root_node_id = self._root_node_id or get_default_root_node_id(self._workflow.graph_dict)
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=self._workflow.id,
)
graph = self._init_graph(
graph_config=self._workflow.graph_dict,
graph_runtime_state=graph_runtime_state,

View File

@@ -68,6 +68,7 @@ from core.app.entities.queue_entities import (
)
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id, resolve_workflow_node_class
from core.workflow.runtime_state import create_graph_runtime_state
from core.workflow.system_variables import (
build_bootstrap_variables,
default_system_variables,
@@ -191,7 +192,11 @@ class WorkflowBasedAppRunner:
environment_variables=workflow.environment_variables,
),
)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time())
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.time(),
workflow_id=workflow.id,
)
# Determine which type of single node execution and get graph/variable_pool
if single_iteration_run:

View File

@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Annotated, Literal, Self, TypeAlias
from typing import Annotated, Literal, Self
from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import GraphEngineEvent, GraphRunPausedEvent
@@ -27,7 +27,7 @@ class _AdvancedChatAppGenerateEntityWrapper(BaseModel):
entity: AdvancedChatAppGenerateEntity
_GenerateEntityUnion: TypeAlias = Annotated[
type _GenerateEntityUnion = Annotated[
_WorkflowGenerateEntityWrapper | _AdvancedChatAppGenerateEntityWrapper,
Field(discriminator="type"),
]

View File

@@ -2,7 +2,7 @@ import logging
import time
from collections.abc import Generator
from threading import Thread
from typing import Any, Union, cast
from typing import Any, cast
from graphon.file import FileTransferMethod
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
@@ -72,14 +72,12 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
"""
_task_state: EasyUITaskState
_application_generate_entity: Union[ChatAppGenerateEntity, CompletionAppGenerateEntity, AgentChatAppGenerateEntity]
_application_generate_entity: ChatAppGenerateEntity | CompletionAppGenerateEntity | AgentChatAppGenerateEntity
_precomputed_event_type: StreamEvent | None = None
def __init__(
self,
application_generate_entity: Union[
ChatAppGenerateEntity, CompletionAppGenerateEntity, AgentChatAppGenerateEntity
],
application_generate_entity: ChatAppGenerateEntity | CompletionAppGenerateEntity | AgentChatAppGenerateEntity,
queue_manager: AppQueueManager,
conversation: Conversation,
message: Message,
@@ -117,11 +115,11 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
def process(
self,
) -> Union[
ChatbotAppBlockingResponse,
CompletionAppBlockingResponse,
Generator[Union[ChatbotAppStreamResponse, CompletionAppStreamResponse], None, None],
]:
) -> (
ChatbotAppBlockingResponse
| CompletionAppBlockingResponse
| Generator[ChatbotAppStreamResponse | CompletionAppStreamResponse, None, None]
):
if self._application_generate_entity.app_config.app_mode != AppMode.COMPLETION:
# start generate conversation name thread
self._conversation_name_generate_thread = self._message_cycle_manager.generate_conversation_name(
@@ -136,7 +134,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
def _to_blocking_response(
self, generator: Generator[StreamResponse, None, None]
) -> Union[ChatbotAppBlockingResponse, CompletionAppBlockingResponse]:
) -> ChatbotAppBlockingResponse | CompletionAppBlockingResponse:
"""
Process blocking response.
:return:
@@ -148,7 +146,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
extras = {"usage": self._task_state.llm_result.usage.model_dump()}
if self._task_state.metadata:
extras["metadata"] = self._task_state.metadata.model_dump()
response: Union[ChatbotAppBlockingResponse, CompletionAppBlockingResponse]
response: ChatbotAppBlockingResponse | CompletionAppBlockingResponse
if self._conversation_mode == AppMode.COMPLETION:
response = CompletionAppBlockingResponse(
task_id=self._application_generate_entity.task_id,
@@ -183,7 +181,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
def _to_stream_response(
self, generator: Generator[StreamResponse, None, None]
) -> Generator[Union[ChatbotAppStreamResponse, CompletionAppStreamResponse], None, None]:
) -> Generator[ChatbotAppStreamResponse | CompletionAppStreamResponse, None, None]:
"""
To stream response.
:return:

View File

@@ -5,14 +5,13 @@ This layer centralizes model-quota deduction outside node implementations.
"""
import logging
from typing import TYPE_CHECKING, cast, final
from typing import TYPE_CHECKING, cast, final, override
from graphon.enums import BuiltinNodeTypes
from graphon.graph_engine.entities.commands import AbortCommand, CommandType
from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import GraphEngineEvent, GraphNodeEventBase, NodeRunSucceededEvent
from graphon.nodes.base.node import Node
from typing_extensions import override
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext
from core.app.llm import deduct_llm_quota, ensure_llm_quota_available

View File

@@ -10,7 +10,7 @@ associates with the node span.
import logging
from contextvars import Token
from dataclasses import dataclass
from typing import cast, final
from typing import cast, final, override
from graphon.enums import BuiltinNodeTypes, NodeType
from graphon.graph_engine.layers import GraphEngineLayer
@@ -18,7 +18,6 @@ from graphon.graph_events import GraphNodeEventBase
from graphon.nodes.base.node import Node
from opentelemetry import context as context_api
from opentelemetry.trace import Span, SpanKind, Tracer, get_tracer, set_span_in_context
from typing_extensions import override
from configs import dify_config
from extensions.otel.parser import (

View File

@@ -44,7 +44,8 @@ class HumanInputContent(BaseModel):
type: ExecutionContentType = Field(default=ExecutionContentType.HUMAN_INPUT)
ExecutionExtraContentDomainModel: TypeAlias = HumanInputContent
# Keep a runtime alias here: callers and tests expect identity with HumanInputContent.
ExecutionExtraContentDomainModel: TypeAlias = HumanInputContent # noqa: UP040
__all__ = [
"ExecutionExtraContentDomainModel",

View File

@@ -2,12 +2,13 @@ import importlib.util
import logging
import sys
from types import ModuleType
from typing import AnyStr
logger = logging.getLogger(__name__)
def import_module_from_source(*, module_name: str, py_file_path: AnyStr, use_lazy_loader: bool = False) -> ModuleType:
def import_module_from_source[T: (str, bytes)](
*, module_name: str, py_file_path: T, use_lazy_loader: bool = False
) -> ModuleType:
"""
Importing a module from the source file directly
"""

View File

@@ -2,7 +2,6 @@ import os
from collections import OrderedDict
from collections.abc import Callable
from functools import lru_cache
from typing import TypeVar
from configs import dify_config
from core.tools.utils.yaml_utils import load_yaml_file_cached
@@ -65,10 +64,7 @@ def pin_position_map(original_position_map: dict[str, int], pin_list: list[str])
return position_map
T = TypeVar("T")
def is_filtered(
def is_filtered[T](
include_set: set[str],
exclude_set: set[str],
data: T,
@@ -97,11 +93,11 @@ def is_filtered(
return False
def sort_by_position_map(
def sort_by_position_map[T](
position_map: dict[str, int],
data: list[T],
name_func: Callable[[T], str],
):
) -> list[T]:
"""
Sort the objects by the position map.
If the name of the object is not in the position map, it will be put at the end.
@@ -116,11 +112,11 @@ def sort_by_position_map(
return sorted(data, key=lambda x: position_map.get(name_func(x), float("inf")))
def sort_to_dict_by_position_map(
def sort_to_dict_by_position_map[T](
position_map: dict[str, int],
data: list[T],
name_func: Callable[[T], str],
):
) -> OrderedDict[str, T]:
"""
Sort the objects into a ordered dict by the position map.
If the name of the object is not in the position map, it will be put at the end.

View File

@@ -4,7 +4,7 @@ Proxy requests to avoid SSRF
import logging
import time
from typing import Any, TypeAlias
from typing import Any
import httpx
from pydantic import TypeAdapter, ValidationError
@@ -20,8 +20,8 @@ SSRF_DEFAULT_MAX_RETRIES = dify_config.SSRF_DEFAULT_MAX_RETRIES
BACKOFF_FACTOR = 0.5
STATUS_FORCELIST = [429, 500, 502, 503, 504]
Headers: TypeAlias = dict[str, str]
_HEADERS_ADAPTER = TypeAdapter(Headers)
type Headers = dict[str, str]
_HEADERS_ADAPTER: TypeAdapter[Headers] = TypeAdapter(Headers)
_SSL_VERIFIED_POOL_KEY = "ssrf:verified"
_SSL_UNVERIFIED_POOL_KEY = "ssrf:unverified"

View File

@@ -3,13 +3,19 @@
import logging
import traceback
from datetime import UTC, datetime
from typing import Any
from typing import Any, TypedDict
import orjson
from configs import dify_config
class IdentityDict(TypedDict, total=False):
tenant_id: str
user_id: str
user_type: str
class StructuredJSONFormatter(logging.Formatter):
"""
JSON log formatter following the specified schema:
@@ -84,7 +90,7 @@ class StructuredJSONFormatter(logging.Formatter):
return log_dict
def _extract_identity(self, record: logging.LogRecord) -> dict[str, str] | None:
def _extract_identity(self, record: logging.LogRecord) -> IdentityDict | None:
tenant_id = getattr(record, "tenant_id", None)
user_id = getattr(record, "user_id", None)
user_type = getattr(record, "user_type", None)
@@ -92,7 +98,7 @@ class StructuredJSONFormatter(logging.Formatter):
if not any([tenant_id, user_id, user_type]):
return None
identity: dict[str, str] = {}
identity: IdentityDict = {}
if tenant_id:
identity["tenant_id"] = tenant_id
if user_id:

View File

@@ -3,7 +3,7 @@ import queue
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Any, TypeAlias, final
from typing import Any, final
from urllib.parse import urljoin, urlparse
import httpx
@@ -33,9 +33,9 @@ class _StatusError:
# Type aliases for better readability
ReadQueue: TypeAlias = queue.Queue[SessionMessage | Exception | None]
WriteQueue: TypeAlias = queue.Queue[SessionMessage | Exception | None]
StatusQueue: TypeAlias = queue.Queue[_StatusReady | _StatusError]
type ReadQueue = queue.Queue[SessionMessage | Exception | None]
type WriteQueue = queue.Queue[SessionMessage | Exception | None]
type StatusQueue = queue.Queue[_StatusReady | _StatusError]
class SSETransport:

View File

@@ -1,6 +1,6 @@
from dataclasses import dataclass
from enum import StrEnum
from typing import Any, Generic, TypeVar
from typing import Any, TypeVar
from pydantic import BaseModel
@@ -9,13 +9,12 @@ from core.mcp.types import LATEST_PROTOCOL_VERSION, OAuthClientInformation, OAut
SUPPORTED_PROTOCOL_VERSIONS: list[str] = ["2024-11-05", "2025-03-26", LATEST_PROTOCOL_VERSION]
SessionT = TypeVar("SessionT", bound=BaseSession[Any, Any, Any, Any, Any])
LifespanContextT = TypeVar("LifespanContextT")
@dataclass
class RequestContext(Generic[SessionT, LifespanContextT]):
class RequestContext[SessionT: BaseSession[Any, Any, Any, Any, Any], LifespanContextT]:
request_id: RequestId
meta: RequestParams.Meta | None
session: SessionT

View File

@@ -4,7 +4,7 @@ from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError
from datetime import timedelta
from types import TracebackType
from typing import Any, Generic, Self, TypeVar
from typing import Any, Self
from httpx import HTTPStatusError
from pydantic import BaseModel
@@ -34,16 +34,10 @@ from core.mcp.types import (
logger = logging.getLogger(__name__)
SendRequestT = TypeVar("SendRequestT", ClientRequest, ServerRequest)
SendResultT = TypeVar("SendResultT", ClientResult, ServerResult)
SendNotificationT = TypeVar("SendNotificationT", ClientNotification, ServerNotification)
ReceiveRequestT = TypeVar("ReceiveRequestT", ClientRequest, ServerRequest)
ReceiveResultT = TypeVar("ReceiveResultT", bound=BaseModel)
ReceiveNotificationT = TypeVar("ReceiveNotificationT", ClientNotification, ServerNotification)
DEFAULT_RESPONSE_READ_TIMEOUT = 1.0
class RequestResponder(Generic[ReceiveRequestT, SendResultT]):
class RequestResponder[ReceiveRequestT: ClientRequest | ServerRequest, SendResultT: ClientResult | ServerResult]:
"""Handles responding to MCP requests and manages request lifecycle.
This class MUST be used as a context manager to ensure proper cleanup and
@@ -60,7 +54,7 @@ class RequestResponder(Generic[ReceiveRequestT, SendResultT]):
"""
request: ReceiveRequestT
_session: Any
_session: "BaseSession[Any, Any, SendResultT, ReceiveRequestT, Any]"
_on_complete: Callable[["RequestResponder[ReceiveRequestT, SendResultT]"], Any]
def __init__(
@@ -68,7 +62,7 @@ class RequestResponder(Generic[ReceiveRequestT, SendResultT]):
request_id: RequestId,
request_meta: RequestParams.Meta | None,
request: ReceiveRequestT,
session: """BaseSession[SendRequestT, SendNotificationT, SendResultT, ReceiveRequestT, ReceiveNotificationT]""",
session: "BaseSession[Any, Any, SendResultT, ReceiveRequestT, Any]",
on_complete: Callable[["RequestResponder[ReceiveRequestT, SendResultT]"], Any],
):
self.request_id = request_id
@@ -111,7 +105,7 @@ class RequestResponder(Generic[ReceiveRequestT, SendResultT]):
self.completed = True
self._session._send_response(request_id=self.request_id, response=response)
self._session.send_response(request_id=self.request_id, response=response)
def cancel(self):
"""Cancel this request and mark it as completed."""
@@ -120,21 +114,19 @@ class RequestResponder(Generic[ReceiveRequestT, SendResultT]):
self.completed = True # Mark as completed so it's removed from in_flight
# Send an error response to indicate cancellation
self._session._send_response(
self._session.send_response(
request_id=self.request_id,
response=ErrorData(code=0, message="Request cancelled", data=None),
)
class BaseSession(
Generic[
SendRequestT,
SendNotificationT,
SendResultT,
ReceiveRequestT,
ReceiveNotificationT,
],
):
class BaseSession[
SendRequestT: ClientRequest | ServerRequest,
SendNotificationT: ClientNotification | ServerNotification,
SendResultT: ClientResult | ServerResult,
ReceiveRequestT: ClientRequest | ServerRequest,
ReceiveNotificationT: ClientNotification | ServerNotification,
]:
"""
Implements an MCP "session" on top of read/write streams, including features
like request/response linking, notifications, and progress.
@@ -204,13 +196,13 @@ class BaseSession(
# The receiver thread should have already exited due to the None message in the queue
self._executor.shutdown(wait=False)
def send_request(
def send_request[T: BaseModel](
self,
request: SendRequestT,
result_type: type[ReceiveResultT],
result_type: type[T],
request_read_timeout_seconds: timedelta | None = None,
metadata: MessageMetadata | None = None,
) -> ReceiveResultT:
) -> T:
"""
Sends a request and wait for a response. Raises an McpError if the
response contains an error. If a request read timeout is provided, it
@@ -299,7 +291,7 @@ class BaseSession(
)
self._write_stream.put(session_message)
def _send_response(self, request_id: RequestId, response: SendResultT | ErrorData):
def send_response(self, request_id: RequestId, response: SendResultT | ErrorData):
if isinstance(response, ErrorData):
jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=request_id, error=response)
session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error))
@@ -350,7 +342,7 @@ class BaseSession(
responder = RequestResponder[ReceiveRequestT, SendResultT](
request_id=message.message.root.id,
request_meta=validated_request.root.params.meta if validated_request.root.params else None,
request=validated_request,
request=validated_request, # type: ignore[arg-type] # mypy can't narrow constrained TypeVar from model_validate
session=self,
on_complete=lambda r: self._in_flight.pop(r.request_id, None),
)
@@ -372,8 +364,8 @@ class BaseSession(
if cancelled_id in self._in_flight:
self._in_flight[cancelled_id].cancel()
else:
self._received_notification(notification)
self._handle_incoming(notification)
self._received_notification(notification) # type: ignore[arg-type]
self._handle_incoming(notification) # type: ignore[arg-type]
except Exception as e:
# For other validation errors, log and continue
logger.warning("Failed to validate notification: %s. Message was: %s", e, message.message.root)

View File

@@ -1,6 +1,6 @@
from collections.abc import Callable
from dataclasses import dataclass
from typing import Annotated, Any, Generic, Literal, TypeAlias, TypeVar
from typing import Annotated, Any, Literal
from pydantic import BaseModel, ConfigDict, Field, FileUrl, RootModel
from pydantic.networks import AnyUrl, UrlConstraints
@@ -31,7 +31,7 @@ ProgressToken = str | int
Cursor = str
Role = Literal["user", "assistant"]
RequestId = Annotated[int | str, Field(union_mode="left_to_right")]
AnyFunction: TypeAlias = Callable[..., Any]
type AnyFunction = Callable[..., Any]
class RequestParams(BaseModel):
@@ -68,12 +68,7 @@ class NotificationParams(BaseModel):
"""
RequestParamsT = TypeVar("RequestParamsT", bound=RequestParams | dict[str, Any] | None)
NotificationParamsT = TypeVar("NotificationParamsT", bound=NotificationParams | dict[str, Any] | None)
MethodT = TypeVar("MethodT", bound=str)
class Request(BaseModel, Generic[RequestParamsT, MethodT]):
class Request[RequestParamsT: RequestParams | dict[str, Any] | None, MethodT: str](BaseModel):
"""Base class for JSON-RPC requests."""
method: MethodT
@@ -81,14 +76,14 @@ class Request(BaseModel, Generic[RequestParamsT, MethodT]):
model_config = ConfigDict(extra="allow")
class PaginatedRequest(Request[PaginatedRequestParams | None, MethodT], Generic[MethodT]):
class PaginatedRequest[T: str](Request[PaginatedRequestParams | None, T]):
"""Base class for paginated requests,
matching the schema's PaginatedRequest interface."""
params: PaginatedRequestParams | None = None
class Notification(BaseModel, Generic[NotificationParamsT, MethodT]):
class Notification[NotificationParamsT: NotificationParams | dict[str, Any] | None, MethodT: str](BaseModel):
"""Base class for JSON-RPC notifications."""
method: MethodT
@@ -736,7 +731,7 @@ class ResourceLink(Resource):
ContentBlock = TextContent | ImageContent | AudioContent | ResourceLink | EmbeddedResource
"""A content block that can be used in prompts and tool results."""
Content: TypeAlias = ContentBlock
type Content = ContentBlock
# """DEPRECATED: Content is deprecated, you should use ContentBlock directly."""

View File

@@ -6,7 +6,7 @@ import queue
import threading
import time
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Optional, Union
from typing import TYPE_CHECKING, Any, TypedDict
from uuid import UUID, uuid4
from cachetools import LRUCache
@@ -14,11 +14,11 @@ from flask import current_app
from pydantic import TypeAdapter
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from typing_extensions import TypedDict
from core.helper.encrypter import batch_decrypt_token, encrypt_token, obfuscated_token
from core.ops.entities.config_entity import (
OPS_FILE_PATH,
BaseTracingConfig,
TracingProviderEnum,
)
from core.ops.entities.trace_entity import (
@@ -195,8 +195,15 @@ def _lookup_llm_credential_info(
return None, ""
class OpsTraceProviderConfigMap(collections.UserDict[str, dict[str, Any]]):
def __getitem__(self, provider: str) -> dict[str, Any]:
class TracingProviderConfigEntry(TypedDict):
config_class: type[BaseTracingConfig]
secret_keys: list[str]
other_keys: list[str]
trace_instance: type[Any]
class OpsTraceProviderConfigMap(collections.UserDict[str, TracingProviderConfigEntry]):
def __getitem__(self, provider: str) -> TracingProviderConfigEntry:
match provider:
case TracingProviderEnum.LANGFUSE:
from core.ops.entities.config_entity import LangfuseConfig
@@ -456,7 +463,7 @@ class OpsTraceManager:
@classmethod
def get_ops_trace_instance(
cls,
app_id: Union[UUID, str] | None = None,
app_id: UUID | str | None = None,
):
"""
Get ops trace through model config
@@ -585,8 +592,8 @@ class OpsTraceManager:
provider_config_map[tracing_provider]["config_class"],
provider_config_map[tracing_provider]["trace_instance"],
)
tracing_config = config_type(**tracing_config)
return trace_instance(tracing_config).api_check()
config = config_type(**tracing_config)
return trace_instance(config).api_check()
@staticmethod
def get_trace_config_project_key(tracing_config: dict, tracing_provider: str):
@@ -600,8 +607,8 @@ class OpsTraceManager:
provider_config_map[tracing_provider]["config_class"],
provider_config_map[tracing_provider]["trace_instance"],
)
tracing_config = config_type(**tracing_config)
return trace_instance(tracing_config).get_project_key()
config = config_type(**tracing_config)
return trace_instance(config).get_project_key()
@staticmethod
def get_trace_config_project_url(tracing_config: dict, tracing_provider: str):
@@ -615,8 +622,8 @@ class OpsTraceManager:
provider_config_map[tracing_provider]["config_class"],
provider_config_map[tracing_provider]["trace_instance"],
)
tracing_config = config_type(**tracing_config)
return trace_instance(tracing_config).get_project_url()
config = config_type(**tracing_config)
return trace_instance(config).get_project_url()
class TraceTask:
@@ -709,7 +716,7 @@ class TraceTask:
self,
trace_type: Any,
message_id: str | None = None,
workflow_execution: Optional["WorkflowExecution"] = None,
workflow_execution: "WorkflowExecution | None" = None,
conversation_id: str | None = None,
user_id: str | None = None,
timer: Any | None = None,

View File

@@ -1,5 +1,4 @@
from collections.abc import Generator, Mapping
from typing import Generic, TypeVar
from pydantic import BaseModel
@@ -19,9 +18,6 @@ class BaseBackwardsInvocation:
yield BaseBackwardsInvocationResponse(data=response).model_dump_json().encode()
T = TypeVar("T", bound=dict | Mapping | str | bool | int | BaseModel)
class BaseBackwardsInvocationResponse(BaseModel, Generic[T]):
class BaseBackwardsInvocationResponse[T: dict | Mapping | str | bool | int | BaseModel](BaseModel):
data: T | None = None
error: str = ""

View File

@@ -4,7 +4,7 @@ import enum
from collections.abc import Mapping, Sequence
from datetime import datetime
from enum import StrEnum
from typing import Any, Generic, TypeVar
from typing import Any
from graphon.model_runtime.entities.model_entities import AIModelEntity
from graphon.model_runtime.entities.provider_entities import ProviderEntity
@@ -19,10 +19,8 @@ from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_entities import ToolProviderEntityWithPlugin
from core.trigger.entities.entities import TriggerProviderEntity
T = TypeVar("T", bound=(BaseModel | dict | list | bool | str))
class PluginDaemonBasicResponse(BaseModel, Generic[T]):
class PluginDaemonBasicResponse[T: BaseModel | dict | list | bool | str](BaseModel):
"""
Basic response from plugin daemon.
"""

View File

@@ -2,7 +2,7 @@ import inspect
import json
import logging
from collections.abc import Callable, Generator
from typing import Any, TypeVar, cast
from typing import Any, cast
import httpx
from graphon.model_runtime.errors.invoke import (
@@ -51,8 +51,6 @@ elif isinstance(_plugin_daemon_timeout_config, httpx.Timeout):
else:
plugin_daemon_request_timeout = httpx.Timeout(_plugin_daemon_timeout_config)
T = TypeVar("T", bound=(BaseModel | dict[str, Any] | list[Any] | bool | str))
logger = logging.getLogger(__name__)
_httpx_client: httpx.Client = get_pooled_http_client(
@@ -191,7 +189,7 @@ class BasePluginClient:
logger.exception("Stream request to Plugin Daemon Service failed")
raise PluginDaemonInnerError(code=-500, message="Request to Plugin Daemon Service failed")
def _stream_request_with_model(
def _stream_request_with_model[T: BaseModel | dict[str, Any] | list[Any] | bool | str](
self,
method: str,
path: str,
@@ -207,7 +205,7 @@ class BasePluginClient:
for line in self._stream_request(method, path, params, headers, data, files):
yield type_(**json.loads(line)) # type: ignore
def _request_with_model(
def _request_with_model[T: BaseModel | dict[str, Any] | list[Any] | bool | str](
self,
method: str,
path: str,
@@ -223,7 +221,7 @@ class BasePluginClient:
response = self._request(method, path, headers, data, params, files)
return type_(**response.json()) # type: ignore[return-value]
def _request_with_plugin_daemon_response(
def _request_with_plugin_daemon_response[T: BaseModel | dict[str, Any] | list[Any] | bool | str](
self,
method: str,
path: str,
@@ -278,7 +276,7 @@ class BasePluginClient:
return rep.data
def _request_with_plugin_daemon_response_stream(
def _request_with_plugin_daemon_response_stream[T: BaseModel | dict[str, Any] | list[Any] | bool | str](
self,
method: str,
path: str,

View File

@@ -1,12 +1,9 @@
from collections.abc import Generator
from dataclasses import dataclass, field
from typing import TypeVar, Union
from core.agent.entities import AgentInvokeMessage
from core.tools.entities.tool_entities import ToolInvokeMessage
MessageType = TypeVar("MessageType", bound=Union[ToolInvokeMessage, AgentInvokeMessage])
@dataclass
class FileChunk:
@@ -22,11 +19,11 @@ class FileChunk:
self.data = bytearray(self.total_length)
def merge_blob_chunks(
response: Generator[MessageType, None, None],
def merge_blob_chunks[T: ToolInvokeMessage | AgentInvokeMessage](
response: Generator[T, None, None],
max_file_size: int = 30 * 1024 * 1024,
max_chunk_size: int = 8192,
) -> Generator[MessageType, None, None]:
) -> Generator[T, None, None]:
"""
Merge streaming blob chunks into complete blob messages.

View File

@@ -1,6 +1,7 @@
from typing import TypedDict
from graphon.model_runtime.entities.model_entities import ModelType
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
from typing_extensions import TypedDict
from core.model_manager import ModelInstance, ModelManager
from core.rag.data_post_processor.reorder import ReorderRunner

View File

@@ -1,10 +1,9 @@
from collections import defaultdict
from typing import Any
from typing import Any, TypedDict
import orjson
from pydantic import BaseModel
from sqlalchemy import select
from typing_extensions import TypedDict
from configs import dify_config
from core.rag.datasource.keyword.jieba.jieba_keyword_table_handler import JiebaKeywordTableHandler

View File

@@ -1,13 +1,12 @@
import concurrent.futures
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, NotRequired
from typing import Any, NotRequired, TypedDict
from flask import Flask, current_app
from graphon.model_runtime.entities.model_entities import ModelType
from sqlalchemy import select
from sqlalchemy.orm import Session, load_only
from typing_extensions import TypedDict
from configs import dify_config
from core.db.session_factory import session_factory

View File

@@ -3,7 +3,7 @@ import logging
import uuid
from collections.abc import Callable
from functools import wraps
from typing import Any, Concatenate, ParamSpec, TypeVar
from typing import Any, Concatenate
from mo_vector.client import MoVectorClient # type: ignore
from pydantic import BaseModel, model_validator
@@ -20,15 +20,12 @@ from models.dataset import Dataset
logger = logging.getLogger(__name__)
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T", bound="MatrixoneVector")
def ensure_client(func: Callable[Concatenate[T, P], R]):
def ensure_client[T: MatrixoneVector, **P, R](
func: Callable[Concatenate[T, P], R],
) -> Callable[Concatenate[T, P], R]:
@wraps(func)
def wrapper(self: T, *args: P.args, **kwargs: P.kwargs):
def wrapper(self: T, *args: P.args, **kwargs: P.kwargs) -> R:
if self.client is None:
self.client = self._get_client(None, False)
return func(self, *args, **kwargs)

View File

@@ -3,7 +3,7 @@ import os
import uuid
from collections.abc import Generator, Iterable, Sequence
from itertools import islice
from typing import TYPE_CHECKING, Any, Union
from typing import TYPE_CHECKING, Any
import qdrant_client
from flask import current_app
@@ -36,8 +36,8 @@ if TYPE_CHECKING:
from qdrant_client.conversions import common_types
from qdrant_client.http import models as rest
DictFilter = dict[str, Union[str, int, bool, dict, list]]
MetadataFilter = Union[DictFilter, common_types.Filter]
type DictFilter = dict[str, str | int | bool | dict | list]
type MetadataFilter = DictFilter | common_types.Filter
class PathQdrantParams(BaseModel):

View File

@@ -3,7 +3,7 @@ import os
import uuid
from collections.abc import Generator, Iterable, Sequence
from itertools import islice
from typing import TYPE_CHECKING, Any, Union
from typing import TYPE_CHECKING, Any
import httpx
import qdrant_client
@@ -40,8 +40,8 @@ if TYPE_CHECKING:
from qdrant_client.conversions import common_types
from qdrant_client.http import models as rest
DictFilter = dict[str, Union[str, int, bool, dict, list]]
MetadataFilter = Union[DictFilter, common_types.Filter]
type DictFilter = dict[str, str | int | bool | dict | list]
type MetadataFilter = DictFilter | common_types.Filter
class TidbOnQdrantConfig(BaseModel):

View File

@@ -1,5 +1,6 @@
from typing import TypedDict
from pydantic import BaseModel
from typing_extensions import TypedDict
from models.dataset import DocumentSegment

View File

@@ -12,11 +12,11 @@ import mimetypes
from collections.abc import Generator, Mapping
from io import BufferedReader, BytesIO
from pathlib import Path, PurePath
from typing import Any, Union
from typing import Any
from pydantic import BaseModel, ConfigDict, model_validator
PathLike = Union[str, PurePath]
type PathLike = str | PurePath
class Blob(BaseModel):
@@ -29,7 +29,7 @@ class Blob(BaseModel):
Inspired by: https://developer.mozilla.org/en-US/docs/Web/API/Blob
"""
data: Union[bytes, str, None] = None # Raw data
data: bytes | str | None = None # Raw data
mimetype: str | None = None # Not to be confused with a file extension
encoding: str = "utf-8" # Use utf-8 as default encoding, if decoding to string
# Location where the original content was found
@@ -75,7 +75,7 @@ class Blob(BaseModel):
raise ValueError(f"Unable to get bytes for blob {self}")
@contextlib.contextmanager
def as_bytes_io(self) -> Generator[Union[BytesIO, BufferedReader], None, None]:
def as_bytes_io(self) -> Generator[BytesIO | BufferedReader, None, None]:
"""Read data as a byte stream."""
if isinstance(self.data, bytes):
yield BytesIO(self.data)
@@ -117,7 +117,7 @@ class Blob(BaseModel):
@classmethod
def from_data(
cls,
data: Union[str, bytes],
data: str | bytes,
*,
encoding: str = "utf-8",
mime_type: str | None = None,

View File

@@ -1,9 +1,8 @@
import json
import time
from typing import Any, NotRequired, cast
from typing import Any, NotRequired, TypedDict, cast
import httpx
from typing_extensions import TypedDict
from extensions.ext_storage import storage

View File

@@ -1,11 +1,10 @@
import json
from collections.abc import Generator
from typing import Any, Union
from typing import Any, TypedDict
from urllib.parse import urljoin
import httpx
from httpx import Response
from typing_extensions import TypedDict
from core.rag.extractor.watercrawl.exceptions import (
WaterCrawlAuthenticationError,
@@ -142,7 +141,7 @@ class WaterCrawlAPIClient(BaseAPIClient):
def create_crawl_request(
self,
url: Union[list, str] | None = None,
url: list | str | None = None,
spider_options: SpiderOptions | None = None,
page_options: PageOptions | None = None,
plugin_options: dict[str, Any] | None = None,

View File

@@ -1,8 +1,6 @@
from collections.abc import Generator
from datetime import datetime
from typing import Any
from typing_extensions import TypedDict
from typing import Any, TypedDict
from core.rag.extractor.watercrawl.client import PageOptions, SpiderOptions, WaterCrawlAPIClient

View File

@@ -7,12 +7,11 @@ import os
import re
from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, NotRequired, Optional
from typing import TYPE_CHECKING, Any, NotRequired, TypedDict
from urllib.parse import unquote, urlparse
import httpx
from sqlalchemy import select
from typing_extensions import TypedDict
from configs import dify_config
from core.entities.knowledge_entities import PreviewDetail
@@ -118,11 +117,12 @@ class BaseIndexProcessor(ABC):
max_tokens: int,
chunk_overlap: int,
separator: str,
embedding_model_instance: Optional["ModelInstance"],
embedding_model_instance: "ModelInstance | None",
) -> TextSplitter:
"""
Get the NodeParser object according to the processing rule.
"""
character_splitter: TextSplitter
if processing_rule_mode in ["custom", "hierarchical"]:
# The user-defined segmentation rule
max_segmentation_tokens_length = dify_config.INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH
@@ -148,7 +148,7 @@ class BaseIndexProcessor(ABC):
embedding_model_instance=embedding_model_instance,
)
return character_splitter # type: ignore
return character_splitter
def _get_content_files(self, document: Document, current_user: Account | None = None) -> list[AttachmentDocument]:
"""

View File

@@ -4,19 +4,13 @@ from __future__ import annotations
import codecs
import re
from typing import Any
from collections.abc import Collection
from typing import Any, Literal
from graphon.model_runtime.model_providers.__base.tokenizers.gpt2_tokenizer import GPT2Tokenizer
from core.model_manager import ModelInstance
from core.rag.splitter.text_splitter import (
TS,
Collection,
Literal,
RecursiveCharacterTextSplitter,
Set,
Union,
)
from core.rag.splitter.text_splitter import RecursiveCharacterTextSplitter
class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
@@ -25,13 +19,13 @@ class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
"""
@classmethod
def from_encoder(
cls: type[TS],
def from_encoder[T: EnhanceRecursiveCharacterTextSplitter](
cls: type[T],
embedding_model_instance: ModelInstance | None,
allowed_special: Union[Literal["all"], Set[str]] = set(), # noqa: UP037
disallowed_special: Union[Literal["all"], Collection[str]] = "all", # noqa: UP037
allowed_special: Literal["all"] | set[str] = set(),
disallowed_special: Literal["all"] | Collection[str] = "all",
**kwargs: Any,
):
) -> T:
def _token_encoder(texts: list[str]) -> list[int]:
if not texts:
return []

View File

@@ -6,19 +6,12 @@ import re
from abc import ABC, abstractmethod
from collections.abc import Callable, Collection, Iterable, Sequence, Set
from dataclasses import dataclass
from typing import (
Any,
Literal,
TypeVar,
Union,
)
from typing import Any, Literal
from core.rag.models.document import BaseDocumentTransformer, Document
logger = logging.getLogger(__name__)
TS = TypeVar("TS", bound="TextSplitter")
def _split_text_with_regex(text: str, separator: str, keep_separator: bool) -> list[str]:
# Now that we have the separator, split the text
@@ -194,8 +187,8 @@ class TokenTextSplitter(TextSplitter):
self,
encoding_name: str = "gpt2",
model_name: str | None = None,
allowed_special: Union[Literal["all"], Set[str]] = set(),
disallowed_special: Union[Literal["all"], Collection[str]] = "all",
allowed_special: Literal["all"] | Set[str] = set(),
disallowed_special: Literal["all"] | Collection[str] = "all",
**kwargs: Any,
):
"""Create a new TextSplitter."""

View File

@@ -6,7 +6,6 @@ providing improved performance by offloading database operations to background w
"""
import logging
from typing import Union
from graphon.entities import WorkflowExecution
from sqlalchemy.engine import Engine
@@ -47,7 +46,7 @@ class CeleryWorkflowExecutionRepository(WorkflowExecutionRepository):
def __init__(
self,
session_factory: sessionmaker | Engine,
user: Union[Account, EndUser],
user: Account | EndUser,
app_id: str | None,
triggered_from: WorkflowRunTriggeredFrom | None,
):

View File

@@ -7,7 +7,6 @@ providing improved performance by offloading database operations to background w
import logging
from collections.abc import Sequence
from typing import Union
from graphon.entities import WorkflowNodeExecution
from sqlalchemy.engine import Engine
@@ -54,7 +53,7 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
def __init__(
self,
session_factory: sessionmaker | Engine,
user: Union[Account, EndUser],
user: Account | EndUser,
app_id: str | None,
triggered_from: WorkflowNodeExecutionTriggeredFrom | None,
):

View File

@@ -7,7 +7,7 @@ allowing users to configure different repository backends through string paths.
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Literal, Protocol, Union
from typing import Literal, Protocol
from graphon.entities import WorkflowExecution, WorkflowNodeExecution
from sqlalchemy.engine import Engine
@@ -61,8 +61,8 @@ class DifyCoreRepositoryFactory:
@classmethod
def create_workflow_execution_repository(
cls,
session_factory: Union[sessionmaker, Engine],
user: Union[Account, EndUser],
session_factory: sessionmaker | Engine,
user: Account | EndUser,
app_id: str,
triggered_from: WorkflowRunTriggeredFrom,
) -> WorkflowExecutionRepository:
@@ -97,8 +97,8 @@ class DifyCoreRepositoryFactory:
@classmethod
def create_workflow_node_execution_repository(
cls,
session_factory: Union[sessionmaker, Engine],
user: Union[Account, EndUser],
session_factory: sessionmaker | Engine,
user: Account | EndUser,
app_id: str,
triggered_from: WorkflowNodeExecutionTriggeredFrom,
) -> WorkflowNodeExecutionRepository:

View File

@@ -4,7 +4,6 @@ SQLAlchemy implementation of the WorkflowExecutionRepository.
import json
import logging
from typing import Union
from graphon.entities import WorkflowExecution
from graphon.enums import WorkflowExecutionStatus, WorkflowType
@@ -40,7 +39,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
def __init__(
self,
session_factory: sessionmaker | Engine,
user: Union[Account, EndUser],
user: Account | EndUser,
app_id: str | None,
triggered_from: WorkflowRunTriggeredFrom | None,
):

View File

@@ -7,7 +7,7 @@ import json
import logging
from collections.abc import Callable, Mapping, Sequence
from concurrent.futures import ThreadPoolExecutor
from typing import Any, TypeVar, Union
from typing import Any
import psycopg2.errors
from graphon.entities import WorkflowNodeExecution
@@ -63,7 +63,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
def __init__(
self,
session_factory: sessionmaker | Engine,
user: Union[Account, EndUser],
user: Account | EndUser,
app_id: str | None,
triggered_from: WorkflowNodeExecutionTriggeredFrom | None,
):
@@ -551,10 +551,7 @@ def _deterministic_json_dump(value: Mapping[str, Any]) -> str:
return json.dumps(value, sort_keys=True)
_T = TypeVar("_T")
def _find_first(seq: Sequence[_T], pred: Callable[[_T], bool]) -> _T | None:
def _find_first[T](seq: Sequence[T], pred: Callable[[T], bool]) -> T | None:
filtered = [i for i in seq if pred(i)]
if filtered:
return filtered[0]

View File

@@ -3,15 +3,15 @@ import re
import threading
from collections import deque
from dataclasses import dataclass
from typing import Any, Union
from typing import Any
from core.schemas.registry import SchemaRegistry
logger = logging.getLogger(__name__)
# Type aliases for better clarity
SchemaType = Union[dict[str, Any], list[Any], str, int, float, bool, None]
SchemaDict = dict[str, Any]
type SchemaType = dict[str, Any] | list[Any] | str | int | float | bool | None
type SchemaDict = dict[str, Any]
# Pre-compiled pattern for better performance
_DIFY_SCHEMA_PATTERN = re.compile(r"^https://dify\.ai/schemas/(v\d+)/(.+)\.json$")
@@ -54,7 +54,7 @@ class QueueItem:
current: Any
parent: Any | None
key: Union[str, int] | None
key: str | int | None
depth: int
ref_path: set[str]

View File

@@ -1,6 +1,5 @@
import hashlib
import logging
from typing import TypeVar
from redis import RedisError
@@ -11,8 +10,6 @@ logger = logging.getLogger(__name__)
TRIGGER_DEBUG_EVENT_TTL = 300
TTriggerDebugEvent = TypeVar("TTriggerDebugEvent", bound="BaseDebugEvent")
class TriggerDebugEventBus:
"""
@@ -81,15 +78,15 @@ class TriggerDebugEventBus:
return 0
@classmethod
def poll(
def poll[T: BaseDebugEvent](
cls,
event_type: type[TTriggerDebugEvent],
event_type: type[T],
pool_key: str,
tenant_id: str,
user_id: str,
app_id: str,
node_id: str,
) -> TTriggerDebugEvent | None:
) -> T | None:
"""
Poll for an event or register to the waiting pool.

View File

@@ -2,7 +2,7 @@ import importlib
import pkgutil
from collections.abc import Callable, Iterator, Mapping, MutableMapping
from functools import lru_cache
from typing import TYPE_CHECKING, Any, TypeAlias, cast, final
from typing import TYPE_CHECKING, Any, cast, final, override
from graphon.entities.base_node_data import BaseNodeData
from graphon.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
@@ -22,7 +22,6 @@ from graphon.nodes.parameter_extractor.entities import ParameterExtractorNodeDat
from graphon.nodes.question_classifier.entities import QuestionClassifierNodeData
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import override
from configs import dify_config
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext
@@ -192,7 +191,7 @@ class _LazyNodeTypeClassesMapping(MutableMapping[NodeType, Mapping[str, type[Nod
NODE_TYPE_CLASSES_MAPPING: MutableMapping[NodeType, Mapping[str, type[Node]]] = _LazyNodeTypeClassesMapping()
LLMCompatibleNodeData: TypeAlias = LLMNodeData | QuestionClassifierNodeData | ParameterExtractorNodeData
type LLMCompatibleNodeData = LLMNodeData | QuestionClassifierNodeData | ParameterExtractorNodeData
def fetch_memory(

View File

@@ -0,0 +1,123 @@
"""Helpers for explicitly wiring GraphRuntimeState collaborators.
GraphOn currently supports lazy construction of several runtime-state
collaborators such as the ready queue, graph execution aggregate, and response
coordinator. Dify initializes those collaborators eagerly so repository code
does not depend on that implicit behavior.
"""
from __future__ import annotations
from contextlib import AbstractContextManager
from graphon.graph import Graph
from graphon.graph_engine.domain.graph_execution import GraphExecution
from graphon.graph_engine.ready_queue import InMemoryReadyQueue
from graphon.graph_engine.response_coordinator import ResponseStreamCoordinator
from graphon.model_runtime.entities.llm_entities import LLMUsage
from graphon.runtime import GraphRuntimeState, VariablePool
def _require_workflow_id(workflow_id: str) -> str:
"""Validate that workflow-scoped runtime collaborators receive a real id."""
if not workflow_id:
raise ValueError("workflow_id must be a non-empty string")
return workflow_id
def create_graph_runtime_state(
    *,
    variable_pool: VariablePool,
    start_at: float,
    workflow_id: str,
    total_tokens: int = 0,
    llm_usage: LLMUsage | None = None,
    outputs: dict[str, object] | None = None,
    node_run_steps: int = 0,
    execution_context: AbstractContextManager[object] | None = None,
) -> GraphRuntimeState:
    """Create a runtime state with explicit non-graph collaborators.

    The graph itself is attached later, once node construction has completed and
    the final Graph instance exists.
    """
    validated_id = _require_workflow_id(workflow_id)
    # Eagerly build the collaborators GraphOn would otherwise create lazily.
    ready_queue = InMemoryReadyQueue()
    graph_execution = GraphExecution(workflow_id=validated_id)
    return GraphRuntimeState(
        variable_pool=variable_pool,
        start_at=start_at,
        total_tokens=total_tokens,
        llm_usage=llm_usage or LLMUsage.empty_usage(),
        outputs=outputs or {},
        node_run_steps=node_run_steps,
        ready_queue=ready_queue,
        graph_execution=graph_execution,
        execution_context=execution_context,
    )
def ensure_graph_runtime_state_initialized(
    graph_runtime_state: GraphRuntimeState,
    *,
    workflow_id: str,
) -> GraphRuntimeState:
    """Materialize non-graph collaborators when loading legacy or sparse state."""
    validated_id = _require_workflow_id(workflow_id)
    # A missing ready queue is simply created; it carries no identity.
    if graph_runtime_state._ready_queue is None:
        graph_runtime_state._ready_queue = InMemoryReadyQueue()
    execution = graph_runtime_state._graph_execution
    if execution is None:
        graph_runtime_state._graph_execution = GraphExecution(
            workflow_id=validated_id,
        )
    elif not execution.workflow_id:
        # Legacy state without an id: adopt the caller's workflow id.
        execution.workflow_id = validated_id
    elif execution.workflow_id != validated_id:
        # An existing, different id indicates state from another workflow.
        raise ValueError("GraphRuntimeState workflow_id does not match graph execution workflow_id")
    return graph_runtime_state
def bind_graph_runtime_state_to_graph(
    graph_runtime_state: GraphRuntimeState,
    graph: Graph,
    *,
    workflow_id: str,
) -> GraphRuntimeState:
    """Attach graph-scoped collaborators without relying on GraphOn lazy setup."""
    ensure_graph_runtime_state_initialized(
        graph_runtime_state,
        workflow_id=workflow_id,
    )
    existing_graph = graph_runtime_state._graph
    # Re-binding the same graph is a no-op; a different graph is an error.
    if existing_graph is not None and existing_graph is not graph:
        raise ValueError("GraphRuntimeState already attached to a different graph instance")
    if graph_runtime_state._response_coordinator is None:
        graph_runtime_state._response_coordinator = ResponseStreamCoordinator(
            variable_pool=graph_runtime_state.variable_pool,
            graph=graph,
        )
    graph_runtime_state.attach_graph(graph)
    return graph_runtime_state
def snapshot_graph_runtime_state(
    graph_runtime_state: GraphRuntimeState,
    *,
    workflow_id: str,
) -> str:
    """Serialize runtime state after explicit collaborator initialization."""
    # Guarantee all collaborators exist before dumping, so the snapshot is
    # complete even for legacy/sparse state.
    ensure_graph_runtime_state_initialized(
        graph_runtime_state,
        workflow_id=workflow_id,
    )
    return graph_runtime_state.dumps()

View File

@@ -25,6 +25,10 @@ from core.app.file_access import DatabaseFileAccessController
from core.app.workflow.layers.llm_quota import LLMQuotaLayer
from core.app.workflow.layers.observability import ObservabilityLayer
from core.workflow.node_factory import DifyNodeFactory, is_start_node_type, resolve_workflow_node_class
from core.workflow.runtime_state import (
bind_graph_runtime_state_to_graph,
create_graph_runtime_state,
)
from core.workflow.system_variables import (
default_system_variables,
get_node_creation_preload_selectors,
@@ -72,9 +76,10 @@ class _WorkflowChildEngineBuilder:
variable_pool: VariablePool | None = None,
) -> GraphEngine:
"""Build a child engine with a fresh runtime state and only child-safe layers."""
child_graph_runtime_state = GraphRuntimeState(
child_graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool if variable_pool is not None else parent_graph_runtime_state.variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow_id,
execution_context=parent_graph_runtime_state.execution_context,
)
node_factory = DifyNodeFactory(
@@ -92,6 +97,11 @@ class _WorkflowChildEngineBuilder:
node_factory=node_factory,
root_node_id=root_node_id,
)
bind_graph_runtime_state_to_graph(
child_graph_runtime_state,
child_graph,
workflow_id=workflow_id,
)
command_channel = InMemoryChannel()
config = GraphEngineConfig()
@@ -152,6 +162,11 @@ class WorkflowEntry:
self.command_channel = command_channel
execution_context = capture_current_context()
graph_runtime_state.execution_context = execution_context
bind_graph_runtime_state_to_graph(
graph_runtime_state,
graph,
workflow_id=workflow_id,
)
self._child_engine_builder = _WorkflowChildEngineBuilder()
self.graph_engine = GraphEngine(
workflow_id=workflow_id,
@@ -244,9 +259,10 @@ class WorkflowEntry:
),
call_depth=0,
)
graph_runtime_state = GraphRuntimeState(
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
workflow_id=workflow.id,
execution_context=capture_current_context(),
)
@@ -402,7 +418,7 @@ class WorkflowEntry:
),
call_depth=0,
)
graph_runtime_state = GraphRuntimeState(
graph_runtime_state = create_graph_runtime_state(
variable_pool=variable_pool,
start_at=time.perf_counter(),
execution_context=capture_current_context(),

View File

@@ -1,5 +1,14 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from flask import Flask
if TYPE_CHECKING:
from extensions.ext_login import DifyLoginManager
class DifyApp(Flask):
pass
"""Flask application type with Dify-specific extension attributes."""
login_manager: DifyLoginManager

View File

@@ -1,7 +1,8 @@
import json
from typing import cast
import flask_login
from flask import Response, request
from flask import Request, Response, request
from flask_login import user_loaded_from_request, user_logged_in
from sqlalchemy import select
from werkzeug.exceptions import NotFound, Unauthorized
@@ -16,13 +17,35 @@ from models import Account, Tenant, TenantAccountJoin
from models.model import AppMCPServer, EndUser
from services.account_service import AccountService
login_manager = flask_login.LoginManager()
type LoginUser = Account | EndUser
class DifyLoginManager(flask_login.LoginManager):
"""Project-specific Flask-Login manager with a stable unauthorized contract.
Dify registers `unauthorized_handler` below to always return a JSON `Response`.
Overriding this method lets callers rely on that narrower return type instead of
Flask-Login's broader callback contract.
"""
def unauthorized(self) -> Response:
"""Return the registered unauthorized handler result as a Flask `Response`."""
return cast(Response, super().unauthorized())
def load_user_from_request_context(self) -> None:
"""Populate Flask-Login's request-local user cache for the current request."""
self._load_user()
login_manager = DifyLoginManager()
# Flask-Login configuration
@login_manager.request_loader
def load_user_from_request(request_from_flask_login):
def load_user_from_request(request_from_flask_login: Request) -> LoginUser | None:
"""Load user based on the request."""
del request_from_flask_login
# Skip authentication for documentation endpoints
if dify_config.SWAGGER_UI_ENABLED and request.path.endswith((dify_config.SWAGGER_UI_PATH, "/swagger.json")):
return None
@@ -100,10 +123,12 @@ def load_user_from_request(request_from_flask_login):
raise NotFound("End user not found.")
return end_user
return None
@user_logged_in.connect
@user_loaded_from_request.connect
def on_user_logged_in(_sender, user):
def on_user_logged_in(_sender: object, user: LoginUser) -> None:
"""Called when a user logged in.
Note: AccountService.load_logged_in_account will populate user.current_tenant_id
@@ -114,8 +139,10 @@ def on_user_logged_in(_sender, user):
@login_manager.unauthorized_handler
def unauthorized_handler():
def unauthorized_handler() -> Response:
"""Handle unauthorized requests."""
# Keep this as a concrete `Response`; `DifyLoginManager.unauthorized()` narrows
# Flask-Login's callback contract based on this override.
return Response(
json.dumps({"code": "unauthorized", "message": "Unauthorized."}),
status=401,
@@ -123,5 +150,5 @@ def unauthorized_handler():
)
def init_app(app: DifyApp):
def init_app(app: DifyApp) -> None:
login_manager.init_app(app)

View File

@@ -3,7 +3,7 @@ import logging
import ssl
from collections.abc import Callable
from datetime import timedelta
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, Union
from typing import TYPE_CHECKING, Any, Union
import redis
from redis import RedisError
@@ -297,12 +297,7 @@ def get_pubsub_broadcast_channel() -> BroadcastChannelProtocol:
return RedisBroadcastChannel(_pubsub_redis_client)
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")
def redis_fallback(default_return: T | None = None): # type: ignore
def redis_fallback[T](default_return: T | None = None): # type: ignore
"""
decorator to handle Redis operation exceptions and return a default value when Redis is unavailable.
@@ -310,9 +305,9 @@ def redis_fallback(default_return: T | None = None): # type: ignore
default_return: The value to return when a Redis operation fails. Defaults to None.
"""
def decorator(func: Callable[P, R]):
def decorator[**P, R](func: Callable[P, R]) -> Callable[P, R | T | None]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs):
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R | T | None:
try:
return func(*args, **kwargs)
except RedisError as e:

View File

@@ -2,7 +2,6 @@ import json
import logging
import os
import time
from typing import Union
from graphon.entities import WorkflowExecution
from graphon.workflow_type_encoder import WorkflowRuntimeTypeConverter
@@ -27,7 +26,7 @@ class LogstoreWorkflowExecutionRepository(WorkflowExecutionRepository):
def __init__(
self,
session_factory: sessionmaker | Engine,
user: Union[Account, EndUser],
user: Account | EndUser,
app_id: str | None,
triggered_from: WorkflowRunTriggeredFrom | None,
):

View File

@@ -11,7 +11,7 @@ import os
import time
from collections.abc import Sequence
from datetime import datetime
from typing import Any, Union
from typing import Any
from graphon.entities import WorkflowNodeExecution
from graphon.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
@@ -109,7 +109,7 @@ class LogstoreWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
def __init__(
self,
session_factory: sessionmaker | Engine,
user: Union[Account, EndUser],
user: Account | EndUser,
app_id: str | None,
triggered_from: WorkflowNodeExecutionTriggeredFrom | None,
):

View File

@@ -1,6 +1,6 @@
import functools
from collections.abc import Callable
from typing import ParamSpec, TypeVar, cast
from typing import cast
from opentelemetry.trace import get_tracer
@@ -8,9 +8,6 @@ from configs import dify_config
from extensions.otel.decorators.handler import SpanHandler
from extensions.otel.runtime import is_instrument_flag_enabled
P = ParamSpec("P")
R = TypeVar("R")
_HANDLER_INSTANCES: dict[type[SpanHandler], SpanHandler] = {SpanHandler: SpanHandler()}
@@ -21,7 +18,7 @@ def _get_handler_instance(handler_class: type[SpanHandler]) -> SpanHandler:
return _HANDLER_INSTANCES[handler_class]
def trace_span(handler_class: type[SpanHandler] | None = None) -> Callable[[Callable[P, R]], Callable[P, R]]:
def trace_span[**P, R](handler_class: type[SpanHandler] | None = None) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""
Decorator that traces a function with an OpenTelemetry span.

View File

@@ -1,11 +1,9 @@
import inspect
from collections.abc import Callable, Mapping
from typing import Any, TypeVar
from typing import Any
from opentelemetry.trace import SpanKind, Status, StatusCode
R = TypeVar("R")
class SpanHandler:
"""
@@ -31,9 +29,9 @@ class SpanHandler:
"""
return f"{wrapped.__module__}.{wrapped.__qualname__}"
def _extract_arguments(
def _extract_arguments[T](
self,
wrapped: Callable[..., R],
wrapped: Callable[..., T],
args: tuple[object, ...],
kwargs: Mapping[str, object],
) -> dict[str, Any] | None:
@@ -61,13 +59,13 @@ class SpanHandler:
except Exception:
return None
def wrapper(
def wrapper[T](
self,
tracer: Any,
wrapped: Callable[..., R],
wrapped: Callable[..., T],
args: tuple[object, ...],
kwargs: Mapping[str, object],
) -> R:
) -> T:
"""
Fully control the wrapper behavior.

View File

@@ -1,6 +1,6 @@
import logging
from collections.abc import Callable, Mapping
from typing import Any, TypeVar
from typing import Any
from opentelemetry.trace import SpanKind, Status, StatusCode
from opentelemetry.util.types import AttributeValue
@@ -12,19 +12,16 @@ from models.model import Account
logger = logging.getLogger(__name__)
R = TypeVar("R")
class AppGenerateHandler(SpanHandler):
"""Span handler for ``AppGenerateService.generate``."""
def wrapper(
def wrapper[T](
self,
tracer: Any,
wrapped: Callable[..., R],
wrapped: Callable[..., T],
args: tuple[object, ...],
kwargs: Mapping[str, object],
) -> R:
) -> T:
try:
arguments = self._extract_arguments(wrapped, args, kwargs)
if not arguments:

View File

@@ -1,12 +1,12 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, TypeAlias
from typing import Any
from graphon.file import File
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
JSONValue: TypeAlias = Any
type JSONValue = Any
class ResponseModel(BaseModel):

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
from datetime import datetime
from typing import TypeAlias
from uuid import uuid4
from graphon.file import File
@@ -10,7 +9,7 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator
from core.entities.execution_extra_content import ExecutionExtraContentDomainModel
from fields.conversation_fields import AgentThought, JSONValue, MessageFile
JSONValueType: TypeAlias = JSONValue
type JSONValueType = JSONValue
class ResponseModel(BaseModel):

View File

@@ -1,12 +1,10 @@
import contextvars
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, TypeVar
from typing import TYPE_CHECKING
from flask import Flask, g
T = TypeVar("T")
if TYPE_CHECKING:
from models import Account, EndUser

View File

@@ -2,19 +2,19 @@ from __future__ import annotations
from collections.abc import Callable
from functools import wraps
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast
from flask import current_app, g, has_request_context, request
from flask import Response, current_app, g, has_request_context, request
from flask_login.config import EXEMPT_METHODS
from werkzeug.local import LocalProxy
from configs import dify_config
from dify_app import DifyApp
from extensions.ext_login import DifyLoginManager
from libs.token import check_csrf_token
from models import Account
if TYPE_CHECKING:
from flask.typing import ResponseReturnValue
from models.model import EndUser
@@ -29,7 +29,13 @@ def _resolve_current_user() -> EndUser | Account | None:
return get_current_object() if callable(get_current_object) else user_proxy # type: ignore
def current_account_with_tenant():
def _get_login_manager() -> DifyLoginManager:
"""Return the project login manager with Dify's narrowed unauthorized contract."""
app = cast(DifyApp, current_app)
return app.login_manager
def current_account_with_tenant() -> tuple[Account, str]:
"""
Resolve the underlying account for the current user proxy and ensure tenant context exists.
Allows tests to supply plain Account mocks without the LocalProxy helper.
@@ -42,13 +48,7 @@ def current_account_with_tenant():
return user, user.current_tenant_id
from typing import ParamSpec, TypeVar
P = ParamSpec("P")
R = TypeVar("R")
def login_required(func: Callable[P, R]) -> Callable[P, R | ResponseReturnValue]:
def login_required[**P, R](func: Callable[P, R]) -> Callable[P, R | Response]:
"""
If you decorate a view with this, it will ensure that the current user is
logged in and authenticated before calling the actual view. (If they are
@@ -83,13 +83,16 @@ def login_required(func: Callable[P, R]) -> Callable[P, R | ResponseReturnValue]
"""
@wraps(func)
def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R | ResponseReturnValue:
def decorated_view(*args: P.args, **kwargs: P.kwargs) -> R | Response:
if request.method in EXEMPT_METHODS or dify_config.LOGIN_DISABLED:
return current_app.ensure_sync(func)(*args, **kwargs)
user = _resolve_current_user()
if user is None or not user.is_authenticated:
return current_app.login_manager.unauthorized() # type: ignore
# `DifyLoginManager` guarantees that the registered unauthorized handler
# is surfaced here as a concrete Flask `Response`.
unauthorized_response: Response = _get_login_manager().unauthorized()
return unauthorized_response
g._login_user = user
# we put csrf validation here for less conflicts
# TODO: maybe find a better place for it.
@@ -102,7 +105,7 @@ def login_required(func: Callable[P, R]) -> Callable[P, R | ResponseReturnValue]
def _get_user() -> EndUser | Account | None:
if has_request_context():
if "_login_user" not in g:
current_app.login_manager._load_user() # type: ignore
_get_login_manager().load_user_from_request_context()
return g._login_user

View File

@@ -1,26 +1,20 @@
import logging
import sys
import urllib.parse
from dataclasses import dataclass
from typing import NotRequired
from typing import NotRequired, TypedDict
import httpx
from pydantic import TypeAdapter, ValidationError
from core.helper.http_client_pooling import get_pooled_http_client
if sys.version_info >= (3, 12):
from typing import TypedDict
else:
from typing_extensions import TypedDict
logger = logging.getLogger(__name__)
JsonObject = dict[str, object]
JsonObjectList = list[JsonObject]
type JsonObject = dict[str, object]
type JsonObjectList = list[JsonObject]
JSON_OBJECT_ADAPTER = TypeAdapter(JsonObject)
JSON_OBJECT_LIST_ADAPTER = TypeAdapter(JsonObjectList)
JSON_OBJECT_ADAPTER: TypeAdapter[JsonObject] = TypeAdapter(JsonObject)
JSON_OBJECT_LIST_ADAPTER: TypeAdapter[JsonObjectList] = TypeAdapter(JsonObjectList)
# Reuse a pooled httpx.Client for OAuth flows (public endpoints, no SSRF proxy).
_http_client: httpx.Client = get_pooled_http_client(

View File

@@ -1,6 +1,5 @@
import sys
import urllib.parse
from typing import Any, Literal
from typing import Any, Literal, TypedDict
import httpx
from flask_login import current_user
@@ -12,11 +11,6 @@ from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.source import DataSourceOauthBinding
if sys.version_info >= (3, 12):
from typing import TypedDict
else:
from typing_extensions import TypedDict
class NotionPageSummary(TypedDict):
page_id: str

View File

@@ -113,6 +113,7 @@ class DataSourceType(StrEnum):
WEBSITE_CRAWL = "website_crawl"
LOCAL_FILE = "local_file"
ONLINE_DOCUMENT = "online_document"
ONLINE_DRIVE = "online_drive"
class ProcessRuleMode(StrEnum):

View File

@@ -8,7 +8,7 @@ from datetime import datetime
from decimal import Decimal
from enum import StrEnum, auto
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Literal, NotRequired, cast
from typing import TYPE_CHECKING, Any, Literal, NotRequired, TypedDict, cast
from uuid import uuid4
import sqlalchemy as sa
@@ -19,7 +19,6 @@ from graphon.file import FILE_MODEL_IDENTITY, File, FileTransferMethod, FileType
from graphon.file import helpers as file_helpers
from sqlalchemy import BigInteger, Float, Index, PrimaryKeyConstraint, String, exists, func, select, text
from sqlalchemy.orm import Mapped, Session, mapped_column
from typing_extensions import TypedDict
from configs import dify_config
from constants import DEFAULT_FILE_NUMBER_LIMITS

View File

@@ -1,6 +1,6 @@
import enum
import uuid
from typing import Any, Generic, TypeVar
from typing import Any
import sqlalchemy as sa
from sqlalchemy import CHAR, TEXT, VARCHAR, LargeBinary, TypeDecorator
@@ -110,17 +110,14 @@ class AdjustedJSON(TypeDecorator[dict | list | None]):
return value
_E = TypeVar("_E", bound=enum.StrEnum)
class EnumText(TypeDecorator[_E | None], Generic[_E]):
class EnumText[T: enum.StrEnum](TypeDecorator[T | None]):
impl = VARCHAR
cache_ok = True
_length: int
_enum_class: type[_E]
_enum_class: type[T]
def __init__(self, enum_class: type[_E], length: int | None = None):
def __init__(self, enum_class: type[T], length: int | None = None):
self._enum_class = enum_class
max_enum_value_len = max(len(e.value) for e in enum_class)
if length is not None:
@@ -131,25 +128,25 @@ class EnumText(TypeDecorator[_E | None], Generic[_E]):
# leave some rooms for future longer enum values.
self._length = max(max_enum_value_len, 20)
def process_bind_param(self, value: _E | str | None, dialect: Dialect) -> str | None:
def process_bind_param(self, value: T | str | None, dialect: Dialect) -> str | None:
if value is None:
return value
if isinstance(value, self._enum_class):
return value.value
# Since _E is bound to StrEnum which inherits from str, at this point value must be str
# Since T is bound to StrEnum which inherits from str, at this point value must be str
self._enum_class(value)
return value
def load_dialect_impl(self, dialect: Dialect) -> TypeEngine[Any]:
return dialect.type_descriptor(VARCHAR(self._length))
def process_result_value(self, value: str | None, dialect: Dialect) -> _E | None:
def process_result_value(self, value: str | None, dialect: Dialect) -> T | None:
if value is None or value == "":
return None
# Type annotation guarantees value is str at this point
return self._enum_class(value)
def compare_values(self, x: _E | None, y: _E | None) -> bool:
def compare_values(self, x: T | None, y: T | None) -> bool:
if x is None or y is None:
return x is y
return x == y

View File

@@ -1,7 +1,7 @@
[project]
name = "dify-api"
version = "1.13.3"
requires-python = ">=3.11,<3.13"
requires-python = "~=3.12.0"
dependencies = [
"aliyun-log-python-sdk~=0.9.37",
@@ -171,7 +171,7 @@ dev = [
"sseclient-py>=1.8.0",
"pytest-timeout>=2.4.0",
"pytest-xdist>=3.8.0",
"pyrefly>=0.57.1",
"pyrefly>=0.59.1",
]
############################################################
@@ -232,5 +232,5 @@ vdb = [
project-includes = ["."]
project-excludes = [".venv", "migrations/"]
python-platform = "linux"
python-version = "3.11.0"
python-version = "3.12.0"
infer-with-first-use = false

View File

@@ -50,6 +50,6 @@
"reportUntypedFunctionDecorator": "hint",
"reportUnnecessaryTypeIgnoreComment": "hint",
"reportAttributeAccessIssue": "hint",
"pythonVersion": "3.11",
"pythonVersion": "3.12",
"pythonPlatform": "All"
}
}

View File

@@ -5,12 +5,11 @@ import secrets
import uuid
from datetime import UTC, datetime, timedelta
from hashlib import sha256
from typing import Any, cast
from typing import Any, TypedDict, cast
from pydantic import BaseModel, TypeAdapter
from sqlalchemy import func, select
from sqlalchemy import delete, func, select
from sqlalchemy.orm import Session
from typing_extensions import TypedDict
class InvitationData(TypedDict):
@@ -145,22 +144,26 @@ class AccountService:
@staticmethod
def load_user(user_id: str) -> None | Account:
account = db.session.query(Account).filter_by(id=user_id).first()
account = db.session.get(Account, user_id)
if not account:
return None
if account.status == AccountStatus.BANNED:
raise Unauthorized("Account is banned.")
current_tenant = db.session.query(TenantAccountJoin).filter_by(account_id=account.id, current=True).first()
current_tenant = db.session.scalar(
select(TenantAccountJoin)
.where(TenantAccountJoin.account_id == account.id, TenantAccountJoin.current == True)
.limit(1)
)
if current_tenant:
account.set_tenant_id(current_tenant.tenant_id)
else:
available_ta = (
db.session.query(TenantAccountJoin)
.filter_by(account_id=account.id)
available_ta = db.session.scalar(
select(TenantAccountJoin)
.where(TenantAccountJoin.account_id == account.id)
.order_by(TenantAccountJoin.id.asc())
.first()
.limit(1)
)
if not available_ta:
return None
@@ -196,7 +199,7 @@ class AccountService:
def authenticate(email: str, password: str, invite_token: str | None = None) -> Account:
"""authenticate account with email and password"""
account = db.session.query(Account).filter_by(email=email).first()
account = db.session.scalar(select(Account).where(Account.email == email).limit(1))
if not account:
raise AccountPasswordError("Invalid email or password.")
@@ -372,8 +375,10 @@ class AccountService:
"""Link account integrate"""
try:
# Query whether there is an existing binding record for the same provider
account_integrate: AccountIntegrate | None = (
db.session.query(AccountIntegrate).filter_by(account_id=account.id, provider=provider).first()
account_integrate: AccountIntegrate | None = db.session.scalar(
select(AccountIntegrate)
.where(AccountIntegrate.account_id == account.id, AccountIntegrate.provider == provider)
.limit(1)
)
if account_integrate:
@@ -417,7 +422,9 @@ class AccountService:
def update_account_email(account: Account, email: str) -> Account:
"""Update account email"""
account.email = email
account_integrate = db.session.query(AccountIntegrate).filter_by(account_id=account.id).first()
account_integrate = db.session.scalar(
select(AccountIntegrate).where(AccountIntegrate.account_id == account.id).limit(1)
)
if account_integrate:
db.session.delete(account_integrate)
db.session.add(account)
@@ -819,7 +826,7 @@ class AccountService:
)
)
account = db.session.query(Account).where(Account.email == email).first()
account = db.session.scalar(select(Account).where(Account.email == email).limit(1))
if not account:
return None
@@ -1019,7 +1026,7 @@ class AccountService:
@staticmethod
def check_email_unique(email: str) -> bool:
return db.session.query(Account).filter_by(email=email).first() is None
return db.session.scalar(select(Account).where(Account.email == email).limit(1)) is None
class TenantService:
@@ -1385,10 +1392,10 @@ class RegisterService:
db.session.add(dify_setup)
db.session.commit()
except Exception as e:
db.session.query(DifySetup).delete()
db.session.query(TenantAccountJoin).delete()
db.session.query(Account).delete()
db.session.query(Tenant).delete()
db.session.execute(delete(DifySetup))
db.session.execute(delete(TenantAccountJoin))
db.session.execute(delete(Account))
db.session.execute(delete(Tenant))
db.session.commit()
logger.exception("Setup account failed, email: %s, name: %s", email, name)
@@ -1489,7 +1496,11 @@ class RegisterService:
TenantService.switch_tenant(account, tenant.id)
else:
TenantService.check_member_permission(tenant, inviter, account, "add")
ta = db.session.query(TenantAccountJoin).filter_by(tenant_id=tenant.id, account_id=account.id).first()
ta = db.session.scalar(
select(TenantAccountJoin)
.where(TenantAccountJoin.tenant_id == tenant.id, TenantAccountJoin.account_id == account.id)
.limit(1)
)
if not ta:
TenantService.create_tenant_member(tenant, account, role)
@@ -1546,21 +1557,18 @@ class RegisterService:
if not invitation_data:
return None
tenant = (
db.session.query(Tenant)
.where(Tenant.id == invitation_data["workspace_id"], Tenant.status == "normal")
.first()
tenant = db.session.scalar(
select(Tenant).where(Tenant.id == invitation_data["workspace_id"], Tenant.status == "normal").limit(1)
)
if not tenant:
return None
tenant_account = (
db.session.query(Account, TenantAccountJoin.role)
tenant_account = db.session.execute(
select(Account, TenantAccountJoin.role)
.join(TenantAccountJoin, Account.id == TenantAccountJoin.account_id)
.where(Account.email == invitation_data["email"], TenantAccountJoin.tenant_id == tenant.id)
.first()
)
).first()
if not tenant_account:
return None

View File

@@ -4,6 +4,8 @@ import uuid
import pandas as pd
logger = logging.getLogger(__name__)
from typing import TypedDict
from sqlalchemy import or_, select
from werkzeug.datastructures import FileStorage
from werkzeug.exceptions import NotFound
@@ -23,6 +25,27 @@ from tasks.annotation.enable_annotation_reply_task import enable_annotation_repl
from tasks.annotation.update_annotation_to_index_task import update_annotation_to_index_task
class AnnotationJobStatusDict(TypedDict):
job_id: str
job_status: str
class EmbeddingModelDict(TypedDict):
embedding_provider_name: str
embedding_model_name: str
class AnnotationSettingDict(TypedDict):
id: str
enabled: bool
score_threshold: float
embedding_model: EmbeddingModelDict | dict
class AnnotationSettingDisabledDict(TypedDict):
enabled: bool
class AppAnnotationService:
@classmethod
def up_insert_app_annotation_from_message(cls, args: dict, app_id: str) -> MessageAnnotation:
@@ -85,7 +108,7 @@ class AppAnnotationService:
return annotation
@classmethod
def enable_app_annotation(cls, args: dict, app_id: str):
def enable_app_annotation(cls, args: dict, app_id: str) -> AnnotationJobStatusDict:
enable_app_annotation_key = f"enable_app_annotation_{str(app_id)}"
cache_result = redis_client.get(enable_app_annotation_key)
if cache_result is not None:
@@ -109,7 +132,7 @@ class AppAnnotationService:
return {"job_id": job_id, "job_status": "waiting"}
@classmethod
def disable_app_annotation(cls, app_id: str):
def disable_app_annotation(cls, app_id: str) -> AnnotationJobStatusDict:
_, current_tenant_id = current_account_with_tenant()
disable_app_annotation_key = f"disable_app_annotation_{str(app_id)}"
cache_result = redis_client.get(disable_app_annotation_key)
@@ -567,7 +590,7 @@ class AppAnnotationService:
db.session.commit()
@classmethod
def get_app_annotation_setting_by_app_id(cls, app_id: str):
def get_app_annotation_setting_by_app_id(cls, app_id: str) -> AnnotationSettingDict | AnnotationSettingDisabledDict:
_, current_tenant_id = current_account_with_tenant()
# get app info
app = (
@@ -602,7 +625,9 @@ class AppAnnotationService:
return {"enabled": False}
@classmethod
def update_app_annotation_setting(cls, app_id: str, annotation_setting_id: str, args: dict):
def update_app_annotation_setting(
cls, app_id: str, annotation_setting_id: str, args: dict
) -> AnnotationSettingDict:
current_user, current_tenant_id = current_account_with_tenant()
# get app info
app = (

View File

@@ -1,7 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any
from typing_extensions import TypedDict
from typing import Any, TypedDict
class AuthCredentials(TypedDict):

Some files were not shown because too many files have changed in this diff Show More