Compare commits

...

102 Commits

Author SHA1 Message Date
Yansong Zhang
77c182f738 feat(api): propagate all app features in transparent upgrade
VirtualWorkflowSynthesizer._build_features() now extracts ALL legacy
app features from AppModelConfig into the synthesized workflow.features:

- opening_statement + suggested_questions
- sensitive_word_avoidance (keywords/API moderation)
- more_like_this
- speech_to_text / text_to_speech
- retriever_resource

Previously workflow.features was hardcoded to "{}", losing all these
features during transparent upgrade. Now AdvancedChatAppRunner's
moderation, opening text, and other feature layers work correctly
for transparently upgraded old apps.

Made-with: Cursor
2026-04-10 18:47:18 +08:00
Yansong Zhang
e04f00d29b feat(api): add context injection and Jinja2 support to Agent V2 node
Agent V2 now fully covers all LLM node capabilities:
- Context injection: {{#context#}} placeholder replaced with upstream
  knowledge retrieval results via _build_context_string()
- Jinja2 template rendering via _render_jinja2() with variable pool
- Multi-variable references across upstream nodes

Compatibility verified (7/7):
- T1: Context injection ({{#context#}})
- T2: Variable template resolution ({{#start.var#}})
- T3: Multi-upstream variable refs
- T4: Old Chat app with opening_statement
- T5: Old app sensitive_word_avoidance
- T6: Old app more_like_this
- T7: Old Completion app with variable substitution

Made-with: Cursor
2026-04-10 17:05:48 +08:00
Yansong Zhang
bbed99a4cb fix(web): add AGENT mode to AppPreview and AppScreenShot maps
Made-with: Cursor
2026-04-10 16:17:34 +08:00
Yansong Zhang
df6c1064c6 fix(web): resolve all TypeScript errors in Agent V2 frontend
- Fix toast API: use toast.success()/toast.error() instead of object
- Fix panel: use native HTML elements instead of mismatched component APIs
- Add BlockEnum.AgentV2 to block-icon map (icon + color)
- Add BlockEnum.AgentV2 to use-last-run.ts form params maps
- Add i18n keys: blocks.agent-v2, blocksAbout.agent-v2 (en + zh)
- TypeScript: 0 errors

Made-with: Cursor
2026-04-10 16:00:16 +08:00
Yansong Zhang
f4e04fc872 feat(web): add Agent V2 frontend — app creation, node editor, sandbox settings
P0 — Agent App can be created and routed:
- Add AppModeEnum.AGENT to types/app.ts
- Add Agent card to create-app-modal (primary row, with RiRobot2Fill icon)
- Route Agent apps to /workflow editor (same as workflow/advanced-chat)
- Update layout-main.tsx mode guards

P1 — Agent V2 workflow node:
- Add BlockEnum.AgentV2 = 'agent-v2' to workflow types
- Create agent-v2/node.tsx: displays model, strategy, tool count
- Create agent-v2/panel.tsx: model selector, strategy picker, tool list,
  max iterations, memory config, vision toggle
- Register in NodeComponentMap and PanelComponentMap

P2 — Sandbox Provider settings:
- Create sandbox-provider-page: list/configure/activate/delete providers
  (Docker, E2B, SSH, AWS CodeInterpreter)
- Create service/sandbox.ts: API client for sandbox provider endpoints
- Add "Sandbox Providers" to settings menu

i18n: Add en-US and zh-Hans translations for agent V2 description.
Made-with: Cursor
2026-04-10 15:31:48 +08:00
Yansong Zhang
59b9221501 fix(api): fix AWS CodeInterpreter stdout capture failure
Root cause: _WORKDIR was hardcoded to "/home/user" which doesn't exist
in AWS AgentCore Code Interpreter environment (actual pwd is
/opt/amazon/genesis1p-tools/var). Every command was prefixed with
"cd /home/user && ..." which failed silently, producing empty stdout.

Fix:
- Default _WORKDIR to "/tmp" (universally available)
- Auto-detect actual working directory via "pwd" during
  _construct_environment and override _WORKDIR dynamically

Verified: echo, python3, uname all return correct stdout.
Made-with: Cursor
2026-04-10 14:21:06 +08:00
Yansong Zhang
218c10ba4f feat(api): add SSH private key auth support and verify SSH/E2B providers
- SSH Provider: add automatic private key detection in ssh_password
  field (RSA/Ed25519/ECDSA) alongside existing password auth.
- SSH Provider verified end-to-end on EC2: connection, command exec,
  CLI binary upload via SFTP, dify init, tool symlink creation.
- E2B Provider verified: cloud sandbox creation, CLI binary upload,
  dify init with tool symlinks.
- Add linux/amd64 CLI binary for E2B (x86_64 cloud sandboxes).

Made-with: Cursor
2026-04-10 12:57:40 +08:00
Yansong Zhang
4c878da9e6 feat(api): add linux/amd64 dify-cli binary for E2B cloud sandbox
E2B Provider verified end-to-end:
- Cloud sandbox creation/release via E2B API
- CLI binary upload + execution inside E2B
- dify init + symlink creation
- dify execute requires public CLI_API_URL (expected for cloud sandbox)

Made-with: Cursor
2026-04-10 11:40:53 +08:00
Yansong Zhang
698af54c4f feat(api): complete end-to-end Docker sandbox auto tool execution
Full pipeline working: Agent V2 node → Docker container creation →
CLI binary upload (linux/arm64) → dify init (fetch tools from API) →
dify execute (tool callback via CLI API) → result returned.

Fixes:
- Use sandbox.id (not vm.metadata.id) for CLI paths
- Upload CLI binary to container during sandbox creation
- Resolve linux binary separately for Docker containers on macOS
- Save Docker provider config via SandboxProviderService (proper
  encryption) instead of raw DB insert
- Add verbose logging for sandbox tool execution path
- Fix NameError: binary not defined

Made-with: Cursor
2026-04-10 11:28:02 +08:00
Yansong Zhang
10bb276e97 fix(api): complete Docker sandbox tool execution pipeline
- Add linux/arm64 dify-cli binary for Docker containers
- Add DIFY_PORT config field for Docker socat forwarding
- Fix InvokeFrom.AGENT (doesn't exist) → InvokeFrom.DEBUGGER
  in CLI API fetch/tools/batch endpoint

Full pipeline verified: Docker container → dify init → dify execute
→ CLI API callback → plugin invocation → result returned to stdout.

Made-with: Cursor
2026-04-10 11:06:54 +08:00
Yansong Zhang
73fd439541 fix(api): resolve sandbox deadlock under gevent and refine integration
- Skip Local sandbox provider under gevent worker (subprocess pipes
  cause cooperative threading deadlock with Celery's gevent pool).
- Add non-blocking sandbox readiness check before tool execution.
- Add gevent timeout wrapper for sandbox bash session.
- Fix CLI binary resolution: add SANDBOX_DIFY_CLI_ROOT config field.
- Fix ExecutionContext.node_id propagation.
- Fix SkillInitializer to gracefully handle missing skill bundles.
- Update _invoke_tool_in_sandbox to use correct `dify execute` CLI
  subcommand format (not `invoke-tool`).

The full sandbox-in-agent pipeline works end-to-end for network-based
providers (Docker, E2B, SSH). Local provider is skipped under gevent
but works in non-gevent contexts.

Made-with: Cursor
2026-04-10 10:51:40 +08:00
Yansong Zhang
5cdae671d5 feat(api): integrate Sandbox Provider into Agent V2 execution pipeline
Close 3 integration gaps between the ported Sandbox system and Agent V2:

1. Fix _invoke_tool_in_sandbox to use SandboxBashSession context manager
   API correctly (keyword args, bash_tool, ToolReference), with graceful
   fallback to direct invocation when DifyCli binary is unavailable.

2. Inject sandbox into run_context via _resolve_sandbox_context() in
   WorkflowBasedAppRunner — automatically creates a sandbox when a
   tenant has an active sandbox provider configured.

3. Register SandboxLayer in both advanced_chat and workflow app runners
   for proper sandbox lifecycle cleanup on graph end.

Also: make SkillInitializer non-fatal when no skill bundle exists,
add node_id to ExecutionContext for sandbox session scoping.

Made-with: Cursor
2026-04-10 10:14:42 +08:00
Yansong Zhang
e50c36526e fix(api): fix transparent upgrade SSE channel mismatch and chat mode routing
- workflow_execute_task: add AppMode.CHAT/AGENT_CHAT/COMPLETION to the
  AdvancedChatAppGenerator routing branch so transparently upgraded old
  apps can execute through the workflow engine.
- app_generate_service: use app_model.mode (not hardcoded AppMode.AGENT)
  for SSE event subscription channel, ensuring the subscriber and
  Celery publisher use the same Redis channel key.

Made-with: Cursor
2026-04-09 17:27:41 +08:00
Yansong Zhang
2de2a8fd3a fix(api): resolve multi-turn memory failure in Agent apps
- Auto-resolve parent_message_id when not provided by client,
  querying the latest message in the conversation to maintain
  the thread chain that extract_thread_messages() relies on.
- Add AppMode.AGENT to TokenBufferMemory mode checks so file
  attachments in memory are handled via the workflow branch.
- Add debug logging for memory injection in node_factory and node.

Made-with: Cursor
2026-04-09 16:27:38 +08:00
Yansong Zhang
e2e16772a1 fix(api): fix DSL import, memory loading, and remaining test coverage
1. DSL Import fix: change self._session.commit() to self._session.flush()
   in app_dsl_service.py _create_or_update_app() to avoid "closed transaction"
   error. DSL import now works: export agent app -> import -> new app created.

2. Memory loading attempt: added _load_memory_messages() to AgentV2Node
   that loads TokenBufferMemory from conversation history. However, chatflow
   engine manages conversations differently from easy-UI (conversation may
   not be in DB at query time, or uses ConversationVariablePersistenceLayer
   instead of Message table). Memory needs further investigation.

Test results:
- Multi-turn memory: Turn 1 OK, Turn 2 LLM doesn't see history (needs deeper fix)
- Service API with API Key: PASSED (answer="Sixteen" for 8+8)
- DSL Import: PASSED (status=completed, new app created)
- Token aggregation: PASSED (node=49, workflow=49)

Known: memory in multi-turn chatflow needs to use graphon's built-in
memory mechanism (MemoryConfig on node + ConversationVariablePersistenceLayer)
rather than direct DB query.

Made-with: Cursor
2026-04-09 14:47:55 +08:00
Yansong Zhang
b21a443d56 fix(api): resolve all remaining known issues
1. Fix workflow-level total_tokens=0:
   Call graph_runtime_state.add_tokens(usage.total_tokens) in both
   _run_without_tools and _run_with_tools paths after node execution.
   Previously only graphon's internal ModelInvokeCompletedEvent handler
   called add_tokens, which agent-v2 doesn't emit.

2. Fix Turn 2 SSE empty response:
   Set PUBSUB_REDIS_CHANNEL_TYPE=streams in .env. Redis Streams
   provides durable event delivery (consumers can replay past events),
   solving the pub/sub at-most-once timing issue.

3. Skill -> Agent runtime integration:
   SandboxBuilder.build() now auto-includes SkillInitializer if not
   already present. This ensures sandbox.attrs has the skill bundle
   loaded for downstream consumers (tool execution in sandbox).

4. LegacyResponseAdapter:
   New module at core/app/apps/common/legacy_response_adapter.py.
   Filters workflow-specific SSE events (workflow_started, node_started,
   node_finished, workflow_finished) from the stream, passing through
   only message/message_end/agent_log/error/ping events that old
   clients expect.

46 unit tests pass.

Made-with: Cursor
2026-04-09 12:53:11 +08:00
Yansong Zhang
4f010cd4f5 fix(api): stop emitting StreamChunkEvent from tool path to prevent answer duplication
The EventAdapter was converting every LLMResultChunk from the agent
strategy into StreamChunkEvent. Combined with the answer node's
{{#agent.text#}} variable output, this caused the final answer to
appear twice (e.g., "It is 2026-04-09 04:27:45.It is 2026-04-09 04:27:45.").

Now LLMResultChunk from strategy output is silently consumed (text still
accumulates in AgentResult.text via the strategy). Only AgentLogEvent
(thought/tool_call/round) is forwarded to the pipeline.

Known remaining issues:
- workflow/message level total_tokens=0 (node level is correct at 33)
  because pipeline aggregation doesn't include agent-v2 node tokens
- Turn 2 SSE delivery timing with Redis pubsub (celery executes OK)

Made-with: Cursor
2026-04-09 12:31:49 +08:00
Yansong Zhang
3d4be88d97 fix(api): remove unsupported 'user' param from FC/ReAct invoke_llm calls
FunctionCallStrategy and ReActStrategy were passing user=self.context.user_id
to ModelInstance.invoke_llm() which doesn't accept that parameter.
This caused tool-using agent runs to fail with:
  "ModelInstance.invoke_llm() got an unexpected keyword argument 'user'"

Verified: Agent V2 with current_time tool now works end-to-end:
  ROUND 1: LLM thought -> CALL current_time -> got time
  ROUND 2: LLM generates answer with time info
Made-with: Cursor
2026-04-09 12:18:07 +08:00
Yansong Zhang
482a004efe fix(api): fix duplicate answer and completion app upgrade issues
1. Remove StreamChunkEvent from AgentV2Node._run_without_tools():
   The agent-v2 node was yielding StreamChunkEvent during LLM streaming,
   AND the downstream answer node was outputting the same text via
   {{#agent.text#}} variable reference, causing "FourFour" duplication.
   Now text only flows through outputs.text -> answer node (single path).

2. Map inputs to query for completion app transparent upgrade:
   Completion apps send {inputs: {query: "..."}} not {query: "..."}.
   VirtualWorkflowSynthesizer route now extracts query from inputs
   when the top-level query is missing.

Verified:
- Old chat app: "What is 2+2?" -> "Four" (was "FourFour")
- Old completion app: {inputs: {query: "What is 3+3?"}} -> "3 + 3 = 6" (was failing)
- Old agent-chat app: still works

Made-with: Cursor
2026-04-09 12:02:43 +08:00
Yansong Zhang
7052257c8d fix(api): use lazy workflow persistence for transparent upgrade of old apps
VirtualWorkflowSynthesizer.ensure_workflow() creates a real draft
workflow on first call for a legacy app, persisting it to the database.
On subsequent calls, returns the existing draft.

This is needed because AdvancedChatAppGenerator's worker thread looks
up workflows from the database by ID. Instead of hacking the generator
to skip DB lookups, we treat this as a lazy one-time upgrade: the old
app gets a real workflow that can also be edited in the workflow editor.

Verified: old chat app created on main branch ("What is 2+2?" -> "Four")
and old agent-chat app ("Say hello" -> "Hello!") both successfully
execute through the Agent V2 engine with AGENT_V2_TRANSPARENT_UPGRADE=true.

Made-with: Cursor
2026-04-09 11:28:16 +08:00
Yansong Zhang
edfcab6455 fix(api): add AGENT mode to app list filtering
Add AppMode.AGENT branch in get_paginate_apps() so that
filtering apps by mode=agent works correctly.
Discovered during comprehensive E2E testing.

14/14 E2E tests pass covering:
- A: New Agent app full lifecycle (create, draft, configs, publish, run)
- B: Old app creation compat (chat, completion, agent-chat, advanced-chat, workflow)
- C: App listing and filtering (all modes, agent filter)
- D: Workflow editor compat (block configs)
- E: DSL export

Made-with: Cursor
2026-04-09 10:54:05 +08:00
Yansong Zhang
66212e3575 feat(api): implement zero-migration transparent upgrade (Phase 8)
Add two feature-flag-controlled upgrade paths that allow existing apps
and LLM nodes to transparently run through the Agent V2 engine without
any database migration:

1. AGENT_V2_TRANSPARENT_UPGRADE (default: off):
   When enabled, old apps (chat/completion/agent-chat) bypass legacy
   Easy-UI runners. VirtualWorkflowSynthesizer converts AppModelConfig
   to an in-memory Workflow (start -> agent-v2 -> answer) at runtime,
   then executes via AdvancedChatAppGenerator. Falls back to legacy
   path on any synthesis error.

   VirtualWorkflowSynthesizer maps:
   - model JSON -> ModelConfig
   - pre_prompt/chat_prompt_config -> prompt_template
   - agent_mode.tools -> ToolMetadata[]
   - agent_mode.strategy -> agent_strategy
   - dataset_configs -> context
   - file_upload -> vision

2. AGENT_V2_REPLACES_LLM (default: off):
   When enabled, DifyNodeFactory.create_node() transparently remaps
   nodes with type="llm" to type="agent-v2" before class resolution.
   Since AgentV2NodeData is a strict superset of LLMNodeData, the
   mapping is lossless. With tools=[], Agent V2 behaves identically
   to LLM Node.

Both flags default to False for safety. Turn off = instant rollback.
46 existing tests pass. Flask starts successfully.

Made-with: Cursor
2026-04-09 10:30:52 +08:00
Yansong Zhang
96374d7f6a refactor(api): replace legacy agent runners with StrategyFactory in AgentChatAppRunner (Phase 4)
Replace the hardcoded FunctionCallAgentRunner / CotChatAgentRunner /
CotCompletionAgentRunner selection in AgentChatAppRunner with the new
AgentAppRunner class that uses StrategyFactory from Phase 1.

Before: AgentChatAppRunner manually selects FC/CoT runner class based on
model features and LLM mode, then instantiates it directly.

After: AgentChatAppRunner instantiates AgentAppRunner (from sandbox branch),
which internally uses StrategyFactory.create_strategy() to auto-select
the right strategy, and uses ToolInvokeHook for proper agent_invoke
with file handling and thought persistence.

This unifies the agent execution engine: both the new Agent V2 workflow
node and the legacy agent-chat app now use the same StrategyFactory
and AgentPattern implementations.

Also fix: command and file_upload nodes use string node_type instead of
BuiltinNodeTypes.COMMAND/FILE_UPLOAD (not in current graphon version).

46 tests pass. Flask starts successfully.

Made-with: Cursor
2026-04-09 09:42:23 +08:00
Yansong Zhang
44491e427c feat(api): enable all sandbox/skill controller routes and resolve dependencies (P0)
Resolve the full dependency chain to enable all previously disabled controllers:

Enabled routes:
- sandbox_files: sandbox file browser API
- sandbox_providers: sandbox provider management API
- app_asset: app asset management API
- skills: skill extraction API
- CLI API blueprint: DifyCli callback endpoints (/cli/api/*)

Dependencies extracted (64 files, ~8000 lines):
- models/sandbox.py, models/app_asset.py: DB models
- core/zip_sandbox/: zip-based sandbox execution
- core/session/: CLI API session management
- core/memory/: base memory + node token buffer
- core/helper/creators.py: helper utilities
- core/llm_generator/: context models, output models, utils
- core/workflow/nodes/command/: command node type
- core/workflow/nodes/file_upload/: file upload node type
- core/app/entities/: app_asset_entities, app_bundle_entities, llm_generation_entities
- services/: asset_content, skill, workflow_collaboration, workflow_comment
- controllers/console/app/error.py: AppAsset error classes
- core/tools/utils/system_encryption.py

Import fixes:
- dify_graph.enums -> graphon.enums in skill_service.py
- get_signed_file_url_for_plugin -> get_signed_file_url in cli_api.py

All 5 controllers verified: import OK, Flask starts successfully.
46 existing tests still pass.

Made-with: Cursor
2026-04-09 09:36:16 +08:00
Yansong Zhang
d3d9f21cdf feat(api): wire sandbox into Agent V2 node execution pipeline
Integrate the ported sandbox system with Agent V2 node:

- Add DIFY_SANDBOX_CONTEXT_KEY to app_invoke_entities for passing
  sandbox through run_context without modifying graphon
- DifyNodeFactory._resolve_sandbox() extracts sandbox from run_context
  and passes it to AgentV2Node constructor
- AgentV2Node accepts optional sandbox parameter
- AgentV2ToolManager supports dual execution paths:
  - _invoke_tool_directly(): standard ToolEngine.generic_invoke (no sandbox)
  - _invoke_tool_in_sandbox(): delegates to SandboxBashSession.run_tool()
    which uses DifyCli to call back to Dify API from inside the sandbox
- Graceful fallback: if sandbox execution fails, logs warning and returns
  error message (does not crash the agent loop)

To enable sandbox for an Agent workflow:
1. Create a Sandbox via SandboxBuilder
2. Add it to run_context under DIFY_SANDBOX_CONTEXT_KEY
3. Agent V2 nodes will automatically use sandbox for tool execution

46 existing tests still pass.

Made-with: Cursor
2026-04-08 17:46:34 +08:00
Yansong Zhang
0c7e7e0c4e feat(api): port Sandbox + VirtualEnvironment + Skill system from feat/support-agent-sandbox (Phase 5-6)
Port the complete infrastructure for agent sandbox execution and skill system:

Sandbox & Virtual Environment (core/sandbox/, core/virtual_environment/):
- Sandbox entity with lifecycle management (ready/failed/cancelled states)
- SandboxBuilder with fluent API for configuring providers
- 5 VM providers: Local, SSH, Docker, E2B, AWS CodeInterpreter
- VirtualEnvironment base with command execution, file transfer, transport layers
- Channel transport: pipe, queue, socket implementations
- Bash session management and DifyCli binary integration
- Storage: archive storage, file storage, noop storage, presign storage
- Initializers: DifyCli, AppAssets, DraftAppAssets, Skills
- Inspector: file browser, archive/runtime source, script utils
- Security: encryption utils, debug helpers

Skill & App Assets (core/skill/, core/app_assets/, core/app_bundle/):
- Skill entity and manager
- App asset accessor, builder pipeline (file, skill builders)
- App bundle source zip extractor
- Storage and converter utilities

API Endpoints:
- CLI API blueprint (controllers/cli_api/) for sandbox callback
- Sandbox provider management (workspace/sandbox_providers)
- Sandbox file browser (console/sandbox_files)
- App asset management (console/app/app_asset)
- Skill management (console/app/skills)
- Storage file endpoints (controllers/files/storage_files)

Services:
- Sandbox service, provider service, file service
- App asset service, app bundle service

Config:
- CliApiConfig, CreatorsPlatformConfig, CollaborationConfig
- FILES_API_URL for sandbox file access

Note: Controller route registration temporarily commented out (marked TODO)
pending resolution of deep dependency chains (socketio, workflow_comment,
command node, etc.). Core sandbox modules are fully ported and syntax-validated.
110 files changed, 10,549 insertions.

Made-with: Cursor
2026-04-08 17:39:02 +08:00
Yansong Zhang
d9d1e9b63a fix(api): resolve Agent V2 node E2E runtime issues
Fixes discovered during end-to-end testing of Agent workflow execution:

1. ModelManager instantiation: use ModelManager.for_tenant() instead of
   ModelManager() which requires a ProviderManager argument
2. Variable template resolution: use VariableTemplateParser(template).format()
   instead of non-existent resolve_template() static method
3. invoke_llm() signature: remove unsupported 'user' keyword argument
4. Event dispatch: remove ModelInvokeCompletedEvent from _run() yield
   (graphon base Node._dispatch doesn't support it via singledispatch)
5. NodeRunResult metadata: use WorkflowNodeExecutionMetadataKey enum keys
   (TOTAL_TOKENS, TOTAL_PRICE, CURRENCY) instead of arbitrary string keys
6. SSE topic mismatch: use AppMode.AGENT (not ADVANCED_CHAT) in
   retrieve_events() so publisher and subscriber share the same channel
7. Celery task routing: add AppMode.AGENT to workflow_execute_task._run_app()
   alongside ADVANCED_CHAT

All issues verified fixed: Agent V2 node successfully invokes LLM and
returns "Hello there!" through the full SSE streaming pipeline.

Made-with: Cursor
2026-04-08 16:21:12 +08:00
Yansong Zhang
bebafaa346 fix(api): allow AGENT mode in console chat, message, and debug endpoints
Add AppMode.AGENT to mode checks discovered during E2E testing:
- Console chat-messages endpoint (ChatApi)
- Console chat stop endpoint (ChatMessageStopApi)
- Console message list and detail endpoints
- Advanced-chat debug run endpoints (5 in workflow.py)
- Advanced-chat workflow run endpoints (2 in workflow_run.py)

Made-with: Cursor
2026-04-08 13:27:42 +08:00
Yansong Zhang
1835a1dc5d fix(api): allow AGENT mode in workflow features validation
Add AppMode.AGENT to validate_features_structure() match case
alongside ADVANCED_CHAT, fixing 'Invalid app mode: agent' error
when creating Agent apps (which auto-generate a workflow draft).

Discovered during E2E testing of the full create -> draft -> publish flow.

Made-with: Cursor
2026-04-08 13:19:59 +08:00
Yansong Zhang
8f3a3ea03e feat(api): enable Agent mode in workflow/service APIs and add default config (Phase 7)
Ensure new Agent apps (AppMode.AGENT) can access all workflow-related
APIs and Service API chat endpoints:

- Add AppMode.AGENT to 13 workflow controller mode checks
- Add AppMode.AGENT to 4 workflow_run controller mode checks
- Add AppMode.AGENT to workflow_draft_variable controller
- Add AppMode.AGENT to Service API chat, conversation, message endpoints
- Add AgentV2Node.get_default_config() with prompt templates and strategy defaults
- 46 unit tests all passing (8 new Phase 7 tests)

Old agent/agent-chat paths remain completely unchanged.

Made-with: Cursor
2026-04-08 12:41:37 +08:00
Yansong Zhang
96641a93f6 feat(api): add Agent V2 node and new Agent app type (Phase 1-3)
Introduce a new unified Agent V2 workflow node that combines LLM capabilities
with agent tool-calling loops, along with a new AppMode.AGENT for standalone
agent apps backed by single-node workflows.

Phase 1 — Agent Patterns:
- Add core/agent/patterns/ module (AgentPattern, FunctionCallStrategy,
  ReActStrategy, StrategyFactory) ported from feat/support-agent-sandbox
- Add ExecutionContext, AgentLog, AgentResult entities
- Add Tool.to_prompt_message_tool() for LLM-consumable tool conversion

Phase 2 — Agent V2 Workflow Node:
- Add core/workflow/nodes/agent_v2/ (AgentV2Node, AgentV2NodeData,
  AgentV2ToolManager, AgentV2EventAdapter)
- Register agent-v2 node type in DifyNodeFactory
- No-tools path: single LLM call (LLM Node equivalent)
- Tools path: FC/ReAct loop via StrategyFactory

Phase 3 — Agent App Type:
- Add AppMode.AGENT to model enum
- Add WorkflowGraphFactory for auto-generating start->agent_v2->answer graphs
- AppService.create_app() creates workflow draft for AGENT mode
- AppGenerateService.generate() routes AGENT to AdvancedChatAppGenerator
- Console API and DSL import/export support AGENT mode
- Default app template for AGENT mode

Old agent/agent-chat/LLM node paths are fully preserved.
38 unit tests all passing.

Made-with: Cursor
2026-04-08 12:31:23 +08:00
corevibe555
b1adb5652e refactor(api): deduplicate I18nObject in datasource entities (#34701) 2026-04-08 01:36:56 +00:00
corevibe555
c825d5dcf6 refactor(api): tighten types for Tenant.custom_config_dict and MCPToolProvider.headers (#34698) 2026-04-08 01:36:42 +00:00
Renzo
2127d5850f refactor: replace untyped dicts with TypedDict in VDB config classes (#34697)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2026-04-08 00:57:11 +00:00
carlos4s
ae9fcc2969 refactor: use sessionmaker in controllers, events, models, and tasks 1 (#34693) 2026-04-07 23:47:20 +00:00
corevibe555
624db69f12 refactor(api): remove duplicated RAG entities from services layer (#34689) 2026-04-07 23:36:59 +00:00
corevibe555
80a7843f45 refactor(api): migrate consumers to shared RAG domain entities from core/rag/entities/ (#34692) 2026-04-07 23:22:56 +00:00
Renzo
cb55176612 refactor: migrate session.query to select API in small task files batch (#34684)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-07 22:58:23 +00:00
Statxc
5aa2524d33 refactor(api): type I18nObject.to_dict with I18nObjectDict TypedDict (#34680) 2026-04-07 22:57:32 +00:00
Pulakesh
2575a3a3ab refactor(api): clean up AssistantPromptMessage typing in CotChatAgentRunner (#34681) 2026-04-07 22:53:14 +00:00
corevibe555
f8f7b0ec1a refactor(api): deduplicate shared auth request payloads into auth_entities.py (#34694) 2026-04-07 22:51:11 +00:00
corevibe555
d2ee486900 refactor(api): extract shared RAG domain entities into core/rag/entity (#34685) 2026-04-07 22:43:37 +00:00
Statxc
c44ddd9831 refactor(api): type Chroma and AnalyticDB config params dicts with TypedDicts (#34678)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
2026-04-07 13:27:12 +00:00
Statxc
e645cbd8f8 refactor(api): type VDB config params dicts with TypedDicts (#34677) 2026-04-07 13:23:42 +00:00
YBoy
485fc2c416 refactor(api): type Tenant custom config with TypedDict and tighten MCP headers type (#34670) 2026-04-07 13:18:19 +00:00
YBoy
f09be969bb refactor(api): type single-node graph structure with TypedDicts in workflow_entry (#34671) 2026-04-07 13:18:00 +00:00
Statxc
597a0b4d9f refactor(api): type indexing result with IndexingResultDict TypedDict (#34672) 2026-04-07 13:17:39 +00:00
Statxc
779cce3c61 refactor(api): type gen_index_struct_dict with VectorIndexStructDict TypedDict (#34675) 2026-04-07 13:17:20 +00:00
Statxc
b5d9a71cf9 refactor(api): type VDB to_index_struct with VectorIndexStructDict TypedDict (#34674) 2026-04-07 13:17:04 +00:00
corevibe555
c2af415450 refactor(api): Extract shared ResponseModel (#34633) 2026-04-07 13:05:38 +00:00
Dream
89ce61cfea refactor(api): replace json.loads with Pydantic validation in security and tools layers (#34380) 2026-04-07 12:11:51 +00:00
yyh
05c5327f47 chore: remove unused pnpm overrides (#34658) 2026-04-07 09:36:49 +00:00
yyh
3891c0a255 fix(workflow): correct env variable picker validation (#34666) 2026-04-07 09:34:25 +00:00
非法操作
63b1d0c1ea fix: var input label missing icon (#34569) 2026-04-07 09:33:13 +00:00
Pulakesh
75ed38fb3d fix(#34636): replace SimpleNamespace with MagicMock(spec=App) in test_app_dsl_service (#34659)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
Trigger i18n Sync on Push / trigger (push) Has been cancelled
2026-04-07 07:25:46 +00:00
Statxc
63db9a7a2f refactor(api): type load balancing config dicts with TypedDict (#34639) 2026-04-07 05:58:10 +00:00
Statxc
19c80f0f0e refactor(api): type error stream response with TypedDict (#34641) 2026-04-07 05:57:42 +00:00
YBoy
c5a0bde3ec refactor(api): type aliyun trace utils with TypedDict and tighten return types (#34642) 2026-04-07 05:57:22 +00:00
YBoy
1261e5e5e8 refactor(api): type webhook validation result and workflow inputs with TypedDict (#34645) 2026-04-07 05:57:02 +00:00
Renzo
e2ecd68556 refactor: migrate session.query to select API in rag pipeline task files (#34648) 2026-04-07 05:56:19 +00:00
Pulakesh
bceb0eee9b refactor(api): migrate dict returns to TypedDicts in billing service (#34649) 2026-04-07 05:56:02 +00:00
Renzo
173e818a62 refactor: migrate session.query to select API in summary and remove document tasks (#34650) 2026-04-07 05:55:31 +00:00
YBoy
84d8940dbf refactor(api): type app parameter feature toggles with FeatureToggleD… (#34651) 2026-04-07 05:53:50 +00:00
Renzo
3e995e6a6d refactor: migrate session.query to select API in document task files (#34646) 2026-04-07 05:53:21 +00:00
yyh
459c36f21b fix: improve app delete alert dialog UX (#34644)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-07 05:03:39 +00:00
Renzo
72adb5468c refactor: migrate session.query to select API in retrieval_service (#34638) 2026-04-07 04:46:30 +00:00
Renzo
1194957fde refactor: migrate session.query to select API in end_user_service and small tasks (#34620) 2026-04-07 04:25:55 +00:00
Renzo
68bd29eda2 refactor: migrate session.query to select API in sync task and services (#34619) 2026-04-07 04:23:14 +00:00
YBoy
f67a811f7f refactor: replace dict params with BaseModel payloads in TagService (#34422)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-07 04:20:02 +00:00
yyh
b9c122e7f4 fix: simplify pre-commit hook flow (#34637) 2026-04-07 04:19:31 +00:00
aliworksx08
396b39dff9 refactor: migrate session.query to select API in console controllers (#34607) 2026-04-07 04:19:30 +00:00
Renzo
ac8bd12609 refactor: migrate session.query to select API in small task files (#34617) 2026-04-07 04:13:22 +00:00
Renzo
b55bef4438 refactor: migrate session.query to select API in core misc modules (#34608) 2026-04-07 04:08:34 +00:00
非法操作
2f9667de76 fix: web app user avatar display incorrect black (#34624) 2026-04-07 03:23:56 +00:00
Statxc
a7b6307d32 refactor(api): type dataset service dicts with TypedDict (#34625) 2026-04-07 02:10:52 +00:00
Statxc
2883ad6764 refactor(api): type plugin migration results with TypedDict (#34627) 2026-04-07 02:10:23 +00:00
Pulakesh
0feff5b048 refactor(api): enforce strict typing on retrieval_model to resolve FIXME (#34614) 2026-04-07 01:10:53 +00:00
Statxc
0bce6b35b4 refactor(api): type LLM generator results with TypedDict (#34621) 2026-04-07 01:06:08 +00:00
YBoy
89e23456f0 refactor(api): type invitation detail with InvitationDetailDict TypedDict (#34613)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-07 01:03:31 +00:00
Jake Armstrong
a39173c227 refactor(api): type notification response with NotificationResponseDict TypedDict (#34616) 2026-04-07 01:03:18 +00:00
YBoy
12e93d374f refactor(api): type MCP tool schema and arguments with TypedDict (#34612) 2026-04-07 01:02:06 +00:00
YBoy
922f9242e4 refactor(api): type crawl status dicts with CrawlStatusDict TypedDict (#34611) 2026-04-07 01:01:04 +00:00
YBoy
7fc0a791a2 refactor(api): type document summary status detail with TypedDict (#34610) 2026-04-07 01:00:39 +00:00
YBoy
8d37116fec refactor(api): type storage statistics with StorageStatisticsDict TypedDict (#34609)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-07 00:59:32 +00:00
dependabot[bot]
4b500f988d chore(deps-dev): bump the dev group across 1 directory with 20 updates (#34601)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-06 13:24:31 +00:00
YBoy
5ad906ea6a refactor(api): type workflow run related counts with RelatedCountsDict TypedDict (#34530) 2026-04-06 13:17:01 +00:00
dependabot[bot]
5b862a43e0 chore(deps-dev): bump the dev group in /api with 6 updates (#34579)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>
2026-04-06 11:49:54 +00:00
YBoy
1e5cd69205 refactor(api): type archive manifest with ArchiveManifestDict TypedDict (#34594) 2026-04-06 11:35:11 +00:00
Jake Armstrong
9081c46565 refactor(api): type upload file serialization with UploadFileDict TypedDict (#34589) 2026-04-06 11:34:52 +00:00
dependabot[bot]
40b252be8c chore(deps): bump google-auth-httplib2 from 0.3.0 to 0.3.1 in /api in the google group (#34575)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-06 11:32:30 +00:00
dependabot[bot]
ba1357038a chore(deps): update flask-compress requirement from <1.24,>=1.17 to >=1.17,<1.25 in /api in the flask group (#34573)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-06 11:32:19 +00:00
dependabot[bot]
46d1f4c338 chore(deps-dev): bump the vdb group in /api with 7 updates (#34586)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-06 11:31:36 +00:00
YBoy
9c880dd650 refactor(api): type orphaned draft variable stats with TypedDict (#34590) 2026-04-06 11:30:53 +00:00
YBoy
01ba0e050f refactor(api): reuse IdentityDict TypedDict in logging filters (#34593) 2026-04-06 11:30:21 +00:00
dependabot[bot]
ccc4aae94e chore(deps): bump the llm group in /api with 3 updates (#34583)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-06 11:30:02 +00:00
dependabot[bot]
01242e13d7 chore(deps): bump sqlalchemy from 2.0.48 to 2.0.49 in /api in the database group (#34584)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-06 11:29:50 +00:00
dependabot[bot]
938ee27e42 chore(deps): bump the github-actions-dependencies group with 4 updates (#34582)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-06 11:29:07 +00:00
dependabot[bot]
a101f72153 chore(deps): bump the google group in /api with 4 updates (#34581)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-06 11:29:00 +00:00
dependabot[bot]
40642433d8 chore(deps): bump flask-compress from 1.23 to 1.24 in /api in the flask group (#34580)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-06 11:28:25 +00:00
dependabot[bot]
8979181d5e chore(deps): bump boto3 from 1.42.78 to 1.42.83 in /api in the storage group (#34578)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-04-06 11:27:58 +00:00
dependabot[bot]
c17c6b5c35 chore(deps): bump the storage group in /api with 2 updates (#34585)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-06 11:27:26 +00:00
kurokobo
e83a4090ac fix: lighten the health checks for the Worker and Worker Beat services, and disable them by default (#34572)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled
2026-04-06 02:26:26 +00:00
451 changed files with 25018 additions and 1656 deletions

View File

@@ -20,4 +20,4 @@
- [x] I understand that this PR may be closed in case there was no previous discussion or issues. (This doesn't apply to typos!)
- [x] I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
- [x] I've updated the documentation accordingly.
- [x] I ran `make lint` and `make type-check` (backend) and `cd web && npx lint-staged` (frontend) to appease the lint gods
- [x] I ran `make lint` and `make type-check` (backend) and `cd web && pnpm exec vp staged` (frontend) to appease the lint gods

View File

@@ -65,7 +65,7 @@ jobs:
echo "PLATFORM_PAIR=${platform//\//-}" >> $GITHUB_ENV
- name: Login to Docker Hub
uses: docker/login-action@b45d80f862d83dbcd57f89517bcf500b2ab88fb2 # v4.0.0
uses: docker/login-action@4907a6ddec9925e35a0a9e82d7399ccc52663121 # v4.1.0
with:
username: ${{ env.DOCKERHUB_USER }}
password: ${{ env.DOCKERHUB_TOKEN }}
@@ -130,7 +130,7 @@ jobs:
merge-multiple: true
- name: Login to Docker Hub
uses: docker/login-action@b45d80f862d83dbcd57f89517bcf500b2ab88fb2 # v4.0.0
uses: docker/login-action@4907a6ddec9925e35a0a9e82d7399ccc52663121 # v4.1.0
with:
username: ${{ env.DOCKERHUB_USER }}
password: ${{ env.DOCKERHUB_TOKEN }}

View File

@@ -149,7 +149,7 @@ jobs:
.editorconfig
- name: Super-linter
uses: super-linter/super-linter/slim@61abc07d755095a68f4987d1c2c3d1d64408f1f9 # v8.5.0
uses: super-linter/super-linter/slim@9e863354e3ff62e0727d37183162c4a88873df41 # v8.6.0
if: steps.changed-files.outputs.any_changed == 'true'
env:
BASH_SEVERITY: warning

View File

@@ -240,7 +240,7 @@ jobs:
- name: Run Claude Code for Translation Sync
if: steps.context.outputs.CHANGED_FILES != ''
uses: anthropics/claude-code-action@88c168b39e7e64da0286d812b6e9fbebb6708185 # v1.0.82
uses: anthropics/claude-code-action@6e2bd52842c65e914eba5c8badd17560bd26b5de # v1.0.89
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
github_token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -36,7 +36,7 @@ jobs:
remove_tool_cache: true
- name: Setup UV and Python
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # v7.6.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true
python-version: ${{ matrix.python-version }}

View File

@@ -95,31 +95,5 @@ if $web_modified; then
exit 1
fi
echo "Running unit tests check"
modified_files=$(git diff --cached --name-only -- utils | grep -v '\.spec\.ts$' || true)
if [ -n "$modified_files" ]; then
for file in $modified_files; do
test_file="${file%.*}.spec.ts"
echo "Checking for test file: $test_file"
# check if the test file exists
if [ -f "../$test_file" ]; then
echo "Detected changes in $file, running corresponding unit tests..."
pnpm run test "../$test_file"
if [ $? -ne 0 ]; then
echo "Unit tests failed. Please fix the errors before committing."
exit 1
fi
echo "Unit tests for $file passed."
else
echo "Warning: $file does not have a corresponding test file."
fi
done
echo "All unit tests for modified web/utils files have passed."
fi
cd ../
fi

BIN
api/bin/dify-cli-darwin-arm64 Executable file

Binary file not shown.

BIN
api/bin/dify-cli-linux-amd64 Executable file

Binary file not shown.

BIN
api/bin/dify-cli-linux-arm64 Executable file

Binary file not shown.

18
api/celery_healthcheck.py Normal file
View File

@@ -0,0 +1,18 @@
# This module provides a lightweight Celery instance for use in Docker health checks.
# Unlike celery_entrypoint.py, this does NOT import app.py and therefore avoids
# initializing all Flask extensions (DB, Redis, storage, blueprints, etc.).
# Using this module keeps the health check fast and low-cost.
from celery import Celery
from configs import dify_config
from extensions.ext_celery import get_celery_broker_transport_options, get_celery_ssl_options
celery = Celery(broker=dify_config.CELERY_BROKER_URL)
broker_transport_options = get_celery_broker_transport_options()
if broker_transport_options:
celery.conf.update(broker_transport_options=broker_transport_options)
ssl_options = get_celery_ssl_options()
if ssl_options:
celery.conf.update(broker_use_ssl=ssl_options)

View File

@@ -1,7 +1,7 @@
import datetime
import logging
import time
from typing import Any
from typing import TypedDict
import click
import sqlalchemy as sa
@@ -503,7 +503,19 @@ def _find_orphaned_draft_variables(batch_size: int = 1000) -> list[str]:
return [row[0] for row in result]
def _count_orphaned_draft_variables() -> dict[str, Any]:
class _AppOrphanCounts(TypedDict):
variables: int
files: int
class OrphanedDraftVariableStatsDict(TypedDict):
total_orphaned_variables: int
total_orphaned_files: int
orphaned_app_count: int
orphaned_by_app: dict[str, _AppOrphanCounts]
def _count_orphaned_draft_variables() -> OrphanedDraftVariableStatsDict:
"""
Count orphaned draft variables by app, including associated file counts.
@@ -526,7 +538,7 @@ def _count_orphaned_draft_variables() -> dict[str, Any]:
with db.engine.connect() as conn:
result = conn.execute(sa.text(variables_query))
orphaned_by_app = {}
orphaned_by_app: dict[str, _AppOrphanCounts] = {}
total_files = 0
for row in result:

View File

@@ -271,6 +271,27 @@ class PluginConfig(BaseSettings):
)
class CliApiConfig(BaseSettings):
"""
Configuration for CLI API (for dify-cli to call back from external sandbox environments)
"""
CLI_API_URL: str = Field(
description="CLI API URL for external sandbox (e.g., e2b) to call back.",
default="http://localhost:5001",
)
SANDBOX_DIFY_CLI_ROOT: str = Field(
description="Root directory containing dify-cli binaries (dify-cli-{os}-{arch}).",
default="",
)
DIFY_PORT: int = Field(
description="Dify API port, used by Docker sandbox for socat forwarding.",
default=5001,
)
class MarketplaceConfig(BaseSettings):
"""
Configuration for marketplace
@@ -287,6 +308,27 @@ class MarketplaceConfig(BaseSettings):
)
class CreatorsPlatformConfig(BaseSettings):
"""
Configuration for creators platform
"""
CREATORS_PLATFORM_FEATURES_ENABLED: bool = Field(
description="Enable or disable creators platform features",
default=True,
)
CREATORS_PLATFORM_API_URL: HttpUrl = Field(
description="Creators Platform API URL",
default=HttpUrl("https://creators.dify.ai"),
)
CREATORS_PLATFORM_OAUTH_CLIENT_ID: str = Field(
description="OAuth client_id for the Creators Platform app registered in Dify",
default="",
)
class EndpointConfig(BaseSettings):
"""
Configuration for various application endpoints and URLs
@@ -341,6 +383,15 @@ class FileAccessConfig(BaseSettings):
default="",
)
FILES_API_URL: str = Field(
description="Base URL for storage file ticket API endpoints."
" Used by sandbox containers (internal or external like e2b) that need"
" an absolute, routable address to upload/download files via the API."
" For all-in-one Docker deployments, set to http://localhost."
" For public sandbox environments, set to a public domain or IP.",
default="",
)
FILES_ACCESS_TIMEOUT: int = Field(
description="Expiration time in seconds for file access URLs",
default=300,
@@ -1274,6 +1325,29 @@ class PositionConfig(BaseSettings):
return {item.strip() for item in self.POSITION_TOOL_EXCLUDES.split(",") if item.strip() != ""}
class CollaborationConfig(BaseSettings):
ENABLE_COLLABORATION_MODE: bool = Field(
description="Whether to enable collaboration mode features across the workspace",
default=False,
)
class AgentV2UpgradeConfig(BaseSettings):
"""Feature flags for transparent Agent V2 upgrade."""
AGENT_V2_TRANSPARENT_UPGRADE: bool = Field(
description="Transparently run old apps (chat/completion/agent-chat) through the Agent V2 workflow engine. "
"When enabled, old apps synthesize a virtual workflow at runtime instead of using legacy runners.",
default=False,
)
AGENT_V2_REPLACES_LLM: bool = Field(
description="Transparently replace LLM nodes in workflows with Agent V2 nodes at runtime. "
"LLMNodeData is remapped to AgentV2NodeData with tools=[] (identical behavior).",
default=False,
)
class LoginConfig(BaseSettings):
ENABLE_EMAIL_CODE_LOGIN: bool = Field(
description="whether to enable email code login",
@@ -1375,7 +1449,9 @@ class FeatureConfig(
TriggerConfig,
AsyncWorkflowConfig,
PluginConfig,
CliApiConfig,
MarketplaceConfig,
CreatorsPlatformConfig,
DataSetConfig,
EndpointConfig,
FileAccessConfig,
@@ -1399,6 +1475,8 @@ class FeatureConfig(
WorkflowConfig,
WorkflowNodeExecutionConfig,
WorkspaceConfig,
CollaborationConfig,
AgentV2UpgradeConfig,
LoginConfig,
AccountConfig,
SwaggerUIConfig,

View File

@@ -81,4 +81,20 @@ default_app_templates: Mapping[AppMode, Mapping] = {
},
},
},
# agent default mode (new agent backed by single-node workflow)
AppMode.AGENT: {
"app": {
"mode": AppMode.AGENT,
"enable_site": True,
"enable_api": True,
},
"model_config": {
"model": {
"provider": "openai",
"name": "gpt-4o",
"mode": "chat",
"completion_params": {},
},
},
},
}

View File

@@ -0,0 +1,27 @@
from flask import Blueprint
from flask_restx import Namespace
from libs.external_api import ExternalApi
bp = Blueprint("cli_api", __name__, url_prefix="/cli/api")
api = ExternalApi(
bp,
version="1.0",
title="CLI API",
description="APIs for Dify CLI to call back from external sandbox environments (e.g., e2b)",
)
# Create namespace
cli_api_ns = Namespace("cli_api", description="CLI API operations", path="/")
from .dify_cli import cli_api as _plugin
api.add_namespace(cli_api_ns)
__all__ = [
"_plugin",
"api",
"bp",
"cli_api_ns",
]

View File

@@ -0,0 +1,190 @@
from flask import abort
from flask_restx import Resource
from pydantic import BaseModel
from controllers.cli_api import cli_api_ns
from controllers.cli_api.dify_cli.wraps import get_cli_user_tenant, plugin_data
from controllers.cli_api.wraps import cli_api_only
from controllers.console.wraps import setup_required
from core.app.entities.app_invoke_entities import InvokeFrom
from core.plugin.backwards_invocation.app import PluginAppBackwardsInvocation
from core.plugin.backwards_invocation.base import BaseBackwardsInvocationResponse
from core.plugin.backwards_invocation.model import PluginModelBackwardsInvocation
from core.plugin.backwards_invocation.tool import PluginToolBackwardsInvocation
from core.plugin.entities.request import (
RequestInvokeApp,
RequestInvokeLLM,
RequestInvokeTool,
RequestRequestUploadFile,
)
from core.sandbox.bash.dify_cli import DifyCliToolConfig
from core.session.cli_api import CliContext
from core.skill.entities import ToolInvocationRequest
from core.tools.entities.tool_entities import ToolProviderType
from core.tools.tool_manager import ToolManager
from graphon.file.helpers import get_signed_file_url
from libs.helper import length_prefixed_response
from models.account import Account
from models.model import EndUser, Tenant
class FetchToolItem(BaseModel):
tool_type: str
tool_provider: str
tool_name: str
credential_id: str | None = None
class FetchToolBatchRequest(BaseModel):
tools: list[FetchToolItem]
@cli_api_ns.route("/invoke/llm")
class CliInvokeLLMApi(Resource):
@cli_api_only
@get_cli_user_tenant
@setup_required
@plugin_data(payload_type=RequestInvokeLLM)
def post(
self,
user_model: Account | EndUser,
tenant_model: Tenant,
payload: RequestInvokeLLM,
cli_context: CliContext,
):
def generator():
response = PluginModelBackwardsInvocation.invoke_llm(user_model.id, tenant_model, payload)
return PluginModelBackwardsInvocation.convert_to_event_stream(response)
return length_prefixed_response(0xF, generator())
@cli_api_ns.route("/invoke/tool")
class CliInvokeToolApi(Resource):
@cli_api_only
@get_cli_user_tenant
@setup_required
@plugin_data(payload_type=RequestInvokeTool)
def post(
self,
user_model: Account | EndUser,
tenant_model: Tenant,
payload: RequestInvokeTool,
cli_context: CliContext,
):
tool_type = ToolProviderType.value_of(payload.tool_type)
request = ToolInvocationRequest(
tool_type=tool_type,
provider=payload.provider,
tool_name=payload.tool,
credential_id=payload.credential_id,
)
if cli_context.tool_access and not cli_context.tool_access.is_allowed(request):
abort(403, description=f"Access denied for tool: {payload.provider}/{payload.tool}")
def generator():
return PluginToolBackwardsInvocation.convert_to_event_stream(
PluginToolBackwardsInvocation.invoke_tool(
tenant_id=tenant_model.id,
user_id=user_model.id,
tool_type=tool_type,
provider=payload.provider,
tool_name=payload.tool,
tool_parameters=payload.tool_parameters,
credential_id=payload.credential_id,
),
)
return length_prefixed_response(0xF, generator())
@cli_api_ns.route("/invoke/app")
class CliInvokeAppApi(Resource):
@cli_api_only
@get_cli_user_tenant
@setup_required
@plugin_data(payload_type=RequestInvokeApp)
def post(
self,
user_model: Account | EndUser,
tenant_model: Tenant,
payload: RequestInvokeApp,
cli_context: CliContext,
):
response = PluginAppBackwardsInvocation.invoke_app(
app_id=payload.app_id,
user_id=user_model.id,
tenant_id=tenant_model.id,
conversation_id=payload.conversation_id,
query=payload.query,
stream=payload.response_mode == "streaming",
inputs=payload.inputs,
files=payload.files,
)
return length_prefixed_response(0xF, PluginAppBackwardsInvocation.convert_to_event_stream(response))
@cli_api_ns.route("/upload/file/request")
class CliUploadFileRequestApi(Resource):
@cli_api_only
@get_cli_user_tenant
@setup_required
@plugin_data(payload_type=RequestRequestUploadFile)
def post(
self,
user_model: Account | EndUser,
tenant_model: Tenant,
payload: RequestRequestUploadFile,
cli_context: CliContext,
):
url = get_signed_file_url(
upload_file_id=f"{tenant_model.id}_{user_model.id}_{payload.filename}",
tenant_id=tenant_model.id,
)
return BaseBackwardsInvocationResponse(data={"url": url}).model_dump()
@cli_api_ns.route("/fetch/tools/batch")
class CliFetchToolsBatchApi(Resource):
@cli_api_only
@get_cli_user_tenant
@setup_required
@plugin_data(payload_type=FetchToolBatchRequest)
def post(
self,
user_model: Account | EndUser,
tenant_model: Tenant,
payload: FetchToolBatchRequest,
cli_context: CliContext,
):
tools: list[dict] = []
for item in payload.tools:
provider_type = ToolProviderType.value_of(item.tool_type)
request = ToolInvocationRequest(
tool_type=provider_type,
provider=item.tool_provider,
tool_name=item.tool_name,
credential_id=item.credential_id,
)
if cli_context.tool_access and not cli_context.tool_access.is_allowed(request):
abort(403, description=f"Access denied for tool: {item.tool_provider}/{item.tool_name}")
try:
tool_runtime = ToolManager.get_tool_runtime(
tenant_id=tenant_model.id,
provider_type=provider_type,
provider_id=item.tool_provider,
tool_name=item.tool_name,
invoke_from=InvokeFrom.DEBUGGER,
credential_id=item.credential_id,
)
tool_config = DifyCliToolConfig.create_from_tool(tool_runtime)
tools.append(tool_config.model_dump())
except Exception:
continue
return BaseBackwardsInvocationResponse(data={"tools": tools}).model_dump()

View File

@@ -0,0 +1,137 @@
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import current_app, g, request
from flask_login import user_logged_in
from pydantic import BaseModel
from sqlalchemy.orm import Session
from core.session.cli_api import CliApiSession, CliContext
from extensions.ext_database import db
from libs.login import current_user
from models.account import Tenant
from models.model import DefaultEndUserSessionID, EndUser
P = ParamSpec("P")
R = TypeVar("R")
class TenantUserPayload(BaseModel):
tenant_id: str
user_id: str
def get_user(tenant_id: str, user_id: str | None) -> EndUser:
"""
Get current user
NOTE: user_id is not trusted, it could be maliciously set to any value.
As a result, it could only be considered as an end user id.
"""
if not user_id:
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
is_anonymous = user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID
try:
with Session(db.engine) as session:
user_model = None
if is_anonymous:
user_model = (
session.query(EndUser)
.where(
EndUser.session_id == user_id,
EndUser.tenant_id == tenant_id,
)
.first()
)
else:
user_model = (
session.query(EndUser)
.where(
EndUser.id == user_id,
EndUser.tenant_id == tenant_id,
)
.first()
)
if not user_model:
user_model = EndUser(
tenant_id=tenant_id,
type="service_api",
is_anonymous=is_anonymous,
session_id=user_id,
)
session.add(user_model)
session.commit()
session.refresh(user_model)
except Exception:
raise ValueError("user not found")
return user_model
def get_cli_user_tenant(view_func: Callable[P, R]):
@wraps(view_func)
def decorated_view(*args: P.args, **kwargs: P.kwargs):
session: CliApiSession | None = getattr(g, "cli_api_session", None)
if session is None:
raise ValueError("session not found")
user_id = session.user_id
tenant_id = session.tenant_id
cli_context = CliContext.model_validate(session.context)
if not user_id:
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
try:
tenant_model = (
db.session.query(Tenant)
.where(
Tenant.id == tenant_id,
)
.first()
)
except Exception:
raise ValueError("tenant not found")
if not tenant_model:
raise ValueError("tenant not found")
kwargs["tenant_model"] = tenant_model
kwargs["user_model"] = get_user(tenant_id, user_id)
kwargs["cli_context"] = cli_context
current_app.login_manager._update_request_context_with_user(kwargs["user_model"]) # type: ignore
user_logged_in.send(current_app._get_current_object(), user=current_user) # type: ignore
return view_func(*args, **kwargs)
return decorated_view
def plugin_data(view: Callable[P, R] | None = None, *, payload_type: type[BaseModel]):
def decorator(view_func: Callable[P, R]):
@wraps(view_func)
def decorated_view(*args: P.args, **kwargs: P.kwargs):
try:
data = request.get_json()
except Exception:
raise ValueError("invalid json")
try:
payload = payload_type.model_validate(data)
except Exception as e:
raise ValueError(f"invalid payload: {str(e)}")
kwargs["payload"] = payload
return view_func(*args, **kwargs)
return decorated_view
if view is None:
return decorator
else:
return decorator(view)

View File

@@ -0,0 +1,56 @@
import hashlib
import hmac
import time
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import abort, g, request
from core.session.cli_api import CliApiSessionManager
P = ParamSpec("P")
R = TypeVar("R")
SIGNATURE_TTL_SECONDS = 300
def _verify_signature(session_secret: str, timestamp: str, body: bytes, signature: str) -> bool:
expected = hmac.new(
session_secret.encode(),
f"{timestamp}.".encode() + body,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
def cli_api_only(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
session_id = request.headers.get("X-Cli-Api-Session-Id")
timestamp = request.headers.get("X-Cli-Api-Timestamp")
signature = request.headers.get("X-Cli-Api-Signature")
if not session_id or not timestamp or not signature:
abort(401)
try:
ts = int(timestamp)
if abs(time.time() - ts) > SIGNATURE_TTL_SECONDS:
abort(401)
except ValueError:
abort(401)
session = CliApiSessionManager().get(session_id)
if not session:
abort(401)
body = request.get_data()
if not _verify_signature(session.secret, timestamp, body, signature):
abort(401)
g.cli_api_session = session
return view(*args, **kwargs)
return decorated

View File

@@ -41,6 +41,7 @@ from . import (
init_validate,
notification,
ping,
sandbox_files,
setup,
spec,
version,
@@ -52,6 +53,7 @@ from .app import (
agent,
annotation,
app,
app_asset,
audio,
completion,
conversation,
@@ -62,6 +64,7 @@ from .app import (
model_config,
ops_trace,
site,
skills,
statistic,
workflow,
workflow_app_log,
@@ -130,6 +133,7 @@ from .workspace import (
model_providers,
models,
plugin,
sandbox_providers,
tool_providers,
trigger_providers,
workspace,

View File

@@ -2,6 +2,7 @@ import csv
import io
from collections.abc import Callable
from functools import wraps
from typing import cast
from flask import request
from flask_restx import Resource
@@ -17,7 +18,7 @@ from core.db.session_factory import session_factory
from extensions.ext_database import db
from libs.token import extract_access_token
from models.model import App, ExporleBanner, InstalledApp, RecommendedApp, TrialApp
from services.billing_service import BillingService
from services.billing_service import BillingService, LangContentDict
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
@@ -328,7 +329,7 @@ class UpsertNotificationApi(Resource):
def post(self):
payload = UpsertNotificationPayload.model_validate(console_ns.payload)
result = BillingService.upsert_notification(
contents=[c.model_dump() for c in payload.contents],
contents=[cast(LangContentDict, c.model_dump()) for c in payload.contents],
frequency=payload.frequency,
status=payload.status,
notification_id=payload.notification_id,

View File

@@ -7,7 +7,7 @@ from flask import request
from flask_restx import Resource
from graphon.enums import WorkflowExecutionStatus
from graphon.file import helpers as file_helpers
from pydantic import AliasChoices, BaseModel, ConfigDict, Field, computed_field, field_validator
from pydantic import AliasChoices, BaseModel, Field, computed_field, field_validator
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from werkzeug.exceptions import BadRequest
@@ -26,9 +26,11 @@ from controllers.console.wraps import (
setup_required,
)
from core.ops.ops_trace_manager import OpsTraceManager
from core.rag.entities import PreProcessingRule, Rule, Segmentation
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.trigger.constants import TRIGGER_NODE_TYPES
from extensions.ext_database import db
from fields.base import ResponseModel
from libs.login import current_account_with_tenant, login_required
from models import App, DatasetPermissionEnum, Workflow
from models.model import IconType
@@ -41,10 +43,7 @@ from services.entities.knowledge_entities.knowledge_entities import (
NotionIcon,
NotionInfo,
NotionPage,
PreProcessingRule,
RerankingModel,
Rule,
Segmentation,
WebsiteInfo,
WeightKeywordSetting,
WeightModel,
@@ -52,7 +51,7 @@ from services.entities.knowledge_entities.knowledge_entities import (
)
from services.feature_service import FeatureService
ALLOW_CREATE_APP_MODES = ["chat", "agent-chat", "advanced-chat", "workflow", "completion"]
ALLOW_CREATE_APP_MODES = ["chat", "agent-chat", "advanced-chat", "workflow", "completion", "agent"]
register_enum_models(console_ns, IconType)
@@ -62,7 +61,7 @@ _logger = logging.getLogger(__name__)
class AppListQuery(BaseModel):
page: int = Field(default=1, ge=1, le=99999, description="Page number (1-99999)")
limit: int = Field(default=20, ge=1, le=100, description="Page size (1-100)")
mode: Literal["completion", "chat", "advanced-chat", "workflow", "agent-chat", "channel", "all"] = Field(
mode: Literal["completion", "chat", "advanced-chat", "workflow", "agent-chat", "agent", "channel", "all"] = Field(
default="all", description="App mode filter"
)
name: str | None = Field(default=None, description="Filter by app name")
@@ -94,7 +93,9 @@ class AppListQuery(BaseModel):
class CreateAppPayload(BaseModel):
name: str = Field(..., min_length=1, description="App name")
description: str | None = Field(default=None, description="App description (max 400 chars)", max_length=400)
mode: Literal["chat", "agent-chat", "advanced-chat", "workflow", "completion"] = Field(..., description="App mode")
mode: Literal["chat", "agent-chat", "advanced-chat", "workflow", "completion", "agent"] = Field(
..., description="App mode"
)
icon_type: IconType | None = Field(default=None, description="Icon type")
icon: str | None = Field(default=None, description="Icon")
icon_background: str | None = Field(default=None, description="Icon background color")
@@ -155,16 +156,6 @@ class AppTracePayload(BaseModel):
type JSONValue = Any
class ResponseModel(BaseModel):
model_config = ConfigDict(
from_attributes=True,
extra="ignore",
populate_by_name=True,
serialize_by_alias=True,
protected_namespaces=(),
)
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())

View File

@@ -0,0 +1,333 @@
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field, field_validator
from controllers.console import console_ns
from controllers.console.app.error import (
AppAssetNodeNotFoundError,
AppAssetPathConflictError,
)
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from core.app.entities.app_asset_entities import BatchUploadNode
from libs.login import current_account_with_tenant, login_required
from models import App
from models.model import AppMode
from services.app_asset_service import AppAssetService
from services.errors.app_asset import (
AppAssetNodeNotFoundError as ServiceNodeNotFoundError,
)
from services.errors.app_asset import (
AppAssetParentNotFoundError,
)
from services.errors.app_asset import (
AppAssetPathConflictError as ServicePathConflictError,
)
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
class CreateFolderPayload(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
parent_id: str | None = None
class CreateFilePayload(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
parent_id: str | None = None
@field_validator("name", mode="before")
@classmethod
def strip_name(cls, v: str) -> str:
return v.strip() if isinstance(v, str) else v
@field_validator("parent_id", mode="before")
@classmethod
def empty_to_none(cls, v: str | None) -> str | None:
return v or None
class GetUploadUrlPayload(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
size: int = Field(..., ge=0)
parent_id: str | None = None
@field_validator("name", mode="before")
@classmethod
def strip_name(cls, v: str) -> str:
return v.strip() if isinstance(v, str) else v
@field_validator("parent_id", mode="before")
@classmethod
def empty_to_none(cls, v: str | None) -> str | None:
return v or None
class BatchUploadPayload(BaseModel):
children: list[BatchUploadNode] = Field(..., min_length=1)
parent_id: str | None = None
@field_validator("parent_id", mode="before")
@classmethod
def empty_to_none(cls, v: str | None) -> str | None:
return v or None
class UpdateFileContentPayload(BaseModel):
content: str
class RenameNodePayload(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
class MoveNodePayload(BaseModel):
parent_id: str | None = None
class ReorderNodePayload(BaseModel):
after_node_id: str | None = Field(default=None, description="Place after this node, None for first position")
def reg(cls: type[BaseModel]) -> None:
console_ns.schema_model(cls.__name__, cls.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0))
reg(CreateFolderPayload)
reg(CreateFilePayload)
reg(GetUploadUrlPayload)
reg(BatchUploadNode)
reg(BatchUploadPayload)
reg(UpdateFileContentPayload)
reg(RenameNodePayload)
reg(MoveNodePayload)
reg(ReorderNodePayload)
@console_ns.route("/apps/<string:app_id>/assets/tree")
class AppAssetTreeResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App):
current_user, _ = current_account_with_tenant()
tree = AppAssetService.get_asset_tree(app_model, current_user.id)
return {"children": [view.model_dump() for view in tree.transform()]}
@console_ns.route("/apps/<string:app_id>/assets/folders")
class AppAssetFolderResource(Resource):
@console_ns.expect(console_ns.models[CreateFolderPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App):
current_user, _ = current_account_with_tenant()
payload = CreateFolderPayload.model_validate(console_ns.payload or {})
try:
node = AppAssetService.create_folder(app_model, current_user.id, payload.name, payload.parent_id)
return node.model_dump(), 201
except AppAssetParentNotFoundError:
raise AppAssetNodeNotFoundError()
except ServicePathConflictError:
raise AppAssetPathConflictError()
@console_ns.route("/apps/<string:app_id>/assets/files/<string:node_id>")
class AppAssetFileDetailResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
try:
content = AppAssetService.get_file_content(app_model, current_user.id, node_id)
return {"content": content.decode("utf-8", errors="replace")}
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
@console_ns.expect(console_ns.models[UpdateFileContentPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def put(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
file = request.files.get("file")
if file:
content = file.read()
else:
payload = UpdateFileContentPayload.model_validate(console_ns.payload or {})
content = payload.content.encode("utf-8")
try:
node = AppAssetService.update_file_content(app_model, current_user.id, node_id, content)
return node.model_dump()
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
@console_ns.route("/apps/<string:app_id>/assets/nodes/<string:node_id>")
class AppAssetNodeResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def delete(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
try:
AppAssetService.delete_node(app_model, current_user.id, node_id)
return {"result": "success"}, 200
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
@console_ns.route("/apps/<string:app_id>/assets/nodes/<string:node_id>/rename")
class AppAssetNodeRenameResource(Resource):
@console_ns.expect(console_ns.models[RenameNodePayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
payload = RenameNodePayload.model_validate(console_ns.payload or {})
try:
node = AppAssetService.rename_node(app_model, current_user.id, node_id, payload.name)
return node.model_dump()
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
except ServicePathConflictError:
raise AppAssetPathConflictError()
@console_ns.route("/apps/<string:app_id>/assets/nodes/<string:node_id>/move")
class AppAssetNodeMoveResource(Resource):
@console_ns.expect(console_ns.models[MoveNodePayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
payload = MoveNodePayload.model_validate(console_ns.payload or {})
try:
node = AppAssetService.move_node(app_model, current_user.id, node_id, payload.parent_id)
return node.model_dump()
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
except AppAssetParentNotFoundError:
raise AppAssetNodeNotFoundError()
except ServicePathConflictError:
raise AppAssetPathConflictError()
@console_ns.route("/apps/<string:app_id>/assets/nodes/<string:node_id>/reorder")
class AppAssetNodeReorderResource(Resource):
@console_ns.expect(console_ns.models[ReorderNodePayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
payload = ReorderNodePayload.model_validate(console_ns.payload or {})
try:
node = AppAssetService.reorder_node(app_model, current_user.id, node_id, payload.after_node_id)
return node.model_dump()
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
@console_ns.route("/apps/<string:app_id>/assets/files/<string:node_id>/download-url")
class AppAssetFileDownloadUrlResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
try:
download_url = AppAssetService.get_file_download_url(app_model, current_user.id, node_id)
return {"download_url": download_url}
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
@console_ns.route("/apps/<string:app_id>/assets/files/upload")
class AppAssetFileUploadUrlResource(Resource):
@console_ns.expect(console_ns.models[GetUploadUrlPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App):
current_user, _ = current_account_with_tenant()
payload = GetUploadUrlPayload.model_validate(console_ns.payload or {})
try:
node, upload_url = AppAssetService.get_file_upload_url(
app_model, current_user.id, payload.name, payload.size, payload.parent_id
)
return {"node": node.model_dump(), "upload_url": upload_url}, 201
except AppAssetParentNotFoundError:
raise AppAssetNodeNotFoundError()
except ServicePathConflictError:
raise AppAssetPathConflictError()
@console_ns.route("/apps/<string:app_id>/assets/batch-upload")
class AppAssetBatchUploadResource(Resource):
@console_ns.expect(console_ns.models[BatchUploadPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App):
"""
Create nodes from tree structure and return upload URLs.
Input:
{
"parent_id": "optional-target-folder-id",
"children": [
{"name": "folder1", "node_type": "folder", "children": [
{"name": "file1.txt", "node_type": "file", "size": 1024}
]},
{"name": "root.txt", "node_type": "file", "size": 512}
]
}
Output:
{
"children": [
{"id": "xxx", "name": "folder1", "node_type": "folder", "children": [
{"id": "yyy", "name": "file1.txt", "node_type": "file", "size": 1024, "upload_url": "..."}
]},
{"id": "zzz", "name": "root.txt", "node_type": "file", "size": 512, "upload_url": "..."}
]
}
"""
current_user, _ = current_account_with_tenant()
payload = BatchUploadPayload.model_validate(console_ns.payload or {})
try:
result_children = AppAssetService.batch_create_from_tree(
app_model,
current_user.id,
payload.children,
parent_id=payload.parent_id,
)
return {"children": [child.model_dump() for child in result_children]}, 201
except AppAssetParentNotFoundError:
raise AppAssetNodeNotFoundError()
except ServicePathConflictError:
raise AppAssetPathConflictError()

View File

@@ -1,6 +1,6 @@
from flask_restx import Resource, fields, marshal_with
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.orm import sessionmaker
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import (
@@ -71,7 +71,7 @@ class AppImportApi(Resource):
args = AppImportPayload.model_validate(console_ns.payload)
# Create service with session
with Session(db.engine) as session:
with sessionmaker(db.engine).begin() as session:
import_service = AppDslService(session)
# Import app
account = current_user

View File

@@ -161,7 +161,7 @@ class ChatMessageApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT])
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT])
@edit_permission_required
def post(self, app_model):
args_model = ChatMessagePayload.model_validate(console_ns.payload)
@@ -215,7 +215,7 @@ class ChatMessageStopApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT])
def post(self, app_model, task_id):
if not isinstance(current_user, Account):
raise ValueError("current_user must be an Account instance")

View File

@@ -121,3 +121,21 @@ class NeedAddIdsError(BaseHTTPException):
error_code = "need_add_ids"
description = "Need to add ids."
code = 400
class AppAssetNodeNotFoundError(BaseHTTPException):
error_code = "app_asset_node_not_found"
description = "App asset node not found."
code = 404
class AppAssetFileRequiredError(BaseHTTPException):
error_code = "app_asset_file_required"
description = "File is required."
code = 400
class AppAssetPathConflictError(BaseHTTPException):
error_code = "app_asset_path_conflict"
description = "Path already exists."
code = 409

View File

@@ -238,7 +238,7 @@ class ChatMessageListApi(Resource):
@login_required
@account_initialization_required
@setup_required
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT])
@marshal_with(message_infinite_scroll_pagination_model)
@edit_permission_required
def get(self, app_model):
@@ -394,7 +394,7 @@ class MessageSuggestedQuestionApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT])
def get(self, app_model, message_id):
current_user, _ = current_account_with_tenant()
message_id = str(message_id)

View File

@@ -0,0 +1,38 @@
from flask import request
from flask_restx import Resource
from controllers.console import console_ns
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, current_account_with_tenant, setup_required
from libs.login import login_required
from models import App
from models.model import AppMode
from services.skill_service import SkillService
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/nodes/llm/skills")
class NodeSkillsApi(Resource):
"""Extract tool dependencies from an LLM node's skill prompts.
The client sends the full node ``data`` object in the request body.
The server real-time builds a ``SkillBundle`` from the current draft
``.md`` assets and resolves transitive tool dependencies — no cached
bundle is used.
"""
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App):
current_user, _ = current_account_with_tenant()
node_data = request.get_json(force=True)
if not isinstance(node_data, dict):
return {"tool_dependencies": []}
tool_deps = SkillService.extract_tool_dependencies(
app=app_model,
node_data=node_data,
user_id=current_user.id,
)
return {"tool_dependencies": [d.model_dump() for d in tool_deps]}

View File

@@ -221,7 +221,7 @@ class DraftWorkflowApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_model)
@edit_permission_required
def get(self, app_model: App):
@@ -241,7 +241,7 @@ class DraftWorkflowApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@console_ns.doc("sync_draft_workflow")
@console_ns.doc(description="Sync draft workflow configuration")
@console_ns.expect(console_ns.models[SyncDraftWorkflowPayload.__name__])
@@ -325,7 +325,7 @@ class AdvancedChatDraftWorkflowRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.AGENT])
@edit_permission_required
def post(self, app_model: App):
"""
@@ -371,7 +371,7 @@ class AdvancedChatDraftRunIterationNodeApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.AGENT])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
@@ -447,7 +447,7 @@ class AdvancedChatDraftRunLoopNodeApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.AGENT])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
@@ -549,7 +549,7 @@ class AdvancedChatDraftHumanInputFormPreviewApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.AGENT])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
@@ -578,7 +578,7 @@ class AdvancedChatDraftHumanInputFormRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.AGENT])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
@@ -733,7 +733,7 @@ class WorkflowTaskStopApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@edit_permission_required
def post(self, app_model: App, task_id: str):
"""
@@ -761,7 +761,7 @@ class DraftWorkflowNodeRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_run_node_execution_model)
@edit_permission_required
def post(self, app_model: App, node_id: str):
@@ -807,7 +807,7 @@ class PublishedWorkflowApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_model)
@edit_permission_required
def get(self, app_model: App):
@@ -825,7 +825,7 @@ class PublishedWorkflowApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@edit_permission_required
def post(self, app_model: App):
"""
@@ -869,7 +869,7 @@ class DefaultBlockConfigsApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@edit_permission_required
def get(self, app_model: App):
"""
@@ -891,7 +891,7 @@ class DefaultBlockConfigApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@edit_permission_required
def get(self, app_model: App, block_type: str):
"""
@@ -956,7 +956,7 @@ class PublishedAllWorkflowApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_pagination_model)
@edit_permission_required
def get(self, app_model: App):
@@ -1005,7 +1005,7 @@ class DraftWorkflowRestoreApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@edit_permission_required
def post(self, app_model: App, workflow_id: str):
current_user, _ = current_account_with_tenant()
@@ -1043,7 +1043,7 @@ class WorkflowByIdApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_model)
@edit_permission_required
def patch(self, app_model: App, workflow_id: str):
@@ -1083,7 +1083,7 @@ class WorkflowByIdApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@edit_permission_required
def delete(self, app_model: App, workflow_id: str):
"""
@@ -1118,7 +1118,7 @@ class DraftWorkflowNodeLastRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_run_node_execution_model)
def get(self, app_model: App, node_id: str):
srv = WorkflowService()

View File

@@ -0,0 +1,322 @@
import logging
from flask_restx import Resource, marshal_with
from pydantic import BaseModel, Field, TypeAdapter
from controllers.console import console_ns
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from fields.member_fields import AccountWithRole
from fields.workflow_comment_fields import (
workflow_comment_basic_fields,
workflow_comment_create_fields,
workflow_comment_detail_fields,
workflow_comment_reply_create_fields,
workflow_comment_reply_update_fields,
workflow_comment_resolve_fields,
workflow_comment_update_fields,
)
from libs.login import current_user, login_required
from models import App
from services.account_service import TenantService
from services.workflow_comment_service import WorkflowCommentService
logger = logging.getLogger(__name__)
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
class WorkflowCommentCreatePayload(BaseModel):
position_x: float = Field(..., description="Comment X position")
position_y: float = Field(..., description="Comment Y position")
content: str = Field(..., description="Comment content")
mentioned_user_ids: list[str] = Field(default_factory=list, description="Mentioned user IDs")
class WorkflowCommentUpdatePayload(BaseModel):
content: str = Field(..., description="Comment content")
position_x: float | None = Field(default=None, description="Comment X position")
position_y: float | None = Field(default=None, description="Comment Y position")
mentioned_user_ids: list[str] = Field(default_factory=list, description="Mentioned user IDs")
class WorkflowCommentReplyCreatePayload(BaseModel):
content: str = Field(..., description="Reply content")
mentioned_user_ids: list[str] = Field(default_factory=list, description="Mentioned user IDs")
class WorkflowCommentReplyUpdatePayload(BaseModel):
content: str = Field(..., description="Reply content")
mentioned_user_ids: list[str] = Field(default_factory=list, description="Mentioned user IDs")
class WorkflowCommentMentionUsersResponse(BaseModel):
users: list[AccountWithRole] = Field(description="Mentionable users")
for model in (
WorkflowCommentCreatePayload,
WorkflowCommentUpdatePayload,
WorkflowCommentReplyCreatePayload,
WorkflowCommentReplyUpdatePayload,
):
console_ns.schema_model(model.__name__, model.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0))
for model in (AccountWithRole, WorkflowCommentMentionUsersResponse):
console_ns.schema_model(model.__name__, model.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0))
workflow_comment_basic_model = console_ns.model("WorkflowCommentBasic", workflow_comment_basic_fields)
workflow_comment_detail_model = console_ns.model("WorkflowCommentDetail", workflow_comment_detail_fields)
workflow_comment_create_model = console_ns.model("WorkflowCommentCreate", workflow_comment_create_fields)
workflow_comment_update_model = console_ns.model("WorkflowCommentUpdate", workflow_comment_update_fields)
workflow_comment_resolve_model = console_ns.model("WorkflowCommentResolve", workflow_comment_resolve_fields)
workflow_comment_reply_create_model = console_ns.model(
"WorkflowCommentReplyCreate", workflow_comment_reply_create_fields
)
workflow_comment_reply_update_model = console_ns.model(
"WorkflowCommentReplyUpdate", workflow_comment_reply_update_fields
)
workflow_comment_mention_users_model = console_ns.models[WorkflowCommentMentionUsersResponse.__name__]
@console_ns.route("/apps/<uuid:app_id>/workflow/comments")
class WorkflowCommentListApi(Resource):
"""API for listing and creating workflow comments."""
@console_ns.doc("list_workflow_comments")
@console_ns.doc(description="Get all comments for a workflow")
@console_ns.doc(params={"app_id": "Application ID"})
@console_ns.response(200, "Comments retrieved successfully", workflow_comment_basic_model)
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_basic_model, envelope="data")
def get(self, app_model: App):
"""Get all comments for a workflow."""
comments = WorkflowCommentService.get_comments(tenant_id=current_user.current_tenant_id, app_id=app_model.id)
return comments
@console_ns.doc("create_workflow_comment")
@console_ns.doc(description="Create a new workflow comment")
@console_ns.doc(params={"app_id": "Application ID"})
@console_ns.expect(console_ns.models[WorkflowCommentCreatePayload.__name__])
@console_ns.response(201, "Comment created successfully", workflow_comment_create_model)
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_create_model)
def post(self, app_model: App):
"""Create a new workflow comment."""
payload = WorkflowCommentCreatePayload.model_validate(console_ns.payload or {})
result = WorkflowCommentService.create_comment(
tenant_id=current_user.current_tenant_id,
app_id=app_model.id,
created_by=current_user.id,
content=payload.content,
position_x=payload.position_x,
position_y=payload.position_y,
mentioned_user_ids=payload.mentioned_user_ids,
)
return result, 201
@console_ns.route("/apps/<uuid:app_id>/workflow/comments/<string:comment_id>")
class WorkflowCommentDetailApi(Resource):
"""API for managing individual workflow comments."""
@console_ns.doc("get_workflow_comment")
@console_ns.doc(description="Get a specific workflow comment")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"})
@console_ns.response(200, "Comment retrieved successfully", workflow_comment_detail_model)
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_detail_model)
def get(self, app_model: App, comment_id: str):
"""Get a specific workflow comment."""
comment = WorkflowCommentService.get_comment(
tenant_id=current_user.current_tenant_id, app_id=app_model.id, comment_id=comment_id
)
return comment
@console_ns.doc("update_workflow_comment")
@console_ns.doc(description="Update a workflow comment")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"})
@console_ns.expect(console_ns.models[WorkflowCommentUpdatePayload.__name__])
@console_ns.response(200, "Comment updated successfully", workflow_comment_update_model)
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_update_model)
def put(self, app_model: App, comment_id: str):
"""Update a workflow comment."""
payload = WorkflowCommentUpdatePayload.model_validate(console_ns.payload or {})
result = WorkflowCommentService.update_comment(
tenant_id=current_user.current_tenant_id,
app_id=app_model.id,
comment_id=comment_id,
user_id=current_user.id,
content=payload.content,
position_x=payload.position_x,
position_y=payload.position_y,
mentioned_user_ids=payload.mentioned_user_ids,
)
return result
@console_ns.doc("delete_workflow_comment")
@console_ns.doc(description="Delete a workflow comment")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"})
@console_ns.response(204, "Comment deleted successfully")
@login_required
@setup_required
@account_initialization_required
@get_app_model()
def delete(self, app_model: App, comment_id: str):
"""Delete a workflow comment."""
WorkflowCommentService.delete_comment(
tenant_id=current_user.current_tenant_id,
app_id=app_model.id,
comment_id=comment_id,
user_id=current_user.id,
)
return {"result": "success"}, 204
@console_ns.route("/apps/<uuid:app_id>/workflow/comments/<string:comment_id>/resolve")
class WorkflowCommentResolveApi(Resource):
"""API for resolving and reopening workflow comments."""
@console_ns.doc("resolve_workflow_comment")
@console_ns.doc(description="Resolve a workflow comment")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"})
@console_ns.response(200, "Comment resolved successfully", workflow_comment_resolve_model)
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_resolve_model)
def post(self, app_model: App, comment_id: str):
"""Resolve a workflow comment."""
comment = WorkflowCommentService.resolve_comment(
tenant_id=current_user.current_tenant_id,
app_id=app_model.id,
comment_id=comment_id,
user_id=current_user.id,
)
return comment
@console_ns.route("/apps/<uuid:app_id>/workflow/comments/<string:comment_id>/replies")
class WorkflowCommentReplyApi(Resource):
"""API for managing comment replies."""
@console_ns.doc("create_workflow_comment_reply")
@console_ns.doc(description="Add a reply to a workflow comment")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"})
@console_ns.expect(console_ns.models[WorkflowCommentReplyCreatePayload.__name__])
@console_ns.response(201, "Reply created successfully", workflow_comment_reply_create_model)
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_reply_create_model)
def post(self, app_model: App, comment_id: str):
"""Add a reply to a workflow comment."""
# Validate comment access first
WorkflowCommentService.validate_comment_access(
comment_id=comment_id, tenant_id=current_user.current_tenant_id, app_id=app_model.id
)
payload = WorkflowCommentReplyCreatePayload.model_validate(console_ns.payload or {})
result = WorkflowCommentService.create_reply(
comment_id=comment_id,
content=payload.content,
created_by=current_user.id,
mentioned_user_ids=payload.mentioned_user_ids,
)
return result, 201
@console_ns.route("/apps/<uuid:app_id>/workflow/comments/<string:comment_id>/replies/<string:reply_id>")
class WorkflowCommentReplyDetailApi(Resource):
"""API for managing individual comment replies."""
@console_ns.doc("update_workflow_comment_reply")
@console_ns.doc(description="Update a comment reply")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID", "reply_id": "Reply ID"})
@console_ns.expect(console_ns.models[WorkflowCommentReplyUpdatePayload.__name__])
@console_ns.response(200, "Reply updated successfully", workflow_comment_reply_update_model)
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_reply_update_model)
def put(self, app_model: App, comment_id: str, reply_id: str):
"""Update a comment reply."""
# Validate comment access first
WorkflowCommentService.validate_comment_access(
comment_id=comment_id, tenant_id=current_user.current_tenant_id, app_id=app_model.id
)
payload = WorkflowCommentReplyUpdatePayload.model_validate(console_ns.payload or {})
reply = WorkflowCommentService.update_reply(
reply_id=reply_id,
user_id=current_user.id,
content=payload.content,
mentioned_user_ids=payload.mentioned_user_ids,
)
return reply
@console_ns.doc("delete_workflow_comment_reply")
@console_ns.doc(description="Delete a comment reply")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID", "reply_id": "Reply ID"})
@console_ns.response(204, "Reply deleted successfully")
@login_required
@setup_required
@account_initialization_required
@get_app_model()
def delete(self, app_model: App, comment_id: str, reply_id: str):
"""Delete a comment reply."""
# Validate comment access first
WorkflowCommentService.validate_comment_access(
comment_id=comment_id, tenant_id=current_user.current_tenant_id, app_id=app_model.id
)
WorkflowCommentService.delete_reply(reply_id=reply_id, user_id=current_user.id)
return {"result": "success"}, 204
@console_ns.route("/apps/<uuid:app_id>/workflow/comments/mention-users")
class WorkflowCommentMentionUsersApi(Resource):
"""API for getting mentionable users for workflow comments."""
@console_ns.doc("workflow_comment_mention_users")
@console_ns.doc(description="Get all users in current tenant for mentions")
@console_ns.doc(params={"app_id": "Application ID"})
@console_ns.response(200, "Mentionable users retrieved successfully", workflow_comment_mention_users_model)
@login_required
@setup_required
@account_initialization_required
@get_app_model()
def get(self, app_model: App):
"""Get all users in current tenant for mentions."""
members = TenantService.get_tenant_members(current_user.current_tenant)
member_models = TypeAdapter(list[AccountWithRole]).validate_python(members, from_attributes=True)
response = WorkflowCommentMentionUsersResponse(users=member_models)
return response.model_dump(mode="json"), 200

View File

@@ -208,7 +208,7 @@ def _api_prerequisite[**P, R](f: Callable[P, R]) -> Callable[P, R | Response]:
@login_required
@account_initialization_required
@edit_permission_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@wraps(f)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R | Response:
return f(*args, **kwargs)

View File

@@ -207,7 +207,7 @@ class AdvancedChatAppWorkflowRunListApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.AGENT])
@marshal_with(advanced_chat_workflow_run_pagination_model)
def get(self, app_model: App):
"""
@@ -305,7 +305,7 @@ class AdvancedChatAppWorkflowRunCountApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.AGENT])
@marshal_with(workflow_run_count_model)
def get(self, app_model: App):
"""
@@ -349,7 +349,7 @@ class WorkflowRunListApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_run_pagination_model)
def get(self, app_model: App):
"""
@@ -397,7 +397,7 @@ class WorkflowRunCountApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_run_count_model)
def get(self, app_model: App):
"""
@@ -434,7 +434,7 @@ class WorkflowRunDetailApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_run_detail_model)
def get(self, app_model: App, run_id):
"""
@@ -458,7 +458,7 @@ class WorkflowRunNodeExecutionListApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW, AppMode.AGENT])
@marshal_with(workflow_run_node_execution_list_model)
def get(self, app_model: App, run_id):
"""

View File

@@ -66,13 +66,13 @@ class WebhookTriggerApi(Resource):
with sessionmaker(db.engine).begin() as session:
# Get webhook trigger for this app and node
webhook_trigger = (
session.query(WorkflowWebhookTrigger)
webhook_trigger = session.scalar(
select(WorkflowWebhookTrigger)
.where(
WorkflowWebhookTrigger.app_id == app_model.id,
WorkflowWebhookTrigger.node_id == node_id,
)
.first()
.limit(1)
)
if not webhook_trigger:

View File

@@ -3,7 +3,7 @@ import secrets
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field
from sqlalchemy.orm import sessionmaker
from controllers.common.schema import register_schema_models
@@ -20,35 +20,18 @@ from controllers.console.wraps import email_password_login_enabled, setup_requir
from events.tenant_event import tenant_was_created
from extensions.ext_database import db
from libs.helper import EmailStr, extract_remote_ip
from libs.password import hash_password, valid_password
from libs.password import hash_password
from services.account_service import AccountService, TenantService
from services.entities.auth_entities import (
ForgotPasswordCheckPayload,
ForgotPasswordResetPayload,
ForgotPasswordSendPayload,
)
from services.feature_service import FeatureService
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
class ForgotPasswordSendPayload(BaseModel):
email: EmailStr = Field(...)
language: str | None = Field(default=None)
class ForgotPasswordCheckPayload(BaseModel):
email: EmailStr = Field(...)
code: str = Field(...)
token: str = Field(...)
class ForgotPasswordResetPayload(BaseModel):
token: str = Field(...)
new_password: str = Field(...)
password_confirm: str = Field(...)
@field_validator("new_password", "password_confirm")
@classmethod
def validate_password(cls, value: str) -> str:
return valid_password(value)
class ForgotPasswordEmailResponse(BaseModel):
result: str = Field(description="Operation result")
data: str | None = Field(default=None, description="Reset token")

View File

@@ -1,5 +1,3 @@
from typing import Any
import flask_login
from flask import make_response, request
from flask_restx import Resource
@@ -42,8 +40,9 @@ from libs.token import (
set_csrf_token_to_cookie,
set_refresh_token_to_cookie,
)
from services.account_service import AccountService, RegisterService, TenantService
from services.account_service import AccountService, InvitationDetailDict, RegisterService, TenantService
from services.billing_service import BillingService
from services.entities.auth_entities import LoginPayloadBase
from services.errors.account import AccountRegisterError
from services.errors.workspace import WorkSpaceNotAllowedCreateError, WorkspacesLimitExceededError
from services.feature_service import FeatureService
@@ -51,9 +50,7 @@ from services.feature_service import FeatureService
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
class LoginPayload(BaseModel):
email: EmailStr = Field(..., description="Email address")
password: str = Field(..., description="Password")
class LoginPayload(LoginPayloadBase):
remember_me: bool = Field(default=False, description="Remember me flag")
invite_token: str | None = Field(default=None, description="Invitation token")
@@ -101,7 +98,7 @@ class LoginApi(Resource):
raise EmailPasswordLoginLimitError()
invite_token = args.invite_token
invitation_data: dict[str, Any] | None = None
invitation_data: InvitationDetailDict | None = None
if invite_token:
invitation_data = RegisterService.get_invitation_with_case_fallback(None, request_email, invite_token)
if invitation_data is None:

View File

@@ -3,6 +3,7 @@ import logging
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from controllers.common.schema import register_schema_models
@@ -86,8 +87,8 @@ class CustomizedPipelineTemplateApi(Resource):
@enterprise_license_required
def post(self, template_id: str):
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
template = (
session.query(PipelineCustomizedTemplate).where(PipelineCustomizedTemplate.id == template_id).first()
template = session.scalar(
select(PipelineCustomizedTemplate).where(PipelineCustomizedTemplate.id == template_id).limit(1)
)
if not template:
raise ValueError("Customized pipeline template not found.")

View File

@@ -1,3 +1,5 @@
from typing import TypedDict
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field
@@ -11,6 +13,21 @@ from services.billing_service import BillingService
_FALLBACK_LANG = "en-US"
class NotificationItemDict(TypedDict):
notification_id: str | None
frequency: str | None
lang: str
title: str
subtitle: str
body: str
title_pic_url: str
class NotificationResponseDict(TypedDict):
should_show: bool
notifications: list[NotificationItemDict]
def _pick_lang_content(contents: dict, lang: str) -> dict:
"""Return the single LangContent for *lang*, falling back to English."""
return contents.get(lang) or contents.get(_FALLBACK_LANG) or next(iter(contents.values()), {})
@@ -45,28 +62,30 @@ class NotificationApi(Resource):
result = BillingService.get_account_notification(str(current_user.id))
# Proto JSON uses camelCase field names (Kratos default marshaling).
response: NotificationResponseDict
if not result.get("shouldShow"):
return {"should_show": False, "notifications": []}, 200
response = {"should_show": False, "notifications": []}
return response, 200
lang = current_user.interface_language or _FALLBACK_LANG
notifications = []
notifications: list[NotificationItemDict] = []
for notification in result.get("notifications") or []:
contents: dict = notification.get("contents") or {}
lang_content = _pick_lang_content(contents, lang)
notifications.append(
{
"notification_id": notification.get("notificationId"),
"frequency": notification.get("frequency"),
"lang": lang_content.get("lang", lang),
"title": lang_content.get("title", ""),
"subtitle": lang_content.get("subtitle", ""),
"body": lang_content.get("body", ""),
"title_pic_url": lang_content.get("titlePicUrl", ""),
}
)
item: NotificationItemDict = {
"notification_id": notification.get("notificationId"),
"frequency": notification.get("frequency"),
"lang": lang_content.get("lang", lang),
"title": lang_content.get("title", ""),
"subtitle": lang_content.get("subtitle", ""),
"body": lang_content.get("body", ""),
"title_pic_url": lang_content.get("titlePicUrl", ""),
}
notifications.append(item)
return {"should_show": bool(notifications), "notifications": notifications}, 200
response = {"should_show": bool(notifications), "notifications": notifications}
return response, 200
@console_ns.route("/notification/dismiss")

View File

@@ -0,0 +1,103 @@
from __future__ import annotations
from fastapi.encoders import jsonable_encoder
from flask import request
from flask_restx import Resource, fields
from pydantic import BaseModel, Field
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from libs.login import current_account_with_tenant, login_required
from services.sandbox.sandbox_file_service import SandboxFileService
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
class SandboxFileListQuery(BaseModel):
path: str | None = Field(default=None, description="Workspace relative path")
recursive: bool = Field(default=False, description="List recursively")
class SandboxFileDownloadRequest(BaseModel):
path: str = Field(..., description="Workspace relative file path")
console_ns.schema_model(
SandboxFileListQuery.__name__,
SandboxFileListQuery.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0),
)
console_ns.schema_model(
SandboxFileDownloadRequest.__name__,
SandboxFileDownloadRequest.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0),
)
SANDBOX_FILE_NODE_FIELDS = {
"path": fields.String,
"is_dir": fields.Boolean,
"size": fields.Raw,
"mtime": fields.Raw,
"extension": fields.String,
}
SANDBOX_FILE_DOWNLOAD_TICKET_FIELDS = {
"download_url": fields.String,
"expires_in": fields.Integer,
"export_id": fields.String,
}
sandbox_file_node_model = console_ns.model("SandboxFileNode", SANDBOX_FILE_NODE_FIELDS)
sandbox_file_download_ticket_model = console_ns.model("SandboxFileDownloadTicket", SANDBOX_FILE_DOWNLOAD_TICKET_FIELDS)
@console_ns.route("/apps/<string:app_id>/sandbox/files")
class SandboxFilesApi(Resource):
"""List sandbox files for the current user.
The sandbox_id is derived from the current user's ID, as each user has
their own sandbox workspace per app.
"""
@setup_required
@login_required
@account_initialization_required
@console_ns.expect(console_ns.models[SandboxFileListQuery.__name__])
@console_ns.marshal_list_with(sandbox_file_node_model)
def get(self, app_id: str):
args = SandboxFileListQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore[arg-type]
account, tenant_id = current_account_with_tenant()
sandbox_id = account.id
return jsonable_encoder(
SandboxFileService.list_files(
tenant_id=tenant_id,
app_id=app_id,
sandbox_id=sandbox_id,
path=args.path,
recursive=args.recursive,
)
)
@console_ns.route("/apps/<string:app_id>/sandbox/files/download")
class SandboxFileDownloadApi(Resource):
"""Download a sandbox file for the current user.
The sandbox_id is derived from the current user's ID, as each user has
their own sandbox workspace per app.
"""
@setup_required
@login_required
@account_initialization_required
@console_ns.expect(console_ns.models[SandboxFileDownloadRequest.__name__])
@console_ns.marshal_with(sandbox_file_download_ticket_model)
def post(self, app_id: str):
payload = SandboxFileDownloadRequest.model_validate(console_ns.payload or {})
account, tenant_id = current_account_with_tenant()
sandbox_id = account.id
res = SandboxFileService.download_file(
tenant_id=tenant_id, app_id=app_id, sandbox_id=sandbox_id, path=payload.path
)
return jsonable_encoder(res)

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,119 @@
import logging
from collections.abc import Callable
from typing import cast
from flask import Request as FlaskRequest
from extensions.ext_socketio import sio
from libs.passport import PassportService
from libs.token import extract_access_token
from repositories.workflow_collaboration_repository import WorkflowCollaborationRepository
from services.account_service import AccountService
from services.workflow_collaboration_service import WorkflowCollaborationService
repository = WorkflowCollaborationRepository()
collaboration_service = WorkflowCollaborationService(repository, sio)
def _sio_on(event: str) -> Callable[[Callable[..., object]], Callable[..., object]]:
return cast(Callable[[Callable[..., object]], Callable[..., object]], sio.on(event))
@_sio_on("connect")
def socket_connect(sid, environ, auth):
"""
WebSocket connect event, do authentication here.
"""
try:
request_environ = FlaskRequest(environ)
token = extract_access_token(request_environ)
except Exception:
logging.exception("Failed to extract token")
token = None
if not token:
logging.warning("Socket connect rejected: missing token (sid=%s)", sid)
return False
try:
decoded = PassportService().verify(token)
user_id = decoded.get("user_id")
if not user_id:
logging.warning("Socket connect rejected: missing user_id (sid=%s)", sid)
return False
with sio.app.app_context():
user = AccountService.load_logged_in_account(account_id=user_id)
if not user:
logging.warning("Socket connect rejected: user not found (user_id=%s, sid=%s)", user_id, sid)
return False
if not user.has_edit_permission:
logging.warning("Socket connect rejected: no edit permission (user_id=%s, sid=%s)", user_id, sid)
return False
collaboration_service.save_session(sid, user)
return True
except Exception:
logging.exception("Socket authentication failed")
return False
@_sio_on("user_connect")
def handle_user_connect(sid, data):
"""
Handle user connect event. Each session (tab) is treated as an independent collaborator.
"""
workflow_id = data.get("workflow_id")
if not workflow_id:
return {"msg": "workflow_id is required"}, 400
result = collaboration_service.register_session(workflow_id, sid)
if not result:
return {"msg": "unauthorized"}, 401
user_id, is_leader = result
return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader}
@_sio_on("disconnect")
def handle_disconnect(sid):
"""
Handle session disconnect event. Remove the specific session from online users.
"""
collaboration_service.disconnect_session(sid)
@_sio_on("collaboration_event")
def handle_collaboration_event(sid, data):
"""
Handle general collaboration events, include:
1. mouse_move
2. vars_and_features_update
3. sync_request (ask leader to update graph)
4. app_state_update
5. mcp_server_update
6. workflow_update
7. comments_update
8. node_panel_presence
9. skill_file_active
10. skill_sync_request
11. skill_resync_request
"""
return collaboration_service.relay_collaboration_event(sid, data)
@_sio_on("graph_event")
def handle_graph_event(sid, data):
"""
Handle graph events - simple broadcast relay.
"""
return collaboration_service.relay_graph_event(sid, data)
@_sio_on("skill_event")
def handle_skill_event(sid, data):
"""
Handle skill events - simple broadcast relay.
"""
return collaboration_service.relay_skill_event(sid, data)

View File

@@ -9,7 +9,14 @@ from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required
from libs.login import current_account_with_tenant, login_required
from services.tag_service import TagService
from models.enums import TagType
from services.tag_service import (
SaveTagPayload,
TagBindingCreatePayload,
TagBindingDeletePayload,
TagService,
UpdateTagPayload,
)
dataset_tag_fields = {
"id": fields.String,
@@ -25,19 +32,19 @@ def build_dataset_tag_fields(api_or_ns: Namespace):
class TagBasePayload(BaseModel):
name: str = Field(description="Tag name", min_length=1, max_length=50)
type: Literal["knowledge", "app"] | None = Field(default=None, description="Tag type")
type: TagType = Field(description="Tag type")
class TagBindingPayload(BaseModel):
tag_ids: list[str] = Field(description="Tag IDs to bind")
target_id: str = Field(description="Target ID to bind tags to")
type: Literal["knowledge", "app"] | None = Field(default=None, description="Tag type")
type: TagType = Field(description="Tag type")
class TagBindingRemovePayload(BaseModel):
tag_id: str = Field(description="Tag ID to remove")
target_id: str = Field(description="Target ID to unbind tag from")
type: Literal["knowledge", "app"] | None = Field(default=None, description="Tag type")
type: TagType = Field(description="Tag type")
class TagListQueryParam(BaseModel):
@@ -82,7 +89,7 @@ class TagListApi(Resource):
raise Forbidden()
payload = TagBasePayload.model_validate(console_ns.payload or {})
tag = TagService.save_tags(payload.model_dump())
tag = TagService.save_tags(SaveTagPayload(name=payload.name, type=payload.type))
response = {"id": tag.id, "name": tag.name, "type": tag.type, "binding_count": 0}
@@ -103,7 +110,7 @@ class TagUpdateDeleteApi(Resource):
raise Forbidden()
payload = TagBasePayload.model_validate(console_ns.payload or {})
tag = TagService.update_tags(payload.model_dump(), tag_id)
tag = TagService.update_tags(UpdateTagPayload(name=payload.name, type=payload.type), tag_id)
binding_count = TagService.get_tag_binding_count(tag_id)
@@ -136,7 +143,9 @@ class TagBindingCreateApi(Resource):
raise Forbidden()
payload = TagBindingPayload.model_validate(console_ns.payload or {})
TagService.save_tag_binding(payload.model_dump())
TagService.save_tag_binding(
TagBindingCreatePayload(tag_ids=payload.tag_ids, target_id=payload.target_id, type=payload.type)
)
return {"result": "success"}, 200
@@ -154,6 +163,8 @@ class TagBindingDeleteApi(Resource):
raise Forbidden()
payload = TagBindingRemovePayload.model_validate(console_ns.payload or {})
TagService.delete_tag_binding(payload.model_dump())
TagService.delete_tag_binding(
TagBindingDeletePayload(tag_id=payload.tag_id, target_id=payload.target_id, type=payload.type)
)
return {"result": "success"}, 200

View File

@@ -1,6 +1,7 @@
from collections.abc import Callable
from functools import wraps
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from werkzeug.exceptions import Forbidden
@@ -21,12 +22,12 @@ def plugin_permission_required(
tenant_id = current_tenant_id
with sessionmaker(db.engine).begin() as session:
permission = (
session.query(TenantPluginPermission)
permission = session.scalar(
select(TenantPluginPermission)
.where(
TenantPluginPermission.tenant_id == tenant_id,
)
.first()
.limit(1)
)
if not permission:

View File

@@ -0,0 +1,67 @@
import json
import httpx
import yaml
from flask import request
from flask_restx import Resource
from pydantic import BaseModel
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from core.plugin.impl.exc import PluginPermissionDeniedError
from extensions.ext_database import db
from libs.login import current_account_with_tenant, login_required
from models.model import App
from models.workflow import Workflow
from services.app_dsl_service import AppDslService
class DSLPredictRequest(BaseModel):
app_id: str
current_node_id: str
@console_ns.route("/workspaces/current/dsl/predict")
class DSLPredictApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self):
user, _ = current_account_with_tenant()
if not user.is_admin_or_owner:
raise Forbidden()
args = DSLPredictRequest.model_validate(request.get_json())
app_id: str = args.app_id
current_node_id: str = args.current_node_id
with Session(db.engine) as session:
app = session.query(App).filter_by(id=app_id).first()
workflow = session.query(Workflow).filter_by(app_id=app_id, version=Workflow.VERSION_DRAFT).first()
if not app:
raise ValueError("App not found")
if not workflow:
raise ValueError("Workflow not found")
try:
i = 0
for node_id, _ in workflow.walk_nodes():
if node_id == current_node_id:
break
i += 1
dsl = yaml.safe_load(AppDslService.export_dsl(app_model=app))
response = httpx.post(
"http://spark-832c:8000/predict",
json={"graph_data": dsl, "source_node_index": i},
)
return {
"nodes": json.loads(response.json()),
}
except PluginPermissionDeniedError as e:
raise ValueError(e.description) from e

View File

@@ -0,0 +1,104 @@
import logging
from flask import request
from flask_restx import Resource, fields
from pydantic import BaseModel
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from graphon.model_runtime.utils.encoders import jsonable_encoder
from libs.login import current_account_with_tenant, login_required
from services.sandbox.sandbox_provider_service import SandboxProviderService
logger = logging.getLogger(__name__)
class SandboxProviderConfigRequest(BaseModel):
config: dict
activate: bool = False
class SandboxProviderActivateRequest(BaseModel):
type: str
@console_ns.route("/workspaces/current/sandbox-providers")
class SandboxProviderListApi(Resource):
@console_ns.doc("list_sandbox_providers")
@console_ns.doc(description="Get list of available sandbox providers with configuration status")
@console_ns.response(200, "Success", fields.List(fields.Raw(description="Sandbox provider information")))
@setup_required
@login_required
@account_initialization_required
def get(self):
_, current_tenant_id = current_account_with_tenant()
providers = SandboxProviderService.list_providers(current_tenant_id)
return jsonable_encoder([p.model_dump() for p in providers])
@console_ns.route("/workspaces/current/sandbox-provider/<string:provider_type>/config")
class SandboxProviderConfigApi(Resource):
@console_ns.doc("save_sandbox_provider_config")
@console_ns.doc(description="Save or update configuration for a sandbox provider")
@console_ns.response(200, "Success")
@setup_required
@login_required
@account_initialization_required
def post(self, provider_type: str):
_, current_tenant_id = current_account_with_tenant()
args = SandboxProviderConfigRequest.model_validate(request.get_json())
try:
result = SandboxProviderService.save_config(
tenant_id=current_tenant_id,
provider_type=provider_type,
config=args.config,
activate=args.activate,
)
return result
except ValueError as e:
return {"message": str(e)}, 400
@console_ns.doc("delete_sandbox_provider_config")
@console_ns.doc(description="Delete configuration for a sandbox provider")
@console_ns.response(200, "Success")
@setup_required
@login_required
@account_initialization_required
def delete(self, provider_type: str):
_, current_tenant_id = current_account_with_tenant()
try:
result = SandboxProviderService.delete_config(
tenant_id=current_tenant_id,
provider_type=provider_type,
)
return result
except ValueError as e:
return {"message": str(e)}, 400
@console_ns.route("/workspaces/current/sandbox-provider/<string:provider_type>/activate")
class SandboxProviderActivateApi(Resource):
"""Activate a sandbox provider."""
@console_ns.doc("activate_sandbox_provider")
@console_ns.doc(description="Activate a sandbox provider for the current workspace")
@console_ns.response(200, "Success")
@setup_required
@login_required
@account_initialization_required
def post(self, provider_type: str):
"""Activate a sandbox provider."""
_, current_tenant_id = current_account_with_tenant()
try:
args = SandboxProviderActivateRequest.model_validate(request.get_json())
result = SandboxProviderService.activate_provider(
tenant_id=current_tenant_id,
provider_type=provider_type,
type=args.type,
)
return result
except ValueError as e:
return {"message": str(e)}, 400

View File

@@ -28,7 +28,7 @@ from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models.account import Tenant, TenantStatus
from models.account import Tenant, TenantCustomConfigDict, TenantStatus
from services.account_service import TenantService
from services.billing_service import BillingService, SubscriptionPlan
from services.enterprise.enterprise_service import EnterpriseService
@@ -240,8 +240,10 @@ class CustomConfigWorkspaceApi(Resource):
args = WorkspaceCustomConfigPayload.model_validate(payload)
tenant = db.get_or_404(Tenant, current_tenant_id)
custom_config_dict = {
"remove_webapp_brand": args.remove_webapp_brand,
custom_config_dict: TenantCustomConfigDict = {
"remove_webapp_brand": args.remove_webapp_brand
if args.remove_webapp_brand is not None
else tenant.custom_config_dict.get("remove_webapp_brand", False),
"replace_webapp_logo": args.replace_webapp_logo
if args.replace_webapp_logo is not None
else tenant.custom_config_dict.get("replace_webapp_logo"),

View File

@@ -0,0 +1,80 @@
"""Token-based file proxy controller for storage operations.
This controller handles file download and upload operations using opaque UUID tokens.
The token maps to the real storage key in Redis, so the actual storage path is never
exposed in the URL.
Routes:
GET /files/storage-files/{token} - Download a file
PUT /files/storage-files/{token} - Upload a file
The operation type (download/upload) is determined by the ticket stored in Redis,
not by the HTTP method. This ensures a download ticket cannot be used for upload
and vice versa.
"""
from urllib.parse import quote
from flask import Response, request
from flask_restx import Resource
from werkzeug.exceptions import Forbidden, NotFound, RequestEntityTooLarge
from controllers.files import files_ns
from extensions.ext_storage import storage
from services.storage_ticket_service import StorageTicketService
@files_ns.route("/storage-files/<string:token>")
class StorageFilesApi(Resource):
"""Handle file operations through token-based URLs."""
def get(self, token: str):
"""Download a file using a token.
The ticket must have op="download", otherwise returns 403.
"""
ticket = StorageTicketService.get_ticket(token)
if ticket is None:
raise Forbidden("Invalid or expired token")
if ticket.op != "download":
raise Forbidden("This token is not valid for download")
try:
generator = storage.load_stream(ticket.storage_key)
except FileNotFoundError:
raise NotFound("File not found")
filename = ticket.filename or ticket.storage_key.rsplit("/", 1)[-1]
encoded_filename = quote(filename)
return Response(
generator,
mimetype="application/octet-stream",
direct_passthrough=True,
headers={
"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}",
},
)
def put(self, token: str):
"""Upload a file using a token.
The ticket must have op="upload", otherwise returns 403.
If the request body exceeds max_bytes, returns 413.
"""
ticket = StorageTicketService.get_ticket(token)
if ticket is None:
raise Forbidden("Invalid or expired token")
if ticket.op != "upload":
raise Forbidden("This token is not valid for upload")
content = request.get_data()
if ticket.max_bytes is not None and len(content) > ticket.max_bytes:
raise RequestEntityTooLarge(f"Upload exceeds maximum size of {ticket.max_bytes} bytes")
storage.save(ticket.storage_key, content)
return Response(status=204)

View File

@@ -9,7 +9,7 @@ from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from controllers.common.schema import register_schema_model
from controllers.console.wraps import setup_required
@@ -55,7 +55,7 @@ class EnterpriseAppDSLImport(Resource):
account.set_tenant_id(workspace_id)
with Session(db.engine) as session:
with sessionmaker(db.engine).begin() as session:
dsl_service = AppDslService(session)
result = dsl_service.import_app(
account=account,
@@ -64,7 +64,6 @@ class EnterpriseAppDSLImport(Resource):
name=args.name,
description=args.description,
)
session.commit()
if result.status == ImportStatus.FAILED:
return result.model_dump(mode="json"), 400

View File

@@ -4,6 +4,7 @@ from flask import Response
from flask_restx import Resource
from graphon.variables.input_entities import VariableEntity
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from controllers.common.schema import register_schema_model
@@ -80,11 +81,11 @@ class MCPAppApi(Resource):
def _get_mcp_server_and_app(self, server_code: str, session: Session) -> tuple[AppMCPServer, App]:
"""Get and validate MCP server and app in one query session"""
mcp_server = session.query(AppMCPServer).where(AppMCPServer.server_code == server_code).first()
mcp_server = session.scalar(select(AppMCPServer).where(AppMCPServer.server_code == server_code).limit(1))
if not mcp_server:
raise MCPRequestError(mcp_types.INVALID_REQUEST, "Server Not Found")
app = session.query(App).where(App.id == mcp_server.app_id).first()
app = session.scalar(select(App).where(App.id == mcp_server.app_id).limit(1))
if not app:
raise MCPRequestError(mcp_types.INVALID_REQUEST, "App Not Found")
@@ -190,12 +191,12 @@ class MCPAppApi(Resource):
def _retrieve_end_user(self, tenant_id: str, mcp_server_id: str) -> EndUser | None:
"""Get end user - manages its own database session"""
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
return (
session.query(EndUser)
return session.scalar(
select(EndUser)
.where(EndUser.tenant_id == tenant_id)
.where(EndUser.session_id == mcp_server_id)
.where(EndUser.type == "mcp")
.first()
.limit(1)
)
def _create_end_user(

View File

@@ -194,7 +194,7 @@ class ChatApi(Resource):
Supports conversation management and both blocking and streaming response modes.
"""
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT}:
raise NotChatAppError()
payload = ChatRequestPayload.model_validate(service_api_ns.payload or {})
@@ -258,7 +258,7 @@ class ChatStopApi(Resource):
def post(self, app_model: App, end_user: EndUser, task_id: str):
"""Stop a running chat message generation."""
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT}:
raise NotChatAppError()
AppTaskService.stop_task(

View File

@@ -109,7 +109,7 @@ class ConversationApi(Resource):
Supports pagination using last_id and limit parameters.
"""
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT}:
raise NotChatAppError()
query_args = ConversationListQuery.model_validate(request.args.to_dict())
@@ -153,7 +153,7 @@ class ConversationDetailApi(Resource):
def delete(self, app_model: App, end_user: EndUser, c_id):
"""Delete a specific conversation."""
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT}:
raise NotChatAppError()
conversation_id = str(c_id)
@@ -182,7 +182,7 @@ class ConversationRenameApi(Resource):
def post(self, app_model: App, end_user: EndUser, c_id):
"""Rename a conversation or auto-generate a name."""
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT}:
raise NotChatAppError()
conversation_id = str(c_id)
@@ -224,7 +224,7 @@ class ConversationVariablesApi(Resource):
"""
# conversational variable only for chat app
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT}:
raise NotChatAppError()
conversation_id = str(c_id)
@@ -263,7 +263,7 @@ class ConversationVariableDetailApi(Resource):
The value must match the variable's expected type.
"""
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT}:
raise NotChatAppError()
conversation_id = str(c_id)

View File

@@ -65,7 +65,7 @@ class MessageListApi(Resource):
Retrieves messages with pagination support using first_id.
"""
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT}:
raise NotChatAppError()
query_args = MessageListQuery.model_validate(request.args.to_dict())
@@ -170,7 +170,7 @@ class MessageSuggestedApi(Resource):
"""
message_id = str(message_id)
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT, AppMode.AGENT}:
raise NotChatAppError()
try:

View File

@@ -22,10 +22,17 @@ from fields.tag_fields import DataSetTag
from libs.login import current_user
from models.account import Account
from models.dataset import DatasetPermissionEnum
from models.enums import TagType
from models.provider_ids import ModelProviderID
from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService
from services.entities.knowledge_entities.knowledge_entities import RetrievalModel
from services.tag_service import TagService
from services.tag_service import (
SaveTagPayload,
TagBindingCreatePayload,
TagBindingDeletePayload,
TagService,
UpdateTagPayload,
)
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
@@ -513,7 +520,7 @@ class DatasetTagsApi(DatasetApiResource):
raise Forbidden()
payload = TagCreatePayload.model_validate(service_api_ns.payload or {})
tag = TagService.save_tags({"name": payload.name, "type": "knowledge"})
tag = TagService.save_tags(SaveTagPayload(name=payload.name, type=TagType.KNOWLEDGE))
response = DataSetTag.model_validate(
{"id": tag.id, "name": tag.name, "type": tag.type, "binding_count": 0}
@@ -536,9 +543,8 @@ class DatasetTagsApi(DatasetApiResource):
raise Forbidden()
payload = TagUpdatePayload.model_validate(service_api_ns.payload or {})
params = {"name": payload.name, "type": "knowledge"}
tag_id = payload.tag_id
tag = TagService.update_tags(params, tag_id)
tag = TagService.update_tags(UpdateTagPayload(name=payload.name, type=TagType.KNOWLEDGE), tag_id)
binding_count = TagService.get_tag_binding_count(tag_id)
@@ -585,7 +591,9 @@ class DatasetTagBindingApi(DatasetApiResource):
raise Forbidden()
payload = TagBindingPayload.model_validate(service_api_ns.payload or {})
TagService.save_tag_binding({"tag_ids": payload.tag_ids, "target_id": payload.target_id, "type": "knowledge"})
TagService.save_tag_binding(
TagBindingCreatePayload(tag_ids=payload.tag_ids, target_id=payload.target_id, type=TagType.KNOWLEDGE)
)
return "", 204
@@ -609,7 +617,9 @@ class DatasetTagUnbindingApi(DatasetApiResource):
raise Forbidden()
payload = TagUnbindingPayload.model_validate(service_api_ns.payload or {})
TagService.delete_tag_binding({"tag_id": payload.tag_id, "target_id": payload.target_id, "type": "knowledge"})
TagService.delete_tag_binding(
TagBindingDeletePayload(tag_id=payload.tag_id, target_id=payload.target_id, type=TagType.KNOWLEDGE)
)
return "", 204

View File

@@ -31,6 +31,7 @@ from controllers.service_api.wraps import (
cloud_edition_billing_resource_check,
)
from core.errors.error import ProviderTokenNotInitError
from core.rag.entities import PreProcessingRule, Rule, Segmentation
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from extensions.ext_database import db
from fields.document_fields import document_fields, document_status_fields
@@ -40,11 +41,8 @@ from models.enums import SegmentStatus
from services.dataset_service import DatasetService, DocumentService
from services.entities.knowledge_entities.knowledge_entities import (
KnowledgeConfig,
PreProcessingRule,
ProcessRule,
RetrievalModel,
Rule,
Segmentation,
)
from services.file_service import FileService
from services.summary_index_service import SummaryIndexService

View File

@@ -4,13 +4,23 @@ Serialization helpers for Service API knowledge pipeline endpoints.
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, TypedDict
if TYPE_CHECKING:
from models.model import UploadFile
def serialize_upload_file(upload_file: UploadFile) -> dict[str, Any]:
class UploadFileDict(TypedDict):
id: str
name: str
size: int
extension: str
mime_type: str | None
created_by: str
created_at: str | None
def serialize_upload_file(upload_file: UploadFile) -> UploadFileDict:
return {
"id": upload_file.id,
"name": upload_file.name,

View File

@@ -3,7 +3,6 @@ import secrets
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field, field_validator
from sqlalchemy.orm import sessionmaker
from controllers.common.schema import register_schema_models
@@ -19,33 +18,15 @@ from controllers.console.error import EmailSendIpLimitError
from controllers.console.wraps import email_password_login_enabled, only_edition_enterprise, setup_required
from controllers.web import web_ns
from extensions.ext_database import db
from libs.helper import EmailStr, extract_remote_ip
from libs.password import hash_password, valid_password
from libs.helper import extract_remote_ip
from libs.password import hash_password
from models.account import Account
from services.account_service import AccountService
class ForgotPasswordSendPayload(BaseModel):
email: EmailStr
language: str | None = None
class ForgotPasswordCheckPayload(BaseModel):
email: EmailStr
code: str
token: str = Field(min_length=1)
class ForgotPasswordResetPayload(BaseModel):
token: str = Field(min_length=1)
new_password: str
password_confirm: str
@field_validator("new_password", "password_confirm")
@classmethod
def validate_password(cls, value: str) -> str:
return valid_password(value)
from services.entities.auth_entities import (
ForgotPasswordCheckPayload,
ForgotPasswordResetPayload,
ForgotPasswordSendPayload,
)
register_schema_models(web_ns, ForgotPasswordSendPayload, ForgotPasswordCheckPayload, ForgotPasswordResetPayload)

View File

@@ -29,13 +29,11 @@ from libs.token import (
)
from services.account_service import AccountService
from services.app_service import AppService
from services.entities.auth_entities import LoginPayloadBase
from services.webapp_auth_service import WebAppAuthService
class LoginPayload(BaseModel):
email: EmailStr
password: str
class LoginPayload(LoginPayloadBase):
@field_validator("password")
@classmethod
def validate_password(cls, value: str) -> str:

View File

@@ -0,0 +1,380 @@
import logging
from collections.abc import Generator
from copy import deepcopy
from typing import Any
from core.agent.base_agent_runner import BaseAgentRunner
from core.agent.entities import AgentEntity, AgentLog, AgentResult
from core.agent.patterns.strategy_factory import StrategyFactory
from core.app.apps.base_app_queue_manager import PublishFrom
from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent
from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform
from core.tools.__base.tool import Tool
from core.tools.entities.tool_entities import ToolInvokeMeta
from core.tools.tool_engine import ToolEngine
from graphon.file import file_manager
from graphon.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
LLMUsage,
PromptMessage,
PromptMessageContentType,
SystemPromptMessage,
TextPromptMessageContent,
UserPromptMessage,
)
from graphon.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from models.model import Message
logger = logging.getLogger(__name__)
class AgentAppRunner(BaseAgentRunner):
def _create_tool_invoke_hook(self, message: Message):
"""
Create a tool invoke hook that uses ToolEngine.agent_invoke.
This hook handles file creation and returns proper meta information.
"""
# Get trace manager from app generate entity
trace_manager = self.application_generate_entity.trace_manager
def tool_invoke_hook(
tool: Tool, tool_args: dict[str, Any], tool_name: str
) -> tuple[str, list[str], ToolInvokeMeta]:
"""Hook that uses agent_invoke for proper file and meta handling."""
tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke(
tool=tool,
tool_parameters=tool_args,
user_id=self.user_id,
tenant_id=self.tenant_id,
message=message,
invoke_from=self.application_generate_entity.invoke_from,
agent_tool_callback=self.agent_callback,
trace_manager=trace_manager,
app_id=self.application_generate_entity.app_config.app_id,
message_id=message.id,
conversation_id=self.conversation.id,
)
# Publish files and track IDs
for message_file_id in message_files:
self.queue_manager.publish(
QueueMessageFileEvent(message_file_id=message_file_id),
PublishFrom.APPLICATION_MANAGER,
)
self._current_message_file_ids.append(message_file_id)
return tool_invoke_response, message_files, tool_invoke_meta
return tool_invoke_hook
def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]:
"""
Run Agent application
"""
self.query = query
app_generate_entity = self.application_generate_entity
app_config = self.app_config
assert app_config is not None, "app_config is required"
assert app_config.agent is not None, "app_config.agent is required"
# convert tools into ModelRuntime Tool format
tool_instances, _ = self._init_prompt_tools()
assert app_config.agent
# Create tool invoke hook for agent_invoke
tool_invoke_hook = self._create_tool_invoke_hook(message)
# Get instruction for ReAct strategy
instruction = self.app_config.prompt_template.simple_prompt_template or ""
# Use factory to create appropriate strategy
strategy = StrategyFactory.create_strategy(
model_features=self.model_features,
model_instance=self.model_instance,
tools=list(tool_instances.values()),
files=list(self.files),
max_iterations=app_config.agent.max_iteration,
context=self.build_execution_context(),
agent_strategy=self.config.strategy,
tool_invoke_hook=tool_invoke_hook,
instruction=instruction,
)
# Initialize state variables
current_agent_thought_id: str | None = None
has_published_thought = False
current_tool_name: str | None = None
self._current_message_file_ids: list[str] = []
# organize prompt messages
prompt_messages = self._organize_prompt_messages()
# Run strategy
generator = strategy.run(
prompt_messages=prompt_messages,
model_parameters=app_generate_entity.model_conf.parameters,
stop=app_generate_entity.model_conf.stop,
stream=True,
)
# Consume generator and collect result
result: AgentResult | None = None
try:
while True:
try:
output = next(generator)
except StopIteration as e:
# Generator finished, get the return value
result = e.value
break
if isinstance(output, LLMResultChunk):
# Handle LLM chunk
if current_agent_thought_id and not has_published_thought:
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
has_published_thought = True
yield output
elif isinstance(output, AgentLog):
# Handle Agent Log using log_type for type-safe dispatch
if output.status == AgentLog.LogStatus.START:
if output.log_type == AgentLog.LogType.ROUND:
# Start of a new round
message_file_ids: list[str] = []
current_agent_thought_id = self.create_agent_thought(
message_id=message.id,
message="",
tool_name="",
tool_input="",
messages_ids=message_file_ids,
)
has_published_thought = False
elif output.log_type == AgentLog.LogType.TOOL_CALL:
if current_agent_thought_id is None:
continue
# Tool call start - extract data from structured fields
current_tool_name = output.data.get("tool_name", "")
tool_input = output.data.get("tool_args", {})
self.save_agent_thought(
agent_thought_id=current_agent_thought_id,
tool_name=current_tool_name,
tool_input=tool_input,
thought=None,
observation=None,
tool_invoke_meta=None,
answer=None,
messages_ids=[],
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
elif output.status == AgentLog.LogStatus.SUCCESS:
if output.log_type == AgentLog.LogType.THOUGHT:
if current_agent_thought_id is None:
continue
thought_text = output.data.get("thought")
self.save_agent_thought(
agent_thought_id=current_agent_thought_id,
tool_name=None,
tool_input=None,
thought=thought_text,
observation=None,
tool_invoke_meta=None,
answer=None,
messages_ids=[],
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
elif output.log_type == AgentLog.LogType.TOOL_CALL:
if current_agent_thought_id is None:
continue
# Tool call finished
tool_output = output.data.get("output")
# Get meta from strategy output (now properly populated)
tool_meta = output.data.get("meta")
# Wrap tool_meta with tool_name as key (required by agent_service)
if tool_meta and current_tool_name:
tool_meta = {current_tool_name: tool_meta}
self.save_agent_thought(
agent_thought_id=current_agent_thought_id,
tool_name=None,
tool_input=None,
thought=None,
observation=tool_output,
tool_invoke_meta=tool_meta,
answer=None,
messages_ids=self._current_message_file_ids,
)
# Clear message file ids after saving
self._current_message_file_ids = []
current_tool_name = None
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
elif output.log_type == AgentLog.LogType.ROUND:
if current_agent_thought_id is None:
continue
# Round finished - save LLM usage and answer
llm_usage = output.metadata.get(AgentLog.LogMetadata.LLM_USAGE)
llm_result = output.data.get("llm_result")
final_answer = output.data.get("final_answer")
self.save_agent_thought(
agent_thought_id=current_agent_thought_id,
tool_name=None,
tool_input=None,
thought=llm_result,
observation=None,
tool_invoke_meta=None,
answer=final_answer,
messages_ids=[],
llm_usage=llm_usage,
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
except Exception:
# Re-raise any other exceptions
raise
# Process final result
if isinstance(result, AgentResult):
final_answer = result.text
usage = result.usage or LLMUsage.empty_usage()
# Publish end event
self.queue_manager.publish(
QueueMessageEndEvent(
llm_result=LLMResult(
model=self.model_instance.model_name,
prompt_messages=prompt_messages,
message=AssistantPromptMessage(content=final_answer),
usage=usage,
system_fingerprint="",
)
),
PublishFrom.APPLICATION_MANAGER,
)
def _init_system_message(self, prompt_template: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
Initialize system message
"""
if not prompt_template:
return prompt_messages or []
prompt_messages = prompt_messages or []
if prompt_messages and isinstance(prompt_messages[0], SystemPromptMessage):
prompt_messages[0] = SystemPromptMessage(content=prompt_template)
return prompt_messages
if not prompt_messages:
return [SystemPromptMessage(content=prompt_template)]
prompt_messages.insert(0, SystemPromptMessage(content=prompt_template))
return prompt_messages
def _organize_user_query(self, query: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
Organize user query
"""
if self.files:
# get image detail config
image_detail_config = (
self.application_generate_entity.file_upload_config.image_config.detail
if (
self.application_generate_entity.file_upload_config
and self.application_generate_entity.file_upload_config.image_config
)
else None
)
image_detail_config = image_detail_config or ImagePromptMessageContent.DETAIL.LOW
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
for file in self.files:
prompt_message_contents.append(
file_manager.to_prompt_message_content(
file,
image_detail_config=image_detail_config,
)
)
prompt_message_contents.append(TextPromptMessageContent(data=query))
prompt_messages.append(UserPromptMessage(content=prompt_message_contents))
else:
prompt_messages.append(UserPromptMessage(content=query))
return prompt_messages
def _clear_user_prompt_image_messages(self, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
As for now, gpt supports both fc and vision at the first iteration.
We need to remove the image messages from the prompt messages at the first iteration.
"""
prompt_messages = deepcopy(prompt_messages)
for prompt_message in prompt_messages:
if isinstance(prompt_message, UserPromptMessage):
if isinstance(prompt_message.content, list):
prompt_message.content = "\n".join(
[
content.data
if content.type == PromptMessageContentType.TEXT
else "[image]"
if content.type == PromptMessageContentType.IMAGE
else "[file]"
for content in prompt_message.content
]
)
return prompt_messages
def _organize_prompt_messages(self):
# For ReAct strategy, use the agent prompt template
if self.config.strategy == AgentEntity.Strategy.CHAIN_OF_THOUGHT and self.config.prompt:
prompt_template = self.config.prompt.first_prompt
else:
prompt_template = self.app_config.prompt_template.simple_prompt_template or ""
self.history_prompt_messages = self._init_system_message(prompt_template, self.history_prompt_messages)
query_prompt_messages = self._organize_user_query(self.query or "", [])
self.history_prompt_messages = AgentHistoryPromptTransform(
model_config=self.model_config,
prompt_messages=[*query_prompt_messages, *self._current_thoughts],
history_messages=self.history_prompt_messages,
memory=self.memory,
).get_prompt()
prompt_messages = [*self.history_prompt_messages, *query_prompt_messages, *self._current_thoughts]
if len(self._current_thoughts) != 0:
# clear messages after the first iteration
prompt_messages = self._clear_user_prompt_image_messages(prompt_messages)
return prompt_messages

View File

@@ -79,21 +79,18 @@ class CotChatAgentRunner(CotAgentRunner):
if not agent_scratchpad:
assistant_messages = []
else:
assistant_message = AssistantPromptMessage(content="")
assistant_message.content = "" # FIXME: type check tell mypy that assistant_message.content is str
content = ""
for unit in agent_scratchpad:
if unit.is_final():
assert isinstance(assistant_message.content, str)
assistant_message.content += f"Final Answer: {unit.agent_response}"
content += f"Final Answer: {unit.agent_response}"
else:
assert isinstance(assistant_message.content, str)
assistant_message.content += f"Thought: {unit.thought}\n\n"
content += f"Thought: {unit.thought}\n\n"
if unit.action_str:
assistant_message.content += f"Action: {unit.action_str}\n\n"
content += f"Action: {unit.action_str}\n\n"
if unit.observation:
assistant_message.content += f"Observation: {unit.observation}\n\n"
content += f"Observation: {unit.observation}\n\n"
assistant_messages = [assistant_message]
assistant_messages = [AssistantPromptMessage(content=content)]
# query messages
query_messages = self._organize_user_query(self._query, [])

View File

@@ -1,3 +1,5 @@
import uuid
from collections.abc import Mapping
from enum import StrEnum
from typing import Any, Union
@@ -92,3 +94,80 @@ class AgentInvokeMessage(ToolInvokeMessage):
"""
pass
class ExecutionContext(BaseModel):
"""Execution context containing trace and audit information.
Carries IDs and metadata needed for tracing, auditing, and correlation
but not part of the core business logic.
"""
user_id: str | None = None
app_id: str | None = None
conversation_id: str | None = None
message_id: str | None = None
tenant_id: str | None = None
node_id: str | None = None
@classmethod
def create_minimal(cls, user_id: str | None = None) -> "ExecutionContext":
return cls(user_id=user_id)
def to_dict(self) -> dict[str, Any]:
return {
"user_id": self.user_id,
"app_id": self.app_id,
"conversation_id": self.conversation_id,
"message_id": self.message_id,
"tenant_id": self.tenant_id,
}
def with_updates(self, **kwargs) -> "ExecutionContext":
data = self.to_dict()
data.update(kwargs)
return ExecutionContext(**{k: v for k, v in data.items() if k in ExecutionContext.model_fields})
class AgentLog(BaseModel):
"""Structured log entry for agent execution tracing."""
class LogType(StrEnum):
ROUND = "round"
THOUGHT = "thought"
TOOL_CALL = "tool_call"
class LogMetadata(StrEnum):
STARTED_AT = "started_at"
FINISHED_AT = "finished_at"
ELAPSED_TIME = "elapsed_time"
TOTAL_PRICE = "total_price"
TOTAL_TOKENS = "total_tokens"
PROVIDER = "provider"
CURRENCY = "currency"
LLM_USAGE = "llm_usage"
ICON = "icon"
ICON_DARK = "icon_dark"
class LogStatus(StrEnum):
START = "start"
ERROR = "error"
SUCCESS = "success"
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
label: str = Field(...)
log_type: LogType = Field(...)
parent_id: str | None = Field(default=None)
error: str | None = Field(default=None)
status: LogStatus = Field(...)
data: Mapping[str, Any] = Field(...)
metadata: Mapping[LogMetadata, Any] = Field(default={})
class AgentResult(BaseModel):
"""Agent execution result."""
text: str = Field(default="")
files: list[Any] = Field(default_factory=list)
usage: Any | None = Field(default=None)
finish_reason: str | None = Field(default=None)

View File

@@ -0,0 +1,19 @@
"""Agent patterns module.
This module provides different strategies for agent execution:
- FunctionCallStrategy: Uses native function/tool calling
- ReActStrategy: Uses ReAct (Reasoning + Acting) approach
- StrategyFactory: Factory for creating strategies based on model features
"""
from .base import AgentPattern
from .function_call import FunctionCallStrategy
from .react import ReActStrategy
from .strategy_factory import StrategyFactory
__all__ = [
"AgentPattern",
"FunctionCallStrategy",
"ReActStrategy",
"StrategyFactory",
]

View File

@@ -0,0 +1,506 @@
"""Base class for agent strategies."""
from __future__ import annotations
import json
import re
import time
from abc import ABC, abstractmethod
from collections.abc import Callable, Generator
from typing import TYPE_CHECKING, Any
from core.agent.entities import AgentLog, AgentResult, ExecutionContext
from core.model_manager import ModelInstance
from core.tools.entities.tool_entities import ToolInvokeMessage, ToolInvokeMeta
from graphon.file import File
from graphon.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
PromptMessage,
PromptMessageTool,
)
from graphon.model_runtime.entities.llm_entities import LLMUsage
from graphon.model_runtime.entities.message_entities import TextPromptMessageContent
if TYPE_CHECKING:
from core.tools.__base.tool import Tool
# Type alias for tool invoke hook
# Returns: (response_content, message_file_ids, tool_invoke_meta)
ToolInvokeHook = Callable[["Tool", dict[str, Any], str], tuple[str, list[str], ToolInvokeMeta]]
class AgentPattern(ABC):
"""Base class for agent execution strategies."""
def __init__(
self,
model_instance: ModelInstance,
tools: list[Tool],
context: ExecutionContext,
max_iterations: int = 10,
workflow_call_depth: int = 0,
files: list[File] = [],
tool_invoke_hook: ToolInvokeHook | None = None,
):
"""Initialize the agent strategy."""
self.model_instance = model_instance
self.tools = tools
self.context = context
self.max_iterations = min(max_iterations, 99) # Cap at 99 iterations
self.workflow_call_depth = workflow_call_depth
self.files: list[File] = files
self.tool_invoke_hook = tool_invoke_hook
@abstractmethod
def run(
self,
prompt_messages: list[PromptMessage],
model_parameters: dict[str, Any],
stop: list[str] = [],
stream: bool = True,
) -> Generator[LLMResultChunk | AgentLog, None, AgentResult]:
"""Execute the agent strategy."""
pass
def _accumulate_usage(self, total_usage: dict[str, Any], delta_usage: LLMUsage) -> None:
"""Accumulate LLM usage statistics."""
if not total_usage.get("usage"):
# Create a copy to avoid modifying the original
total_usage["usage"] = LLMUsage(
prompt_tokens=delta_usage.prompt_tokens,
prompt_unit_price=delta_usage.prompt_unit_price,
prompt_price_unit=delta_usage.prompt_price_unit,
prompt_price=delta_usage.prompt_price,
completion_tokens=delta_usage.completion_tokens,
completion_unit_price=delta_usage.completion_unit_price,
completion_price_unit=delta_usage.completion_price_unit,
completion_price=delta_usage.completion_price,
total_tokens=delta_usage.total_tokens,
total_price=delta_usage.total_price,
currency=delta_usage.currency,
latency=delta_usage.latency,
)
else:
current: LLMUsage = total_usage["usage"]
current.prompt_tokens += delta_usage.prompt_tokens
current.completion_tokens += delta_usage.completion_tokens
current.total_tokens += delta_usage.total_tokens
current.prompt_price += delta_usage.prompt_price
current.completion_price += delta_usage.completion_price
current.total_price += delta_usage.total_price
def _extract_content(self, content: Any) -> str:
"""Extract text content from message content."""
if isinstance(content, list):
# Content items are PromptMessageContentUnionTypes
text_parts = []
for c in content:
# Check if it's a TextPromptMessageContent (which has data attribute)
if isinstance(c, TextPromptMessageContent):
text_parts.append(c.data)
return "".join(text_parts)
return str(content)
def _has_tool_calls(self, chunk: LLMResultChunk) -> bool:
"""Check if chunk contains tool calls."""
# LLMResultChunk always has delta attribute
return bool(chunk.delta.message and chunk.delta.message.tool_calls)
def _has_tool_calls_result(self, result: LLMResult) -> bool:
"""Check if result contains tool calls (non-streaming)."""
# LLMResult always has message attribute
return bool(result.message and result.message.tool_calls)
def _extract_tool_calls(self, chunk: LLMResultChunk) -> list[tuple[str, str, dict[str, Any]]]:
"""Extract tool calls from streaming chunk."""
tool_calls: list[tuple[str, str, dict[str, Any]]] = []
if chunk.delta.message and chunk.delta.message.tool_calls:
for tool_call in chunk.delta.message.tool_calls:
if tool_call.function:
try:
args = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
except json.JSONDecodeError:
args = {}
tool_calls.append((tool_call.id or "", tool_call.function.name, args))
return tool_calls
def _extract_tool_calls_result(self, result: LLMResult) -> list[tuple[str, str, dict[str, Any]]]:
"""Extract tool calls from non-streaming result."""
tool_calls = []
if result.message and result.message.tool_calls:
for tool_call in result.message.tool_calls:
if tool_call.function:
try:
args = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
except json.JSONDecodeError:
args = {}
tool_calls.append((tool_call.id or "", tool_call.function.name, args))
return tool_calls
def _extract_text_from_message(self, message: PromptMessage) -> str:
"""Extract text content from a prompt message."""
# PromptMessage always has content attribute
content = message.content
if isinstance(content, str):
return content
elif isinstance(content, list):
# Extract text from content list
text_parts = []
for item in content:
if isinstance(item, TextPromptMessageContent):
text_parts.append(item.data)
return " ".join(text_parts)
return ""
def _get_tool_metadata(self, tool_instance: Tool) -> dict[AgentLog.LogMetadata, Any]:
"""Get metadata for a tool including provider and icon info."""
from core.tools.tool_manager import ToolManager
metadata: dict[AgentLog.LogMetadata, Any] = {}
if tool_instance.entity and tool_instance.entity.identity:
identity = tool_instance.entity.identity
if identity.provider:
metadata[AgentLog.LogMetadata.PROVIDER] = identity.provider
# Get icon using ToolManager for proper URL generation
tenant_id = self.context.tenant_id
if tenant_id and identity.provider:
try:
provider_type = tool_instance.tool_provider_type()
icon = ToolManager.get_tool_icon(tenant_id, provider_type, identity.provider)
if isinstance(icon, str):
metadata[AgentLog.LogMetadata.ICON] = icon
elif isinstance(icon, dict):
# Handle icon dict with background/content or light/dark variants
metadata[AgentLog.LogMetadata.ICON] = icon
except Exception:
# Fallback to identity.icon if ToolManager fails
if identity.icon:
metadata[AgentLog.LogMetadata.ICON] = identity.icon
elif identity.icon:
metadata[AgentLog.LogMetadata.ICON] = identity.icon
return metadata
def _create_log(
self,
label: str,
log_type: AgentLog.LogType,
status: AgentLog.LogStatus,
data: dict[str, Any] | None = None,
parent_id: str | None = None,
extra_metadata: dict[AgentLog.LogMetadata, Any] | None = None,
) -> AgentLog:
"""Create a new AgentLog with standard metadata."""
metadata: dict[AgentLog.LogMetadata, Any] = {
AgentLog.LogMetadata.STARTED_AT: time.perf_counter(),
}
if extra_metadata:
metadata.update(extra_metadata)
return AgentLog(
label=label,
log_type=log_type,
status=status,
data=data or {},
parent_id=parent_id,
metadata=metadata,
)
def _finish_log(
self,
log: AgentLog,
data: dict[str, Any] | None = None,
usage: LLMUsage | None = None,
) -> AgentLog:
"""Finish an AgentLog by updating its status and metadata."""
log.status = AgentLog.LogStatus.SUCCESS
if data is not None:
log.data = data
# Calculate elapsed time
started_at = log.metadata.get(AgentLog.LogMetadata.STARTED_AT, time.perf_counter())
finished_at = time.perf_counter()
# Update metadata
log.metadata = {
**log.metadata,
AgentLog.LogMetadata.FINISHED_AT: finished_at,
# Calculate elapsed time in seconds
AgentLog.LogMetadata.ELAPSED_TIME: round(finished_at - started_at, 4),
}
# Add usage information if provided
if usage:
log.metadata.update(
{
AgentLog.LogMetadata.TOTAL_PRICE: usage.total_price,
AgentLog.LogMetadata.CURRENCY: usage.currency,
AgentLog.LogMetadata.TOTAL_TOKENS: usage.total_tokens,
AgentLog.LogMetadata.LLM_USAGE: usage,
}
)
return log
def _replace_file_references(self, tool_args: dict[str, Any]) -> dict[str, Any]:
"""
Replace file references in tool arguments with actual File objects.
Args:
tool_args: Dictionary of tool arguments
Returns:
Updated tool arguments with file references replaced
"""
# Process each argument in the dictionary
processed_args: dict[str, Any] = {}
for key, value in tool_args.items():
processed_args[key] = self._process_file_reference(value)
return processed_args
def _process_file_reference(self, data: Any) -> Any:
"""
Recursively process data to replace file references.
Supports both single file [File: file_id] and multiple files [Files: file_id1, file_id2, ...].
Args:
data: The data to process (can be dict, list, str, or other types)
Returns:
Processed data with file references replaced
"""
single_file_pattern = re.compile(r"^\[File:\s*([^\]]+)\]$")
multiple_files_pattern = re.compile(r"^\[Files:\s*([^\]]+)\]$")
if isinstance(data, dict):
# Process dictionary recursively
return {key: self._process_file_reference(value) for key, value in data.items()}
elif isinstance(data, list):
# Process list recursively
return [self._process_file_reference(item) for item in data]
elif isinstance(data, str):
# Check for single file pattern [File: file_id]
single_match = single_file_pattern.match(data.strip())
if single_match:
file_id = single_match.group(1).strip()
# Find the file in self.files
for file in self.files:
if file.id and str(file.id) == file_id:
return file
# If file not found, return original value
return data
# Check for multiple files pattern [Files: file_id1, file_id2, ...]
multiple_match = multiple_files_pattern.match(data.strip())
if multiple_match:
file_ids_str = multiple_match.group(1).strip()
# Split by comma and strip whitespace
file_ids = [fid.strip() for fid in file_ids_str.split(",")]
# Find all matching files
matched_files: list[File] = []
for file_id in file_ids:
for file in self.files:
if file.id and str(file.id) == file_id:
matched_files.append(file)
break
# Return list of files if any were found, otherwise return original
return matched_files or data
return data
else:
# Return other types as-is
return data
def _create_text_chunk(self, text: str, prompt_messages: list[PromptMessage]) -> LLMResultChunk:
"""Create a text chunk for streaming."""
return LLMResultChunk(
model=self.model_instance.model_name,
prompt_messages=prompt_messages,
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(content=text),
usage=None,
),
system_fingerprint="",
)
def _invoke_tool(
self,
tool_instance: Tool,
tool_args: dict[str, Any],
tool_name: str,
) -> tuple[str, list[File], ToolInvokeMeta | None]:
"""
Invoke a tool and collect its response.
Args:
tool_instance: The tool instance to invoke
tool_args: Tool arguments
tool_name: Name of the tool
Returns:
Tuple of (response_content, tool_files, tool_invoke_meta)
"""
# Process tool_args to replace file references with actual File objects
tool_args = self._replace_file_references(tool_args)
# If a tool invoke hook is set, use it instead of generic_invoke
if self.tool_invoke_hook:
response_content, _, tool_invoke_meta = self.tool_invoke_hook(tool_instance, tool_args, tool_name)
# Note: message_file_ids are stored in DB, we don't convert them to File objects here
# The caller (AgentAppRunner) handles file publishing
return response_content, [], tool_invoke_meta
# Default: use generic_invoke for workflow scenarios
# Import here to avoid circular import
from core.tools.tool_engine import DifyWorkflowCallbackHandler, ToolEngine
tool_response = ToolEngine.generic_invoke(
tool=tool_instance,
tool_parameters=tool_args,
user_id=self.context.user_id or "",
workflow_tool_callback=DifyWorkflowCallbackHandler(),
workflow_call_depth=self.workflow_call_depth,
app_id=self.context.app_id,
conversation_id=self.context.conversation_id,
message_id=self.context.message_id,
)
# Collect response and files
response_content = ""
tool_files: list[File] = []
for response in tool_response:
if response.type == ToolInvokeMessage.MessageType.TEXT:
assert isinstance(response.message, ToolInvokeMessage.TextMessage)
response_content += response.message.text
elif response.type == ToolInvokeMessage.MessageType.LINK:
# Handle link messages
if isinstance(response.message, ToolInvokeMessage.TextMessage):
response_content += f"[Link: {response.message.text}]"
elif response.type == ToolInvokeMessage.MessageType.IMAGE:
# Handle image URL messages
if isinstance(response.message, ToolInvokeMessage.TextMessage):
response_content += f"[Image: {response.message.text}]"
elif response.type == ToolInvokeMessage.MessageType.IMAGE_LINK:
# Handle image link messages
if isinstance(response.message, ToolInvokeMessage.TextMessage):
response_content += f"[Image: {response.message.text}]"
elif response.type == ToolInvokeMessage.MessageType.BINARY_LINK:
# Handle binary file link messages
if isinstance(response.message, ToolInvokeMessage.TextMessage):
filename = response.meta.get("filename", "file") if response.meta else "file"
response_content += f"[File: {filename} - {response.message.text}]"
elif response.type == ToolInvokeMessage.MessageType.JSON:
# Handle JSON messages
if isinstance(response.message, ToolInvokeMessage.JsonMessage):
response_content += json.dumps(response.message.json_object, ensure_ascii=False, indent=2)
elif response.type == ToolInvokeMessage.MessageType.BLOB:
# Handle blob messages - convert to text representation
if isinstance(response.message, ToolInvokeMessage.BlobMessage):
mime_type = (
response.meta.get("mime_type", "application/octet-stream")
if response.meta
else "application/octet-stream"
)
size = len(response.message.blob)
response_content += f"[Binary data: {mime_type}, size: {size} bytes]"
elif response.type == ToolInvokeMessage.MessageType.VARIABLE:
# Handle variable messages
if isinstance(response.message, ToolInvokeMessage.VariableMessage):
var_name = response.message.variable_name
var_value = response.message.variable_value
if isinstance(var_value, str):
response_content += var_value
else:
response_content += f"[Variable {var_name}: {json.dumps(var_value, ensure_ascii=False)}]"
elif response.type == ToolInvokeMessage.MessageType.BLOB_CHUNK:
# Handle blob chunk messages - these are parts of a larger blob
if isinstance(response.message, ToolInvokeMessage.BlobChunkMessage):
response_content += f"[Blob chunk {response.message.sequence}: {len(response.message.blob)} bytes]"
elif response.type == ToolInvokeMessage.MessageType.RETRIEVER_RESOURCES:
# Handle retriever resources messages
if isinstance(response.message, ToolInvokeMessage.RetrieverResourceMessage):
response_content += response.message.context
elif response.type == ToolInvokeMessage.MessageType.FILE:
# Extract file from meta
if response.meta and "file" in response.meta:
file = response.meta["file"]
if isinstance(file, File):
# Check if file is for model or tool output
if response.meta.get("target") == "self":
# File is for model - add to files for next prompt
self.files.append(file)
response_content += f"File '{file.filename}' has been loaded into your context."
else:
# File is tool output
tool_files.append(file)
return response_content, tool_files, None
def _validate_tool_args(self, tool_instance: Tool, tool_args: dict[str, Any]) -> str | None:
"""Validate tool arguments against the tool's required parameters.
Checks that all required LLM-facing parameters are present and non-empty
before actual execution, preventing wasted tool invocations when the model
generates calls with missing arguments (e.g. empty ``{}``).
Returns:
Error message if validation fails, None if all required parameters are satisfied.
"""
prompt_tool = tool_instance.to_prompt_message_tool()
required_params: list[str] = prompt_tool.parameters.get("required", [])
if not required_params:
return None
missing = [
p
for p in required_params
if p not in tool_args
or tool_args[p] is None
or (isinstance(tool_args[p], str) and not tool_args[p].strip())
]
if not missing:
return None
return (
f"Missing required parameter(s): {', '.join(missing)}. "
f"Please provide all required parameters before calling this tool."
)
def _find_tool_by_name(self, tool_name: str) -> Tool | None:
"""Find a tool instance by its name."""
for tool in self.tools:
if tool.entity.identity.name == tool_name:
return tool
return None
def _convert_tools_to_prompt_format(self) -> list[PromptMessageTool]:
"""Convert tools to prompt message format."""
prompt_tools: list[PromptMessageTool] = []
for tool in self.tools:
prompt_tools.append(tool.to_prompt_message_tool())
return prompt_tools
def _update_usage_with_empty(self, llm_usage: dict[str, Any]) -> None:
"""Initialize usage tracking with empty usage if not set."""
if "usage" not in llm_usage or llm_usage["usage"] is None:
llm_usage["usage"] = LLMUsage.empty_usage()

View File

@@ -0,0 +1,358 @@
"""Function Call strategy implementation.
Implements the Function Call agent pattern where the LLM uses native tool-calling
capability to invoke tools. Includes pre-execution parameter validation that
intercepts invalid calls (e.g. empty arguments) before they reach tool backends,
and avoids counting purely-invalid rounds against the iteration budget.
"""
import json
import logging
from collections.abc import Generator
from typing import Any, Union
from core.agent.entities import AgentLog, AgentResult
from core.tools.entities.tool_entities import ToolInvokeMeta
from graphon.file import File
from graphon.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
LLMUsage,
PromptMessage,
PromptMessageTool,
ToolPromptMessage,
)
from .base import AgentPattern
logger = logging.getLogger(__name__)
class FunctionCallStrategy(AgentPattern):
"""Function Call strategy using model's native tool calling capability."""
def run(
self,
prompt_messages: list[PromptMessage],
model_parameters: dict[str, Any],
stop: list[str] = [],
stream: bool = True,
) -> Generator[LLMResultChunk | AgentLog, None, AgentResult]:
"""Execute the function call agent strategy."""
# Convert tools to prompt format
prompt_tools: list[PromptMessageTool] = self._convert_tools_to_prompt_format()
# Initialize tracking
iteration_step: int = 1
max_iterations: int = self.max_iterations + 1
function_call_state: bool = True
total_usage: dict[str, LLMUsage | None] = {"usage": None}
messages: list[PromptMessage] = list(prompt_messages) # Create mutable copy
final_text: str = ""
finish_reason: str | None = None
output_files: list[File] = [] # Track files produced by tools
# Consecutive rounds where ALL tool calls failed parameter validation.
# When this happens the round is "free" (iteration_step not incremented)
# up to a safety cap to prevent infinite loops.
consecutive_validation_failures: int = 0
max_validation_retries: int = 3
while function_call_state and iteration_step <= max_iterations:
function_call_state = False
round_log = self._create_log(
label=f"ROUND {iteration_step}",
log_type=AgentLog.LogType.ROUND,
status=AgentLog.LogStatus.START,
data={},
)
yield round_log
# On last iteration, remove tools to force final answer
current_tools: list[PromptMessageTool] = [] if iteration_step == max_iterations else prompt_tools
model_log = self._create_log(
label=f"{self.model_instance.model_name} Thought",
log_type=AgentLog.LogType.THOUGHT,
status=AgentLog.LogStatus.START,
data={},
parent_id=round_log.id,
extra_metadata={
AgentLog.LogMetadata.PROVIDER: self.model_instance.provider,
},
)
yield model_log
# Track usage for this round only
round_usage: dict[str, LLMUsage | None] = {"usage": None}
# Invoke model
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = self.model_instance.invoke_llm(
prompt_messages=messages,
model_parameters=model_parameters,
tools=current_tools,
stop=stop,
stream=stream,
callbacks=[],
)
# Process response
tool_calls, response_content, chunk_finish_reason = yield from self._handle_chunks(
chunks, round_usage, model_log
)
messages.append(self._create_assistant_message(response_content, tool_calls))
# Accumulate to total usage
round_usage_value = round_usage.get("usage")
if round_usage_value:
self._accumulate_usage(total_usage, round_usage_value)
# Update final text if no tool calls (this is likely the final answer)
if not tool_calls:
final_text = response_content
# Update finish reason
if chunk_finish_reason:
finish_reason = chunk_finish_reason
# Process tool calls
tool_outputs: dict[str, str] = {}
all_validation_errors: bool = True
if tool_calls:
function_call_state = True
# Execute tools (with pre-execution parameter validation)
for tool_call_id, tool_name, tool_args in tool_calls:
tool_response, tool_files, _, is_validation_error = yield from self._handle_tool_call(
tool_name, tool_args, tool_call_id, messages, round_log
)
tool_outputs[tool_name] = tool_response
output_files.extend(tool_files)
if not is_validation_error:
all_validation_errors = False
else:
all_validation_errors = False
yield self._finish_log(
round_log,
data={
"llm_result": response_content,
"tool_calls": [
{"name": tc[1], "args": tc[2], "output": tool_outputs.get(tc[1], "")} for tc in tool_calls
]
if tool_calls
else [],
"final_answer": final_text if not function_call_state else None,
},
usage=round_usage.get("usage"),
)
# Skip iteration counter when every tool call in this round failed validation,
# giving the model a free retry — but cap retries to prevent infinite loops.
if tool_calls and all_validation_errors:
consecutive_validation_failures += 1
if consecutive_validation_failures >= max_validation_retries:
logger.warning(
"Agent hit %d consecutive validation-only rounds, forcing iteration increment",
consecutive_validation_failures,
)
iteration_step += 1
consecutive_validation_failures = 0
else:
logger.info(
"All tool calls failed validation (attempt %d/%d), not counting iteration",
consecutive_validation_failures,
max_validation_retries,
)
else:
consecutive_validation_failures = 0
iteration_step += 1
# Return final result
from core.agent.entities import AgentResult
return AgentResult(
text=final_text,
files=output_files,
usage=total_usage.get("usage") or LLMUsage.empty_usage(),
finish_reason=finish_reason,
)
def _handle_chunks(
self,
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult],
llm_usage: dict[str, LLMUsage | None],
start_log: AgentLog,
) -> Generator[
LLMResultChunk | AgentLog,
None,
tuple[list[tuple[str, str, dict[str, Any]]], str, str | None],
]:
"""Handle LLM response chunks and extract tool calls and content.
Returns a tuple of (tool_calls, response_content, finish_reason).
"""
tool_calls: list[tuple[str, str, dict[str, Any]]] = []
response_content: str = ""
finish_reason: str | None = None
if not isinstance(chunks, LLMResult):
# Streaming response
for chunk in chunks:
# Extract tool calls
if self._has_tool_calls(chunk):
tool_calls.extend(self._extract_tool_calls(chunk))
# Extract content
if chunk.delta.message and chunk.delta.message.content:
response_content += self._extract_content(chunk.delta.message.content)
# Track usage
if chunk.delta.usage:
self._accumulate_usage(llm_usage, chunk.delta.usage)
# Capture finish reason
if chunk.delta.finish_reason:
finish_reason = chunk.delta.finish_reason
yield chunk
else:
# Non-streaming response
result: LLMResult = chunks
if self._has_tool_calls_result(result):
tool_calls.extend(self._extract_tool_calls_result(result))
if result.message and result.message.content:
response_content += self._extract_content(result.message.content)
if result.usage:
self._accumulate_usage(llm_usage, result.usage)
# Convert to streaming format
yield LLMResultChunk(
model=result.model,
prompt_messages=result.prompt_messages,
delta=LLMResultChunkDelta(index=0, message=result.message, usage=result.usage),
)
yield self._finish_log(
start_log,
data={
"result": response_content,
},
usage=llm_usage.get("usage"),
)
return tool_calls, response_content, finish_reason
def _create_assistant_message(
self, content: str, tool_calls: list[tuple[str, str, dict[str, Any]]] | None = None
) -> AssistantPromptMessage:
"""Create assistant message with tool calls."""
if tool_calls is None:
return AssistantPromptMessage(content=content)
return AssistantPromptMessage(
content=content or "",
tool_calls=[
AssistantPromptMessage.ToolCall(
id=tc[0],
type="function",
function=AssistantPromptMessage.ToolCall.ToolCallFunction(name=tc[1], arguments=json.dumps(tc[2])),
)
for tc in tool_calls
],
)
def _handle_tool_call(
self,
tool_name: str,
tool_args: dict[str, Any],
tool_call_id: str,
messages: list[PromptMessage],
round_log: AgentLog,
) -> Generator[AgentLog, None, tuple[str, list[File], ToolInvokeMeta | None, bool]]:
"""Handle a single tool call and return response with files, meta, and validation status.
Validates required parameters before execution. When validation fails the tool
is never invoked — a synthetic error is fed back to the model so it can self-correct
without consuming a real iteration.
Returns:
(response_content, tool_files, tool_invoke_meta, is_validation_error).
``is_validation_error`` is True when the call was rejected due to missing
required parameters, allowing the caller to skip the iteration counter.
"""
# Find tool
tool_instance = self._find_tool_by_name(tool_name)
if not tool_instance:
raise ValueError(f"Tool {tool_name} not found")
# Get tool metadata (provider, icon, etc.)
tool_metadata = self._get_tool_metadata(tool_instance)
# Create tool call log
tool_call_log = self._create_log(
label=f"CALL {tool_name}",
log_type=AgentLog.LogType.TOOL_CALL,
status=AgentLog.LogStatus.START,
data={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_args": tool_args,
},
parent_id=round_log.id,
extra_metadata=tool_metadata,
)
yield tool_call_log
# Validate required parameters before execution to avoid wasted invocations
validation_error = self._validate_tool_args(tool_instance, tool_args)
if validation_error:
tool_call_log.status = AgentLog.LogStatus.ERROR
tool_call_log.error = validation_error
tool_call_log.data = {**tool_call_log.data, "error": validation_error}
yield tool_call_log
messages.append(ToolPromptMessage(content=validation_error, tool_call_id=tool_call_id, name=tool_name))
return validation_error, [], None, True
# Invoke tool using base class method with error handling
try:
response_content, tool_files, tool_invoke_meta = self._invoke_tool(tool_instance, tool_args, tool_name)
yield self._finish_log(
tool_call_log,
data={
**tool_call_log.data,
"output": response_content,
"files": len(tool_files),
"meta": tool_invoke_meta.to_dict() if tool_invoke_meta else None,
},
)
final_content = response_content or "Tool executed successfully"
# Add tool response to messages
messages.append(
ToolPromptMessage(
content=final_content,
tool_call_id=tool_call_id,
name=tool_name,
)
)
return response_content, tool_files, tool_invoke_meta, False
except Exception as e:
# Tool invocation failed, yield error log
error_message = str(e)
tool_call_log.status = AgentLog.LogStatus.ERROR
tool_call_log.error = error_message
tool_call_log.data = {
**tool_call_log.data,
"error": error_message,
}
yield tool_call_log
# Add error message to conversation
error_content = f"Tool execution failed: {error_message}"
messages.append(
ToolPromptMessage(
content=error_content,
tool_call_id=tool_call_id,
name=tool_name,
)
)
return error_content, [], None, False

View File

@@ -0,0 +1,418 @@
"""ReAct strategy implementation."""
from __future__ import annotations
import json
from collections.abc import Generator
from typing import TYPE_CHECKING, Any, Union
from core.agent.entities import AgentLog, AgentResult, AgentScratchpadUnit, ExecutionContext
from core.agent.output_parser.cot_output_parser import CotAgentOutputParser
from core.model_manager import ModelInstance
from graphon.file import File
from graphon.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
PromptMessage,
SystemPromptMessage,
)
from .base import AgentPattern, ToolInvokeHook
if TYPE_CHECKING:
from core.tools.__base.tool import Tool
class ReActStrategy(AgentPattern):
"""ReAct strategy using reasoning and acting approach."""
def __init__(
self,
model_instance: ModelInstance,
tools: list[Tool],
context: ExecutionContext,
max_iterations: int = 10,
workflow_call_depth: int = 0,
files: list[File] = [],
tool_invoke_hook: ToolInvokeHook | None = None,
instruction: str = "",
):
"""Initialize the ReAct strategy with instruction support."""
super().__init__(
model_instance=model_instance,
tools=tools,
context=context,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
files=files,
tool_invoke_hook=tool_invoke_hook,
)
self.instruction = instruction
def run(
self,
prompt_messages: list[PromptMessage],
model_parameters: dict[str, Any],
stop: list[str] = [],
stream: bool = True,
) -> Generator[LLMResultChunk | AgentLog, None, AgentResult]:
"""Execute the ReAct agent strategy."""
# Initialize tracking
agent_scratchpad: list[AgentScratchpadUnit] = []
iteration_step: int = 1
max_iterations: int = self.max_iterations + 1
react_state: bool = True
total_usage: dict[str, Any] = {"usage": None}
output_files: list[File] = [] # Track files produced by tools
final_text: str = ""
finish_reason: str | None = None
# Add "Observation" to stop sequences
if "Observation" not in stop:
stop = stop.copy()
stop.append("Observation")
while react_state and iteration_step <= max_iterations:
react_state = False
round_log = self._create_log(
label=f"ROUND {iteration_step}",
log_type=AgentLog.LogType.ROUND,
status=AgentLog.LogStatus.START,
data={},
)
yield round_log
# Build prompt with/without tools based on iteration
include_tools = iteration_step < max_iterations
current_messages = self._build_prompt_with_react_format(
prompt_messages, agent_scratchpad, include_tools, self.instruction
)
model_log = self._create_log(
label=f"{self.model_instance.model_name} Thought",
log_type=AgentLog.LogType.THOUGHT,
status=AgentLog.LogStatus.START,
data={},
parent_id=round_log.id,
extra_metadata={
AgentLog.LogMetadata.PROVIDER: self.model_instance.provider,
},
)
yield model_log
# Track usage for this round only
round_usage: dict[str, Any] = {"usage": None}
# Use current messages directly (files are handled by base class if needed)
messages_to_use = current_messages
# Invoke model
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = self.model_instance.invoke_llm(
prompt_messages=messages_to_use,
model_parameters=model_parameters,
stop=stop,
stream=stream,
callbacks=[],
)
# Process response
scratchpad, chunk_finish_reason = yield from self._handle_chunks(
chunks, round_usage, model_log, current_messages
)
agent_scratchpad.append(scratchpad)
# Accumulate to total usage
round_usage_value = round_usage.get("usage")
if round_usage_value:
self._accumulate_usage(total_usage, round_usage_value)
# Update finish reason
if chunk_finish_reason:
finish_reason = chunk_finish_reason
# Check if we have an action to execute
if scratchpad.action and scratchpad.action.action_name.lower() != "final answer":
react_state = True
# Execute tool
observation, tool_files = yield from self._handle_tool_call(
scratchpad.action, current_messages, round_log
)
scratchpad.observation = observation
# Track files produced by tools
output_files.extend(tool_files)
# Add observation to scratchpad for display
yield self._create_text_chunk(f"\nObservation: {observation}\n", current_messages)
else:
# Extract final answer
if scratchpad.action and scratchpad.action.action_input:
final_answer = scratchpad.action.action_input
if isinstance(final_answer, dict):
final_answer = json.dumps(final_answer, ensure_ascii=False)
final_text = str(final_answer)
elif scratchpad.thought:
# If no action but we have thought, use thought as final answer
final_text = scratchpad.thought
yield self._finish_log(
round_log,
data={
"thought": scratchpad.thought,
"action": scratchpad.action_str if scratchpad.action else None,
"observation": scratchpad.observation or None,
"final_answer": final_text if not react_state else None,
},
usage=round_usage.get("usage"),
)
iteration_step += 1
# Return final result
from core.agent.entities import AgentResult
return AgentResult(
text=final_text, files=output_files, usage=total_usage.get("usage"), finish_reason=finish_reason
)
def _build_prompt_with_react_format(
self,
original_messages: list[PromptMessage],
agent_scratchpad: list[AgentScratchpadUnit],
include_tools: bool = True,
instruction: str = "",
) -> list[PromptMessage]:
"""Build prompt messages with ReAct format."""
# Copy messages to avoid modifying original
messages = list(original_messages)
# Find and update the system prompt that should already exist
system_prompt_found = False
for i, msg in enumerate(messages):
if isinstance(msg, SystemPromptMessage):
system_prompt_found = True
# The system prompt from frontend already has the template, just replace placeholders
# Format tools
tools_str = ""
tool_names = []
if include_tools and self.tools:
# Convert tools to prompt message tools format
prompt_tools = [tool.to_prompt_message_tool() for tool in self.tools]
tool_names = [tool.name for tool in prompt_tools]
# Format tools as JSON for comprehensive information
from graphon.model_runtime.utils.encoders import jsonable_encoder
tools_str = json.dumps(jsonable_encoder(prompt_tools), indent=2)
tool_names_str = ", ".join(f'"{name}"' for name in tool_names)
else:
tools_str = "No tools available"
tool_names_str = ""
# Replace placeholders in the existing system prompt
updated_content = msg.content
assert isinstance(updated_content, str)
updated_content = updated_content.replace("{{instruction}}", instruction)
updated_content = updated_content.replace("{{tools}}", tools_str)
updated_content = updated_content.replace("{{tool_names}}", tool_names_str)
# Create new SystemPromptMessage with updated content
messages[i] = SystemPromptMessage(content=updated_content)
break
# If no system prompt found, that's unexpected but add scratchpad anyway
if not system_prompt_found:
# This shouldn't happen if frontend is working correctly
pass
# Format agent scratchpad
scratchpad_str = ""
if agent_scratchpad:
scratchpad_parts: list[str] = []
for unit in agent_scratchpad:
if unit.thought:
scratchpad_parts.append(f"Thought: {unit.thought}")
if unit.action_str:
scratchpad_parts.append(f"Action:\n```\n{unit.action_str}\n```")
if unit.observation:
scratchpad_parts.append(f"Observation: {unit.observation}")
scratchpad_str = "\n".join(scratchpad_parts)
# If there's a scratchpad, append it to the last message
if scratchpad_str:
messages.append(AssistantPromptMessage(content=scratchpad_str))
return messages
def _handle_chunks(
self,
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult],
llm_usage: dict[str, Any],
model_log: AgentLog,
current_messages: list[PromptMessage],
) -> Generator[
LLMResultChunk | AgentLog,
None,
tuple[AgentScratchpadUnit, str | None],
]:
"""Handle LLM response chunks and extract action/thought.
Returns a tuple of (scratchpad_unit, finish_reason).
"""
usage_dict: dict[str, Any] = {}
# Convert non-streaming to streaming format if needed
if isinstance(chunks, LLMResult):
result = chunks
def result_to_chunks() -> Generator[LLMResultChunk, None, None]:
yield LLMResultChunk(
model=result.model,
prompt_messages=result.prompt_messages,
delta=LLMResultChunkDelta(
index=0,
message=result.message,
usage=result.usage,
finish_reason=None,
),
system_fingerprint=result.system_fingerprint or "",
)
streaming_chunks = result_to_chunks()
else:
streaming_chunks = chunks
react_chunks = CotAgentOutputParser.handle_react_stream_output(streaming_chunks, usage_dict)
# Initialize scratchpad unit
scratchpad = AgentScratchpadUnit(
agent_response="",
thought="",
action_str="",
observation="",
action=None,
)
finish_reason: str | None = None
# Process chunks
for chunk in react_chunks:
if isinstance(chunk, AgentScratchpadUnit.Action):
# Action detected
action_str = json.dumps(chunk.model_dump())
scratchpad.agent_response = (scratchpad.agent_response or "") + action_str
scratchpad.action_str = action_str
scratchpad.action = chunk
yield self._create_text_chunk(json.dumps(chunk.model_dump()), current_messages)
else:
# Text chunk
chunk_text = str(chunk)
scratchpad.agent_response = (scratchpad.agent_response or "") + chunk_text
scratchpad.thought = (scratchpad.thought or "") + chunk_text
yield self._create_text_chunk(chunk_text, current_messages)
# Update usage
if usage_dict.get("usage"):
if llm_usage.get("usage"):
self._accumulate_usage(llm_usage, usage_dict["usage"])
else:
llm_usage["usage"] = usage_dict["usage"]
# Clean up thought
scratchpad.thought = (scratchpad.thought or "").strip() or "I am thinking about how to help you"
# Finish model log
yield self._finish_log(
model_log,
data={
"thought": scratchpad.thought,
"action": scratchpad.action_str if scratchpad.action else None,
},
usage=llm_usage.get("usage"),
)
return scratchpad, finish_reason
def _handle_tool_call(
self,
action: AgentScratchpadUnit.Action,
prompt_messages: list[PromptMessage],
round_log: AgentLog,
) -> Generator[AgentLog, None, tuple[str, list[File]]]:
"""Handle tool call and return observation with files."""
tool_name = action.action_name
tool_args: dict[str, Any] | str = action.action_input
# Find tool instance first to get metadata
tool_instance = self._find_tool_by_name(tool_name)
tool_metadata = self._get_tool_metadata(tool_instance) if tool_instance else {}
# Start tool log with tool metadata
tool_log = self._create_log(
label=f"CALL {tool_name}",
log_type=AgentLog.LogType.TOOL_CALL,
status=AgentLog.LogStatus.START,
data={
"tool_name": tool_name,
"tool_args": tool_args,
},
parent_id=round_log.id,
extra_metadata=tool_metadata,
)
yield tool_log
if not tool_instance:
# Finish tool log with error
yield self._finish_log(
tool_log,
data={
**tool_log.data,
"error": f"Tool {tool_name} not found",
},
)
return f"Tool {tool_name} not found", []
# Ensure tool_args is a dict
tool_args_dict: dict[str, Any]
if isinstance(tool_args, str):
try:
tool_args_dict = json.loads(tool_args)
except json.JSONDecodeError:
tool_args_dict = {"input": tool_args}
elif not isinstance(tool_args, dict):
tool_args_dict = {"input": str(tool_args)}
else:
tool_args_dict = tool_args
# Invoke tool using base class method with error handling
try:
response_content, tool_files, tool_invoke_meta = self._invoke_tool(tool_instance, tool_args_dict, tool_name)
# Finish tool log
yield self._finish_log(
tool_log,
data={
**tool_log.data,
"output": response_content,
"files": len(tool_files),
"meta": tool_invoke_meta.to_dict() if tool_invoke_meta else None,
},
)
return response_content or "Tool executed successfully", tool_files
except Exception as e:
# Tool invocation failed, yield error log
error_message = str(e)
tool_log.status = AgentLog.LogStatus.ERROR
tool_log.error = error_message
tool_log.data = {
**tool_log.data,
"error": error_message,
}
yield tool_log
return f"Tool execution failed: {error_message}", []

View File

@@ -0,0 +1,108 @@
"""Strategy factory for creating agent strategies."""
from __future__ import annotations
from typing import TYPE_CHECKING
from core.agent.entities import AgentEntity, ExecutionContext
from core.model_manager import ModelInstance
from graphon.file.models import File
from graphon.model_runtime.entities.model_entities import ModelFeature
from .base import AgentPattern, ToolInvokeHook
from .function_call import FunctionCallStrategy
from .react import ReActStrategy
if TYPE_CHECKING:
from core.tools.__base.tool import Tool
class StrategyFactory:
"""Factory for creating agent strategies based on model features."""
# Tool calling related features
TOOL_CALL_FEATURES = {ModelFeature.TOOL_CALL, ModelFeature.MULTI_TOOL_CALL, ModelFeature.STREAM_TOOL_CALL}
@staticmethod
def create_strategy(
model_features: list[ModelFeature],
model_instance: ModelInstance,
context: ExecutionContext,
tools: list[Tool],
files: list[File],
max_iterations: int = 10,
workflow_call_depth: int = 0,
agent_strategy: AgentEntity.Strategy | None = None,
tool_invoke_hook: ToolInvokeHook | None = None,
instruction: str = "",
) -> AgentPattern:
"""
Create an appropriate strategy based on model features.
Args:
model_features: List of model features/capabilities
model_instance: Model instance to use
context: Execution context containing trace/audit information
tools: Available tools
files: Available files
max_iterations: Maximum iterations for the strategy
workflow_call_depth: Depth of workflow calls
agent_strategy: Optional explicit strategy override
tool_invoke_hook: Optional hook for custom tool invocation (e.g., agent_invoke)
instruction: Optional instruction for ReAct strategy
Returns:
AgentStrategy instance
"""
# If explicit strategy is provided and it's Function Calling, try to use it if supported
if agent_strategy == AgentEntity.Strategy.FUNCTION_CALLING:
if set(model_features) & StrategyFactory.TOOL_CALL_FEATURES:
return FunctionCallStrategy(
model_instance=model_instance,
context=context,
tools=tools,
files=files,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
tool_invoke_hook=tool_invoke_hook,
)
# Fallback to ReAct if FC is requested but not supported
# If explicit strategy is Chain of Thought (ReAct)
if agent_strategy == AgentEntity.Strategy.CHAIN_OF_THOUGHT:
return ReActStrategy(
model_instance=model_instance,
context=context,
tools=tools,
files=files,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
tool_invoke_hook=tool_invoke_hook,
instruction=instruction,
)
# Default auto-selection logic
if set(model_features) & StrategyFactory.TOOL_CALL_FEATURES:
# Model supports native function calling
return FunctionCallStrategy(
model_instance=model_instance,
context=context,
tools=tools,
files=files,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
tool_invoke_hook=tool_invoke_hook,
)
else:
# Use ReAct strategy for models without function calling
return ReActStrategy(
model_instance=model_instance,
context=context,
tools=tools,
files=files,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
tool_invoke_hook=tool_invoke_hook,
instruction=instruction,
)

View File

@@ -5,6 +5,10 @@ from configs import dify_config
from constants import DEFAULT_FILE_NUMBER_LIMITS
class FeatureToggleDict(TypedDict):
enabled: bool
class SystemParametersDict(TypedDict):
image_file_size_limit: int
video_file_size_limit: int
@@ -16,12 +20,12 @@ class SystemParametersDict(TypedDict):
class AppParametersDict(TypedDict):
opening_statement: str | None
suggested_questions: list[str]
suggested_questions_after_answer: dict[str, Any]
speech_to_text: dict[str, Any]
text_to_speech: dict[str, Any]
retriever_resource: dict[str, Any]
annotation_reply: dict[str, Any]
more_like_this: dict[str, Any]
suggested_questions_after_answer: FeatureToggleDict
speech_to_text: FeatureToggleDict
text_to_speech: FeatureToggleDict
retriever_resource: FeatureToggleDict
annotation_reply: FeatureToggleDict
more_like_this: FeatureToggleDict
user_input_form: list[dict[str, Any]]
sensitive_word_avoidance: dict[str, Any]
file_upload: dict[str, Any]

View File

@@ -1,4 +1,3 @@
from collections.abc import Sequence
from enum import StrEnum, auto
from typing import Any, Literal
@@ -9,6 +8,7 @@ from graphon.variables.input_entities import VariableEntity as WorkflowVariableE
from pydantic import BaseModel, Field
from core.rag.data_post_processor.data_post_processor import RerankingModelDict, WeightsDict
from core.rag.entities import MetadataFilteringCondition
from models.model import AppMode
@@ -111,31 +111,6 @@ class ExternalDataVariableEntity(BaseModel):
config: dict[str, Any] = Field(default_factory=dict)
SupportedComparisonOperator = Literal[
# for string or array
"contains",
"not contains",
"start with",
"end with",
"is",
"is not",
"empty",
"not empty",
"in",
"not in",
# for number
"=",
"",
">",
"<",
"",
"",
# for time
"before",
"after",
]
class ModelConfig(BaseModel):
provider: str
name: str
@@ -143,25 +118,6 @@ class ModelConfig(BaseModel):
completion_params: dict[str, Any] = Field(default_factory=dict)
class Condition(BaseModel):
"""
Condition detail
"""
name: str
comparison_operator: SupportedComparisonOperator
value: str | Sequence[str] | None | int | float = None
class MetadataFilteringCondition(BaseModel):
"""
Metadata Filtering Condition.
"""
logical_operator: Literal["and", "or"] | None = "and"
conditions: list[Condition] | None = Field(default=None, deprecated=True)
class DatasetRetrieveConfigEntity(BaseModel):
"""
Dataset Retrieve Config Entity.

View File

@@ -177,6 +177,14 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# always enable retriever resource in debugger mode
app_config.additional_features.show_retrieve_source = True # type: ignore
# Resolve parent_message_id for thread continuity
if invoke_from == InvokeFrom.SERVICE_API:
parent_message_id: str | None = UUID_NIL
else:
parent_message_id = args.get("parent_message_id")
if not parent_message_id and conversation:
parent_message_id = self._resolve_latest_message_id(conversation.id)
# init application generate entity
application_generate_entity = AdvancedChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
@@ -188,7 +196,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
parent_message_id=parent_message_id,
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
@@ -689,3 +697,17 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
else:
logger.exception("Failed to process generate task pipeline, conversation_id: %s", conversation.id)
raise e
@staticmethod
def _resolve_latest_message_id(conversation_id: str) -> str | None:
"""Auto-resolve parent_message_id to the latest message when client doesn't provide one."""
from sqlalchemy import select
stmt = (
select(Message.id)
.where(Message.conversation_id == conversation_id)
.order_by(Message.created_at.desc())
.limit(1)
)
latest_id = db.session.scalar(stmt)
return str(latest_id) if latest_id else None

View File

@@ -246,6 +246,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)
if hasattr(self, '_sandbox') and self._sandbox is not None:
from core.app.layers.sandbox_layer import SandboxLayer
workflow_entry.graph_engine.layer(SandboxLayer(self._sandbox))
generator = workflow_entry.run()
for event in generator:

View File

@@ -1,15 +1,12 @@
import logging
from typing import cast
from graphon.model_runtime.entities.llm_entities import LLMMode
from graphon.model_runtime.entities.model_entities import ModelFeature, ModelPropertyKey
from graphon.model_runtime.entities.model_entities import ModelFeature
from graphon.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from sqlalchemy import select
from core.agent.cot_chat_agent_runner import CotChatAgentRunner
from core.agent.cot_completion_agent_runner import CotCompletionAgentRunner
from core.agent.agent_app_runner import AgentAppRunner
from core.agent.entities import AgentEntity
from core.agent.fc_agent_runner import FunctionCallAgentRunner
from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfig
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.base_app_runner import AppRunner
@@ -194,22 +191,7 @@ class AgentChatAppRunner(AppRunner):
raise ValueError("Message not found")
db.session.close()
runner_cls: type[FunctionCallAgentRunner] | type[CotChatAgentRunner] | type[CotCompletionAgentRunner]
# start agent runner
if agent_entity.strategy == AgentEntity.Strategy.CHAIN_OF_THOUGHT:
# check LLM mode
if model_schema.model_properties.get(ModelPropertyKey.MODE) == LLMMode.CHAT:
runner_cls = CotChatAgentRunner
elif model_schema.model_properties.get(ModelPropertyKey.MODE) == LLMMode.COMPLETION:
runner_cls = CotCompletionAgentRunner
else:
raise ValueError(f"Invalid LLM mode: {model_schema.model_properties.get(ModelPropertyKey.MODE)}")
elif agent_entity.strategy == AgentEntity.Strategy.FUNCTION_CALLING:
runner_cls = FunctionCallAgentRunner
else:
raise ValueError(f"Invalid agent strategy: {agent_entity.strategy}")
runner = runner_cls(
runner = AgentAppRunner(
tenant_id=app_config.tenant_id,
application_generate_entity=application_generate_entity,
conversation=conversation_result,

View File

@@ -107,13 +107,13 @@ class AppGenerateResponseConverter(ABC):
return metadata
@classmethod
def _error_to_stream_response(cls, e: Exception):
def _error_to_stream_response(cls, e: Exception) -> dict[str, Any]:
"""
Error to stream response.
:param e: exception
:return:
"""
error_responses = {
error_responses: dict[type[Exception], dict[str, Any]] = {
ValueError: {"code": "invalid_param", "status": 400},
ProviderTokenNotInitError: {"code": "provider_not_initialize", "status": 400},
QuotaExceededError: {
@@ -127,7 +127,7 @@ class AppGenerateResponseConverter(ABC):
}
# Determine the response based on the type of exception
data = None
data: dict[str, Any] | None = None
for k, v in error_responses.items():
if isinstance(e, k):
data = v

View File

@@ -0,0 +1,54 @@
"""Legacy Response Adapter for transparent upgrade.
When old apps (chat/completion/agent-chat) run through the Agent V2
workflow engine via transparent upgrade, the SSE events are in workflow
format (workflow_started, node_started, etc.). This adapter filters out
workflow-specific events and passes through only the events that old
clients expect (message, message_end, etc.).
"""
from __future__ import annotations
import json
import logging
from collections.abc import Generator
from typing import Any
logger = logging.getLogger(__name__)
WORKFLOW_ONLY_EVENTS = frozenset({
"workflow_started",
"workflow_finished",
"node_started",
"node_finished",
"iteration_started",
"iteration_next",
"iteration_completed",
})
def adapt_workflow_stream_for_legacy(
stream: Generator[str, None, None],
) -> Generator[str, None, None]:
"""Filter workflow-specific SSE events from a streaming response.
Passes through message, message_end, agent_log, error, ping events.
Suppresses workflow_started, workflow_finished, node_started, node_finished.
This makes the SSE stream look more like what old easy-UI apps produce,
while still carrying the actual LLM response content.
"""
for chunk in stream:
if not chunk or not chunk.strip():
yield chunk
continue
try:
if chunk.startswith("data: "):
data = json.loads(chunk[6:])
event = data.get("event", "")
if event in WORKFLOW_ONLY_EVENTS:
continue
yield chunk
except (json.JSONDecodeError, TypeError):
yield chunk

View File

@@ -170,6 +170,10 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)
if hasattr(self, '_sandbox') and self._sandbox is not None:
from core.app.layers.sandbox_layer import SandboxLayer
workflow_entry.graph_engine.layer(SandboxLayer(self._sandbox))
generator = workflow_entry.run()
for event in generator:

View File

@@ -66,7 +66,7 @@ from core.app.entities.queue_entities import (
QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
)
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.rag.entities import RetrievalSourceMetadata
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id, resolve_workflow_node_class
from core.workflow.system_variables import (
build_bootstrap_variables,
@@ -104,6 +104,89 @@ class WorkflowBasedAppRunner:
return UserFrom.ACCOUNT
return UserFrom.END_USER
@staticmethod
def _resolve_sandbox_context(tenant_id: str, user_id: str, app_id: str) -> dict[str, Any] | None:
"""Create a sandbox and inject it into run_context if a provider is configured
AND the DifyCli binary is available for the current platform."""
try:
from core.app.entities.app_invoke_entities import DIFY_SANDBOX_CONTEXT_KEY
from core.sandbox.bash.dify_cli import DifyCliLocator
from core.sandbox.builder import SandboxBuilder
from core.sandbox.entities.sandbox_type import SandboxType
from core.sandbox.storage.noop_storage import NoopSandboxStorage
from core.virtual_environment.__base.entities import Arch, OperatingSystem
from platform import machine, system as os_system
from services.sandbox.sandbox_provider_service import SandboxProviderService
provider = SandboxProviderService.get_sandbox_provider(tenant_id)
sandbox_type = SandboxType(provider.provider_type)
if sandbox_type == SandboxType.LOCAL:
logger.debug("[SANDBOX] Local provider not supported under gevent worker, skipping")
return None
os_name = os_system().lower()
arch_name = machine().lower()
os_enum = OperatingSystem.LINUX if os_name == "linux" else OperatingSystem.DARWIN
arch_enum = Arch.ARM64 if arch_name in ("arm64", "aarch64") else Arch.AMD64
cli_binary = DifyCliLocator().resolve(os_enum, arch_enum)
# Also resolve linux binary for Docker containers
cli_binary_linux = None
if os_name != "linux":
try:
cli_binary_linux = DifyCliLocator().resolve(OperatingSystem.LINUX, arch_enum)
except FileNotFoundError:
pass
from core.sandbox.builder import _get_sandbox_class
from core.virtual_environment.__base.helpers import submit_command, with_connection, pipeline
vm_class = _get_sandbox_class(SandboxType(provider.provider_type))
vm = vm_class(
tenant_id=tenant_id,
options=provider.config or {},
environments={},
user_id=user_id,
)
vm.open_enviroment()
from core.sandbox.sandbox import Sandbox
sandbox = Sandbox(
vm=vm,
storage=NoopSandboxStorage(),
tenant_id=tenant_id,
user_id=user_id,
app_id=app_id,
assets_id=app_id,
)
from core.sandbox.entities.config import DifyCli as DifyCliPaths
from io import BytesIO
cli_paths = DifyCliPaths(sandbox.id)
vm_binary = cli_binary_linux if (vm.metadata.os == OperatingSystem.LINUX and cli_binary_linux) else cli_binary
with open(vm_binary.path, "rb") as f:
pipeline(vm).add(["mkdir", "-p", cli_paths.bin_dir]).execute(raise_on_error=True)
vm.upload_file(cli_paths.bin_path, BytesIO(f.read()))
with with_connection(vm) as conn:
submit_command(vm, conn, ["chmod", "+x", cli_paths.bin_path]).result(timeout=10)
logger.info("[SANDBOX] CLI binary uploaded to container: %s", cli_paths.bin_path)
sandbox.mount()
sandbox.mark_ready()
logger.info("[SANDBOX] Created sandbox for tenant=%s, provider=%s", tenant_id, provider.provider_type)
return {DIFY_SANDBOX_CONTEXT_KEY: sandbox}
except FileNotFoundError:
logger.debug("[SANDBOX] DifyCli binary not found, skipping sandbox creation")
return None
except Exception:
logger.warning("[SANDBOX] Failed to create sandbox", exc_info=True)
return None
def _build_sandbox_layer(self) -> GraphEngineLayer | None:
"""Build a SandboxLayer if sandbox exists in _graph_engine_layers context."""
return None
def _init_graph(
self,
graph_config: Mapping[str, Any],
@@ -127,7 +210,13 @@ class WorkflowBasedAppRunner:
if not isinstance(graph_config.get("edges"), list):
raise ValueError("edges in workflow graph must be a list")
# Create required parameters for Graph.init
extra_context = self._resolve_sandbox_context(tenant_id or "", user_id, self._app_id)
if extra_context:
from core.app.entities.app_invoke_entities import DIFY_SANDBOX_CONTEXT_KEY
self._sandbox = extra_context.get(DIFY_SANDBOX_CONTEXT_KEY)
else:
self._sandbox = None
graph_init_params = GraphInitParams(
workflow_id=workflow_id,
graph_config=graph_config,
@@ -137,12 +226,11 @@ class WorkflowBasedAppRunner:
user_id=user_id,
user_from=user_from,
invoke_from=invoke_from,
extra_context=extra_context,
),
call_depth=0,
)
# Use the provided graph_runtime_state for consistent state management
node_factory = DifyNodeFactory(
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,

View File

@@ -0,0 +1,352 @@
from __future__ import annotations
import os
from collections import defaultdict
from collections.abc import Generator
from enum import StrEnum
from pydantic import BaseModel, Field
class AssetNodeType(StrEnum):
FILE = "file"
FOLDER = "folder"
class AppAssetNode(BaseModel):
id: str = Field(description="Unique identifier for the node")
node_type: AssetNodeType = Field(description="Type of node: file or folder")
name: str = Field(description="Name of the file or folder")
parent_id: str | None = Field(default=None, description="Parent folder ID, None for root level")
order: int = Field(default=0, description="Sort order within parent folder, lower values first")
extension: str = Field(default="", description="File extension without dot, empty for folders")
size: int = Field(default=0, description="File size in bytes, 0 for folders")
@classmethod
def create_folder(cls, node_id: str, name: str, parent_id: str | None = None) -> AppAssetNode:
return cls(id=node_id, node_type=AssetNodeType.FOLDER, name=name, parent_id=parent_id)
@classmethod
def create_file(cls, node_id: str, name: str, parent_id: str | None = None, size: int = 0) -> AppAssetNode:
return cls(
id=node_id,
node_type=AssetNodeType.FILE,
name=name,
parent_id=parent_id,
extension=name.rsplit(".", 1)[-1] if "." in name else "",
size=size,
)
class AppAssetNodeView(BaseModel):
id: str = Field(description="Unique identifier for the node")
node_type: str = Field(description="Type of node: 'file' or 'folder'")
name: str = Field(description="Name of the file or folder")
path: str = Field(description="Full path from root, e.g. '/folder/file.txt'")
extension: str = Field(default="", description="File extension without dot")
size: int = Field(default=0, description="File size in bytes")
children: list[AppAssetNodeView] = Field(default_factory=list, description="Child nodes for folders")
class BatchUploadNode(BaseModel):
"""Structure for batch upload_url tree nodes, used for both input and output."""
name: str
node_type: AssetNodeType
size: int = 0
children: list[BatchUploadNode] = []
id: str | None = None
upload_url: str | None = None
def to_app_asset_nodes(self, parent_id: str | None = None) -> list[AppAssetNode]:
"""
Generate IDs when missing and convert to AppAssetNode list.
Mutates self to set id field when it is not set.
"""
from uuid import uuid4
self.id = self.id or str(uuid4())
nodes: list[AppAssetNode] = []
if self.node_type == AssetNodeType.FOLDER:
nodes.append(AppAssetNode.create_folder(self.id, self.name, parent_id))
for child in self.children:
nodes.extend(child.to_app_asset_nodes(self.id))
else:
nodes.append(AppAssetNode.create_file(self.id, self.name, parent_id, self.size))
return nodes
class TreeNodeNotFoundError(Exception):
"""Tree internal: node not found"""
pass
class TreeParentNotFoundError(Exception):
"""Tree internal: parent folder not found"""
pass
class TreePathConflictError(Exception):
"""Tree internal: path already exists"""
pass
class AppAssetFileTree(BaseModel):
"""
File tree structure for app assets using adjacency list pattern.
Design:
- Storage: Flat list with parent_id references (adjacency list)
- Path: Computed dynamically via get_path(), not stored
- Order: Integer field for user-defined sorting within each folder
- API response: transform() builds nested tree with computed paths
Why adjacency list over nested tree or materialized path:
- Simpler CRUD: move/rename only updates one node's parent_id
- No path cascade: renaming parent doesn't require updating all descendants
- JSON-friendly: flat list serializes cleanly to database JSON column
- Trade-off: path lookup is O(depth), acceptable for typical file trees
"""
nodes: list[AppAssetNode] = Field(default_factory=list, description="Flat list of all nodes in the tree")
def ensure_unique_name(
self,
parent_id: str | None,
name: str,
*,
is_file: bool,
extra_taken: set[str] | None = None,
) -> str:
"""
Return a sibling-unique name by appending numeric suffixes when needed.
The suffix format is " <n>" (e.g. "report 1", "report 2"). For files,
the suffix is inserted before the extension.
"""
taken = extra_taken or set()
if not self.has_child_named(parent_id, name) and name not in taken:
return name
suffix_index = 1
while True:
candidate = self._apply_name_suffix(name, suffix_index, is_file=is_file)
if not self.has_child_named(parent_id, candidate) and candidate not in taken:
return candidate
suffix_index += 1
@staticmethod
def _apply_name_suffix(name: str, suffix_index: int, *, is_file: bool) -> str:
if not is_file:
return f"{name} {suffix_index}"
stem, extension = os.path.splitext(name)
return f"{stem} {suffix_index}{extension}"
def get(self, node_id: str) -> AppAssetNode | None:
return next((n for n in self.nodes if n.id == node_id), None)
def get_children(self, parent_id: str | None) -> list[AppAssetNode]:
return [n for n in self.nodes if n.parent_id == parent_id]
def has_child_named(self, parent_id: str | None, name: str) -> bool:
return any(n.name == name and n.parent_id == parent_id for n in self.nodes)
def get_path(self, node_id: str) -> str:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
parts: list[str] = []
current: AppAssetNode | None = node
while current:
parts.append(current.name)
current = self.get(current.parent_id) if current.parent_id else None
return "/".join(reversed(parts))
def relative_path(self, a: AppAssetNode, b: AppAssetNode) -> str:
"""
Calculate relative path from node a to node b for Markdown references.
Path is computed from a's parent directory (where the file resides).
Examples:
/foo/a.md -> /foo/b.md => ./b.md
/foo/a.md -> /foo/sub/b.md => ./sub/b.md
/foo/sub/a.md -> /foo/b.md => ../b.md
/foo/sub/deep/a.md -> /foo/b.md => ../../b.md
"""
def get_ancestor_ids(node_id: str | None) -> list[str]:
chain: list[str] = []
current_id = node_id
while current_id:
chain.append(current_id)
node = self.get(current_id)
current_id = node.parent_id if node else None
return chain
a_dir_ancestors = get_ancestor_ids(a.parent_id)
b_ancestors = [b.id] + get_ancestor_ids(b.parent_id)
a_dir_set = set(a_dir_ancestors)
lca_id: str | None = None
lca_index_in_b = -1
for idx, ancestor_id in enumerate(b_ancestors):
if ancestor_id in a_dir_set or (a.parent_id is None and b_ancestors[idx:] == []):
lca_id = ancestor_id
lca_index_in_b = idx
break
if a.parent_id is None:
steps_up = 0
lca_index_in_b = len(b_ancestors)
elif lca_id is None:
steps_up = len(a_dir_ancestors)
lca_index_in_b = len(b_ancestors)
else:
steps_up = 0
for ancestor_id in a_dir_ancestors:
if ancestor_id == lca_id:
break
steps_up += 1
path_down: list[str] = []
for i in range(lca_index_in_b - 1, -1, -1):
node = self.get(b_ancestors[i])
if node:
path_down.append(node.name)
if steps_up == 0:
return "./" + "/".join(path_down)
parts: list[str] = [".."] * steps_up + path_down
return "/".join(parts)
def get_descendant_ids(self, node_id: str) -> list[str]:
result: list[str] = []
stack = [node_id]
while stack:
current_id = stack.pop()
for child in self.nodes:
if child.parent_id == current_id:
result.append(child.id)
stack.append(child.id)
return result
def add(self, node: AppAssetNode) -> AppAssetNode:
if self.get(node.id):
raise TreePathConflictError(node.id)
if self.has_child_named(node.parent_id, node.name):
raise TreePathConflictError(node.name)
if node.parent_id:
parent = self.get(node.parent_id)
if not parent or parent.node_type != AssetNodeType.FOLDER:
raise TreeParentNotFoundError(node.parent_id)
siblings = self.get_children(node.parent_id)
node.order = max((s.order for s in siblings), default=-1) + 1
self.nodes.append(node)
return node
def update(self, node_id: str, size: int) -> AppAssetNode:
node = self.get(node_id)
if not node or node.node_type != AssetNodeType.FILE:
raise TreeNodeNotFoundError(node_id)
node.size = size
return node
def rename(self, node_id: str, new_name: str) -> AppAssetNode:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
if node.name != new_name and self.has_child_named(node.parent_id, new_name):
raise TreePathConflictError(new_name)
node.name = new_name
if node.node_type == AssetNodeType.FILE:
node.extension = new_name.rsplit(".", 1)[-1] if "." in new_name else ""
return node
def move(self, node_id: str, new_parent_id: str | None) -> AppAssetNode:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
if new_parent_id:
parent = self.get(new_parent_id)
if not parent or parent.node_type != AssetNodeType.FOLDER:
raise TreeParentNotFoundError(new_parent_id)
if self.has_child_named(new_parent_id, node.name):
raise TreePathConflictError(node.name)
node.parent_id = new_parent_id
siblings = self.get_children(new_parent_id)
node.order = max((s.order for s in siblings if s.id != node_id), default=-1) + 1
return node
def reorder(self, node_id: str, after_node_id: str | None) -> AppAssetNode:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
siblings = sorted(self.get_children(node.parent_id), key=lambda x: x.order)
siblings = [s for s in siblings if s.id != node_id]
if after_node_id is None:
insert_idx = 0
else:
after_node = self.get(after_node_id)
if not after_node or after_node.parent_id != node.parent_id:
raise TreeNodeNotFoundError(after_node_id)
insert_idx = next((i for i, s in enumerate(siblings) if s.id == after_node_id), -1) + 1
siblings.insert(insert_idx, node)
for idx, sibling in enumerate(siblings):
sibling.order = idx
return node
def remove(self, node_id: str) -> list[str]:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
ids_to_remove = [node_id] + self.get_descendant_ids(node_id)
self.nodes = [n for n in self.nodes if n.id not in ids_to_remove]
return ids_to_remove
def walk_files(self) -> Generator[AppAssetNode, None, None]:
return (n for n in self.nodes if n.node_type == AssetNodeType.FILE)
def transform(self) -> list[AppAssetNodeView]:
by_parent: dict[str | None, list[AppAssetNode]] = defaultdict(list)
for n in self.nodes:
by_parent[n.parent_id].append(n)
for children in by_parent.values():
children.sort(key=lambda x: x.order)
paths: dict[str, str] = {}
tree_views: dict[str, AppAssetNodeView] = {}
def build_view(node: AppAssetNode, parent_path: str) -> None:
path = f"{parent_path}/{node.name}"
paths[node.id] = path
child_views: list[AppAssetNodeView] = []
for child in by_parent.get(node.id, []):
build_view(child, path)
child_views.append(tree_views[child.id])
tree_views[node.id] = AppAssetNodeView(
id=node.id,
node_type=node.node_type.value,
name=node.name,
path=path,
extension=node.extension,
size=node.size,
children=child_views,
)
for root_node in by_parent.get(None, []):
build_view(root_node, "")
return [tree_views[n.id] for n in by_parent.get(None, [])]
def empty(self) -> bool:
return len(self.nodes) == 0

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
import re
from datetime import UTC, datetime
from pydantic import BaseModel, ConfigDict, Field
from core.app.entities.app_asset_entities import AppAssetFileTree
# Constants
BUNDLE_DSL_FILENAME_PATTERN = re.compile(r"^[^/]+\.ya?ml$")
BUNDLE_MAX_SIZE = 50 * 1024 * 1024 # 50MB
MANIFEST_FILENAME = "manifest.json"
MANIFEST_SCHEMA_VERSION = "1.0"
# Exceptions
class BundleFormatError(Exception):
"""Raised when bundle format is invalid."""
pass
class ZipSecurityError(Exception):
"""Raised when zip file contains security violations."""
pass
# Manifest DTOs
class ManifestFileEntry(BaseModel):
"""Maps node_id to file path in the bundle."""
model_config = ConfigDict(extra="forbid")
node_id: str
path: str
class ManifestIntegrity(BaseModel):
"""Basic integrity check fields."""
model_config = ConfigDict(extra="forbid")
file_count: int
class ManifestAppAssets(BaseModel):
"""App assets section containing the full tree."""
model_config = ConfigDict(extra="forbid")
tree: AppAssetFileTree
class BundleManifest(BaseModel):
"""
Bundle manifest for app asset import/export.
Schema version 1.0:
- dsl_filename: DSL file name in bundle root (e.g. "my_app.yml")
- tree: Full AppAssetFileTree (files + folders) for 100% restoration including node IDs
- files: Explicit node_id -> path mapping for file nodes only
- integrity: Basic file_count validation
"""
model_config = ConfigDict(extra="forbid")
schema_version: str = Field(default=MANIFEST_SCHEMA_VERSION)
generated_at: datetime = Field(default_factory=lambda: datetime.now(tz=UTC))
dsl_filename: str = Field(description="DSL file name in bundle root")
app_assets: ManifestAppAssets
files: list[ManifestFileEntry]
integrity: ManifestIntegrity
@property
def assets_prefix(self) -> str:
"""Assets directory name (DSL filename without extension)."""
return self.dsl_filename.rsplit(".", 1)[0]
@classmethod
def from_tree(cls, tree: AppAssetFileTree, dsl_filename: str) -> BundleManifest:
"""Build manifest from an AppAssetFileTree."""
files = [ManifestFileEntry(node_id=n.id, path=tree.get_path(n.id)) for n in tree.walk_files()]
return cls(
dsl_filename=dsl_filename,
app_assets=ManifestAppAssets(tree=tree),
files=files,
integrity=ManifestIntegrity(file_count=len(files)),
)
# Export result
class BundleExportResult(BaseModel):
download_url: str = Field(description="Temporary download URL for the ZIP")
filename: str = Field(description="Suggested filename for the ZIP")

View File

@@ -46,6 +46,9 @@ class InvokeFrom(StrEnum):
return source_mapping.get(self, "dev")
DIFY_SANDBOX_CONTEXT_KEY = "_dify_sandbox"
class DifyRunContext(BaseModel):
tenant_id: str
app_id: str

View File

@@ -0,0 +1,72 @@
"""
LLM Generation Detail entities.
Defines the structure for storing and transmitting LLM generation details
including reasoning content, tool calls, and their sequence.
"""
from typing import Literal
from pydantic import BaseModel, Field
class ContentSegment(BaseModel):
"""Represents a content segment in the generation sequence."""
type: Literal["content"] = "content"
start: int = Field(..., description="Start position in the text")
end: int = Field(..., description="End position in the text")
class ReasoningSegment(BaseModel):
"""Represents a reasoning segment in the generation sequence."""
type: Literal["reasoning"] = "reasoning"
index: int = Field(..., description="Index into reasoning_content array")
class ToolCallSegment(BaseModel):
"""Represents a tool call segment in the generation sequence."""
type: Literal["tool_call"] = "tool_call"
index: int = Field(..., description="Index into tool_calls array")
SequenceSegment = ContentSegment | ReasoningSegment | ToolCallSegment
class ToolCallDetail(BaseModel):
"""Represents a tool call with its arguments and result."""
id: str = Field(default="", description="Unique identifier for the tool call")
name: str = Field(..., description="Name of the tool")
arguments: str = Field(default="", description="JSON string of tool arguments")
result: str = Field(default="", description="Result from the tool execution")
elapsed_time: float | None = Field(default=None, description="Elapsed time in seconds")
icon: str | dict | None = Field(default=None, description="Icon of the tool")
icon_dark: str | dict | None = Field(default=None, description="Dark theme icon of the tool")
class LLMGenerationDetailData(BaseModel):
"""
Domain model for LLM generation detail.
Contains the structured data for reasoning content, tool calls,
and their display sequence.
"""
reasoning_content: list[str] = Field(default_factory=list, description="List of reasoning segments")
tool_calls: list[ToolCallDetail] = Field(default_factory=list, description="List of tool call details")
sequence: list[SequenceSegment] = Field(default_factory=list, description="Display order of segments")
def is_empty(self) -> bool:
"""Check if there's any meaningful generation detail."""
return not self.reasoning_content and not self.tool_calls
def to_response_dict(self) -> dict:
"""Convert to dictionary for API response."""
return {
"reasoning_content": self.reasoning_content,
"tool_calls": [tc.model_dump() for tc in self.tool_calls],
"sequence": [seg.model_dump() for seg in self.sequence],
}

View File

@@ -10,7 +10,7 @@ from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChun
from pydantic import BaseModel, ConfigDict, Field
from core.app.entities.agent_strategy import AgentStrategyInfo
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.rag.entities import RetrievalSourceMetadata
class QueueEvent(StrEnum):

View File

@@ -9,7 +9,7 @@ from graphon.nodes.human_input.entities import FormInput, UserAction
from pydantic import BaseModel, ConfigDict, Field
from core.app.entities.agent_strategy import AgentStrategyInfo
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.rag.entities import RetrievalSourceMetadata
class AnnotationReplyAccount(BaseModel):

View File

View File

@@ -0,0 +1,22 @@
import logging
from core.sandbox import Sandbox
from graphon.graph_engine.layers.base import GraphEngineLayer
from graphon.graph_events.base import GraphEngineEvent
logger = logging.getLogger(__name__)
class SandboxLayer(GraphEngineLayer):
def __init__(self, sandbox: Sandbox) -> None:
super().__init__()
self._sandbox = sandbox
def on_graph_start(self) -> None:
pass
def on_event(self, event: GraphEngineEvent) -> None:
pass
def on_graph_end(self, error: Exception | None) -> None:
self._sandbox.release()

View File

@@ -509,8 +509,8 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
:return:
"""
with Session(db.engine, expire_on_commit=False) as session:
agent_thought: MessageAgentThought | None = (
session.query(MessageAgentThought).where(MessageAgentThought.id == event.agent_thought_id).first()
agent_thought: MessageAgentThought | None = session.scalar(
select(MessageAgentThought).where(MessageAgentThought.id == event.agent_thought_id).limit(1)
)
if agent_thought:

View File

@@ -0,0 +1,13 @@
from .constants import AppAssetsAttrs
from .entities import (
AssetItem,
SkillAsset,
)
from .storage import AssetPaths
__all__ = [
"AppAssetsAttrs",
"AssetItem",
"AssetPaths",
"SkillAsset",
]

View File

@@ -0,0 +1,180 @@
"""Unified content accessor for app asset nodes.
Accessor is scoped to a single app (tenant_id + app_id), not a single node.
All methods accept an AppAssetNode parameter to identify the target.
CachedContentAccessor is the primary entry point:
- Reads DB first, misses fall through to S3 with sync backfill.
- Writes go to both DB and S3 (dual-write).
- resolve_items() batch-enriches AssetItem lists with DB-cached content
(extension-agnostic), so callers never need to filter by extension.
- Wraps an internal _StorageAccessor for S3 I/O.
Collaborators:
- services.asset_content_service.AssetContentService (DB layer)
- core.app_assets.storage.AssetPaths (S3 key generation)
- extensions.storage.cached_presign_storage.CachedPresignStorage (S3 I/O)
"""
from __future__ import annotations
import logging
from core.app.entities.app_asset_entities import AppAssetNode
from core.app_assets.entities.assets import AssetItem
from core.app_assets.storage import AssetPaths
from extensions.storage.cached_presign_storage import CachedPresignStorage
from services.asset_content_service import AssetContentService
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# S3-only implementation (internal, used as inner delegate)
# ---------------------------------------------------------------------------
class _StorageAccessor:
"""Reads/writes draft content via object storage (S3) only."""
_storage: CachedPresignStorage
_tenant_id: str
_app_id: str
def __init__(self, storage: CachedPresignStorage, tenant_id: str, app_id: str) -> None:
self._storage = storage
self._tenant_id = tenant_id
self._app_id = app_id
def _key(self, node: AppAssetNode) -> str:
return AssetPaths.draft(self._tenant_id, self._app_id, node.id)
def load(self, node: AppAssetNode) -> bytes:
return self._storage.load_once(self._key(node))
def save(self, node: AppAssetNode, content: bytes) -> None:
self._storage.save(self._key(node), content)
def delete(self, node: AppAssetNode) -> None:
try:
self._storage.delete(self._key(node))
except Exception:
logger.warning("Failed to delete storage key %s", self._key(node), exc_info=True)
# ---------------------------------------------------------------------------
# DB-cached implementation (the public API)
# ---------------------------------------------------------------------------
class CachedContentAccessor:
"""App-level content accessor with DB read-through cache over S3.
Read path: DB first -> miss -> S3 fallback -> sync backfill DB
Write path: DB upsert + S3 save (dual-write)
Delete path: DB delete + S3 delete
bulk_load uses a single SQL query for all nodes, with S3 fallback per miss.
Usage:
accessor = CachedContentAccessor(storage, tenant_id, app_id)
content = accessor.load(node)
accessor.save(node, content)
results = accessor.bulk_load(nodes)
"""
_inner: _StorageAccessor
_tenant_id: str
_app_id: str
def __init__(self, storage: CachedPresignStorage, tenant_id: str, app_id: str) -> None:
self._inner = _StorageAccessor(storage, tenant_id, app_id)
self._tenant_id = tenant_id
self._app_id = app_id
def load(self, node: AppAssetNode) -> bytes:
# 1. Try DB
cached = AssetContentService.get(self._tenant_id, self._app_id, node.id)
if cached is not None:
return cached.encode("utf-8")
# 2. Fallback to S3
data = self._inner.load(node)
# 3. Sync backfill DB
AssetContentService.upsert(
tenant_id=self._tenant_id,
app_id=self._app_id,
node_id=node.id,
content=data.decode("utf-8"),
size=len(data),
)
return data
def bulk_load(self, nodes: list[AppAssetNode]) -> dict[str, bytes]:
"""Single SQL for all nodes, S3 fallback + backfill per miss."""
result: dict[str, bytes] = {}
node_ids = [n.id for n in nodes]
cached = AssetContentService.get_many(self._tenant_id, self._app_id, node_ids)
for node in nodes:
if node.id in cached:
result[node.id] = cached[node.id].encode("utf-8")
else:
# S3 fallback + sync backfill
data = self._inner.load(node)
AssetContentService.upsert(
tenant_id=self._tenant_id,
app_id=self._app_id,
node_id=node.id,
content=data.decode("utf-8"),
size=len(data),
)
result[node.id] = data
return result
def save(self, node: AppAssetNode, content: bytes) -> None:
# Dual-write: DB + S3
AssetContentService.upsert(
tenant_id=self._tenant_id,
app_id=self._app_id,
node_id=node.id,
content=content.decode("utf-8"),
size=len(content),
)
self._inner.save(node, content)
def resolve_items(self, items: list[AssetItem]) -> list[AssetItem]:
"""Batch-enrich asset items with DB-cached content.
Queries by ``asset_id`` only — extension-agnostic. Items without
a DB cache row keep their original *content* value (typically
``None``), so only genuinely cached assets (e.g. ``.md`` skill
documents) get populated.
This eliminates the need for callers to filter by file extension
before deciding whether to read from the DB cache.
"""
if not items:
return items
node_ids = [a.asset_id for a in items]
cached = AssetContentService.get_many(self._tenant_id, self._app_id, node_ids)
if not cached:
return items
return [
AssetItem(
asset_id=a.asset_id,
path=a.path,
file_name=a.file_name,
extension=a.extension,
storage_key=a.storage_key,
content=cached[a.asset_id].encode("utf-8") if a.asset_id in cached else a.content,
)
for a in items
]
def delete(self, node: AppAssetNode) -> None:
AssetContentService.delete(self._tenant_id, self._app_id, node.id)
self._inner.delete(node)

View File

@@ -0,0 +1,12 @@
from .base import AssetBuilder, BuildContext
from .file_builder import FileBuilder
from .pipeline import AssetBuildPipeline
from .skill_builder import SkillBuilder
__all__ = [
"AssetBuildPipeline",
"AssetBuilder",
"BuildContext",
"FileBuilder",
"SkillBuilder",
]

View File

@@ -0,0 +1,20 @@
from dataclasses import dataclass
from typing import Protocol
from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode
from core.app_assets.entities import AssetItem
@dataclass
class BuildContext:
tenant_id: str
app_id: str
build_id: str
class AssetBuilder(Protocol):
def accept(self, node: AppAssetNode) -> bool: ...
def collect(self, node: AppAssetNode, path: str, ctx: BuildContext) -> None: ...
def build(self, tree: AppAssetFileTree, ctx: BuildContext) -> list[AssetItem]: ...

View File

@@ -0,0 +1,30 @@
from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode
from core.app_assets.entities import AssetItem
from core.app_assets.storage import AssetPaths
from .base import BuildContext
class FileBuilder:
_nodes: list[tuple[AppAssetNode, str]]
def __init__(self) -> None:
self._nodes = []
def accept(self, node: AppAssetNode) -> bool:
return True
def collect(self, node: AppAssetNode, path: str, ctx: BuildContext) -> None:
self._nodes.append((node, path))
def build(self, tree: AppAssetFileTree, ctx: BuildContext) -> list[AssetItem]:
return [
AssetItem(
asset_id=node.id,
path=path,
file_name=node.name,
extension=node.extension or "",
storage_key=AssetPaths.draft(ctx.tenant_id, ctx.app_id, node.id),
)
for node, path in self._nodes
]

View File

@@ -0,0 +1,27 @@
from core.app.entities.app_asset_entities import AppAssetFileTree
from core.app_assets.entities import AssetItem
from .base import AssetBuilder, BuildContext
class AssetBuildPipeline:
_builders: list[AssetBuilder]
def __init__(self, builders: list[AssetBuilder]) -> None:
self._builders = builders
def build_all(self, tree: AppAssetFileTree, ctx: BuildContext) -> list[AssetItem]:
# 1. Distribute: each node goes to first accepting builder
for node in tree.walk_files():
path = tree.get_path(node.id)
for builder in self._builders:
if builder.accept(node):
builder.collect(node, path, ctx)
break
# 2. Each builder builds its collected nodes
results: list[AssetItem] = []
for builder in self._builders:
results.extend(builder.build(tree, ctx))
return results

View File

@@ -0,0 +1,96 @@
"""Builder that compiles ``.md`` skill documents into resolved content.
The builder reads raw draft content from the DB-backed accessor, parses
each into a ``SkillDocument``, assembles a ``SkillBundle`` (with
transitive tool/file dependency resolution), and returns ``AssetItem``
objects whose *content* field carries the resolved bytes in-process.
The assembled ``SkillBundle`` is persisted via ``SkillManager``
(S3 + Redis) **and** retained on the ``bundle`` property so that
callers (e.g. ``DraftAppAssetsInitializer``) can pass it directly to
``sandbox.attrs`` without a redundant Redis/S3 round-trip.
"""
import json
import logging
from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode
from core.app_assets.accessor import CachedContentAccessor
from core.app_assets.entities import AssetItem
from core.skill.assembler import SkillBundleAssembler
from core.skill.entities.skill_bundle import SkillBundle
from core.skill.entities.skill_document import SkillDocument
from .base import BuildContext
logger = logging.getLogger(__name__)
class SkillBuilder:
_nodes: list[tuple[AppAssetNode, str]]
_accessor: CachedContentAccessor
_bundle: SkillBundle | None
def __init__(self, accessor: CachedContentAccessor) -> None:
self._nodes = []
self._accessor = accessor
self._bundle = None
@property
def bundle(self) -> SkillBundle | None:
"""The ``SkillBundle`` produced by the last ``build()`` call, or *None*."""
return self._bundle
def accept(self, node: AppAssetNode) -> bool:
return node.extension == "md"
def collect(self, node: AppAssetNode, path: str, ctx: BuildContext) -> None:
self._nodes.append((node, path))
def build(self, tree: AppAssetFileTree, ctx: BuildContext) -> list[AssetItem]:
from core.skill.skill_manager import SkillManager
if not self._nodes:
bundle = SkillBundle(assets_id=ctx.build_id, asset_tree=tree)
SkillManager.save_bundle(ctx.tenant_id, ctx.app_id, ctx.build_id, bundle)
self._bundle = bundle
return []
# Batch-load all skill draft content in one DB query (with S3 fallback on miss).
nodes_only = [node for node, _ in self._nodes]
raw_contents = self._accessor.bulk_load(nodes_only)
# Parse documents — skip nodes whose draft content is still the empty
# placeholder written at creation time.
documents: dict[str, SkillDocument] = {}
for node, _ in self._nodes:
try:
raw = raw_contents.get(node.id)
if not raw:
continue
data = {"skill_id": node.id, **json.loads(raw)}
documents[node.id] = SkillDocument.model_validate(data)
except (FileNotFoundError, json.JSONDecodeError, TypeError, ValueError) as e:
logger.exception("Failed to load or parse skill document for node %s", node.id)
raise ValueError(f"Failed to load or parse skill document for node {node.id}") from e
bundle = SkillBundleAssembler(tree).assemble_bundle(documents, ctx.build_id)
SkillManager.save_bundle(ctx.tenant_id, ctx.app_id, ctx.build_id, bundle)
self._bundle = bundle
items: list[AssetItem] = []
for node, path in self._nodes:
skill = bundle.get(node.id)
if skill is None:
continue
items.append(
AssetItem(
asset_id=node.id,
path=path,
file_name=node.name,
extension=node.extension or "",
storage_key="",
content=skill.content.encode("utf-8"),
)
)
return items

View File

@@ -0,0 +1,8 @@
from core.app.entities.app_asset_entities import AppAssetFileTree
from libs.attr_map import AttrKey
class AppAssetsAttrs:
# Skill artifact set
FILE_TREE = AttrKey("file_tree", AppAssetFileTree)
APP_ASSETS_ID = AttrKey("app_assets_id", str)

View File

@@ -0,0 +1,20 @@
from __future__ import annotations
from core.app.entities.app_asset_entities import AppAssetFileTree, AssetNodeType
from core.app_assets.entities import AssetItem
from core.app_assets.storage import AssetPaths
def tree_to_asset_items(tree: AppAssetFileTree, tenant_id: str, app_id: str) -> list[AssetItem]:
"""Convert AppAssetFileTree to list of AssetItem for packaging."""
return [
AssetItem(
asset_id=node.id,
path=tree.get_path(node.id),
file_name=node.name,
extension=node.extension or "",
storage_key=AssetPaths.draft(tenant_id, app_id, node.id),
)
for node in tree.nodes
if node.node_type == AssetNodeType.FILE
]

View File

@@ -0,0 +1,7 @@
from .assets import AssetItem
from .skill import SkillAsset
__all__ = [
"AssetItem",
"SkillAsset",
]

View File

@@ -0,0 +1,20 @@
from dataclasses import dataclass, field
@dataclass
class AssetItem:
"""A single asset file produced by the build pipeline.
When *content* is set the payload is available in-process and can be
written directly into a ZIP or uploaded to a sandbox VM without an
extra S3 round-trip. When *content* is ``None`` the caller should
fetch the bytes from *storage_key* (the traditional presigned-URL
path).
"""
asset_id: str
path: str
file_name: str
extension: str
storage_key: str
content: bytes | None = field(default=None, repr=False)

View File

@@ -0,0 +1,10 @@
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any
from .assets import AssetItem
@dataclass
class SkillAsset(AssetItem):
metadata: Mapping[str, Any] = field(default_factory=dict)

View File

@@ -0,0 +1,68 @@
"""App assets storage key generation.
Provides AssetPaths facade for generating storage keys for app assets.
Storage instances are obtained via AppAssetService.get_storage().
"""
from __future__ import annotations
from uuid import UUID
_BASE = "app_assets"
def _check_uuid(value: str, name: str) -> None:
try:
UUID(value)
except (ValueError, TypeError) as e:
raise ValueError(f"{name} must be a valid UUID") from e
class AssetPaths:
"""Facade for generating app asset storage keys."""
@staticmethod
def draft(tenant_id: str, app_id: str, node_id: str) -> str:
"""app_assets/{tenant}/{app}/draft/{node_id}"""
_check_uuid(tenant_id, "tenant_id")
_check_uuid(app_id, "app_id")
_check_uuid(node_id, "node_id")
return f"{_BASE}/{tenant_id}/{app_id}/draft/{node_id}"
@staticmethod
def build_zip(tenant_id: str, app_id: str, assets_id: str) -> str:
"""app_assets/{tenant}/{app}/artifacts/{assets_id}.zip"""
_check_uuid(tenant_id, "tenant_id")
_check_uuid(app_id, "app_id")
_check_uuid(assets_id, "assets_id")
return f"{_BASE}/{tenant_id}/{app_id}/artifacts/{assets_id}.zip"
@staticmethod
def skill_bundle(tenant_id: str, app_id: str, assets_id: str) -> str:
"""app_assets/{tenant}/{app}/artifacts/{assets_id}/skill_artifact_set.json"""
_check_uuid(tenant_id, "tenant_id")
_check_uuid(app_id, "app_id")
_check_uuid(assets_id, "assets_id")
return f"{_BASE}/{tenant_id}/{app_id}/artifacts/{assets_id}/skill_artifact_set.json"
@staticmethod
def source_zip(tenant_id: str, app_id: str, workflow_id: str) -> str:
"""app_assets/{tenant}/{app}/sources/{workflow_id}.zip"""
_check_uuid(tenant_id, "tenant_id")
_check_uuid(app_id, "app_id")
_check_uuid(workflow_id, "workflow_id")
return f"{_BASE}/{tenant_id}/{app_id}/sources/{workflow_id}.zip"
@staticmethod
def bundle_export(tenant_id: str, app_id: str, export_id: str) -> str:
"""app_assets/{tenant}/{app}/bundle_exports/{export_id}.zip"""
_check_uuid(tenant_id, "tenant_id")
_check_uuid(app_id, "app_id")
_check_uuid(export_id, "export_id")
return f"{_BASE}/{tenant_id}/{app_id}/bundle_exports/{export_id}.zip"
@staticmethod
def bundle_import(tenant_id: str, import_id: str) -> str:
"""app_assets/{tenant}/imports/{import_id}.zip"""
_check_uuid(tenant_id, "tenant_id")
return f"{_BASE}/{tenant_id}/imports/{import_id}.zip"

View File

@@ -0,0 +1 @@
# App bundle utilities - manifest-driven import/export handled by AppBundleService

View File

@@ -6,7 +6,7 @@ from sqlalchemy import select, update
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueRetrieverResourcesEvent
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.rag.entities import RetrievalSourceMetadata
from core.rag.index_processor.constant.index_type import IndexStructureType
from core.rag.models.document import Document
from extensions.ext_database import db

View File

@@ -345,8 +345,8 @@ class DatasourceManager:
@classmethod
def get_upload_file_by_id(cls, file_id: str, tenant_id: str) -> File:
with session_factory.create_session() as session:
upload_file = (
session.query(UploadFile).where(UploadFile.id == file_id, UploadFile.tenant_id == tenant_id).first()
upload_file = session.scalar(
select(UploadFile).where(UploadFile.id == file_id, UploadFile.tenant_id == tenant_id).limit(1)
)
if not upload_file:
raise ValueError(f"UploadFile not found for file_id={file_id}, tenant_id={tenant_id}")

View File

@@ -1,22 +1,3 @@
from pydantic import BaseModel, Field, model_validator
from core.tools.entities.common_entities import I18nObject, I18nObjectDict
class I18nObject(BaseModel):
"""
Model class for i18n object.
"""
en_US: str
zh_Hans: str | None = Field(default=None)
pt_BR: str | None = Field(default=None)
ja_JP: str | None = Field(default=None)
@model_validator(mode="after")
def _(self):
self.zh_Hans = self.zh_Hans or self.en_US
self.pt_BR = self.pt_BR or self.en_US
self.ja_JP = self.ja_JP or self.en_US
return self
def to_dict(self) -> dict:
return {"zh_Hans": self.zh_Hans, "en_US": self.en_US, "pt_BR": self.pt_BR, "ja_JP": self.ja_JP}
__all__ = ["I18nObject", "I18nObjectDict"]

View File

@@ -9,7 +9,7 @@ from yarl import URL
from configs import dify_config
from core.entities.provider_entities import ProviderConfig
from core.plugin.entities.oauth import OAuthSchema
from core.plugin.entities import OAuthSchema
from core.plugin.entities.parameters import (
PluginParameter,
PluginParameterOption,

View File

@@ -1 +1,8 @@
from core.entities.plugin_credential_type import PluginCredentialType
DEFAULT_PLUGIN_ID = "langgenius"
__all__ = [
"DEFAULT_PLUGIN_ID",
"PluginCredentialType",
]

Some files were not shown because too many files have changed in this diff Show More